RxJava:封装和使用

作者: Jessewo | 来源:发表于2017-08-13 18:11 被阅读421次

声明:本文只讲封装,基本用法请参考官方文档或者其他文章~

0. 依赖

2.0已出,但是暂时还没有来得及去看;所以还是那目前项目中在用的来讲吧。

    compile 'io.reactivex:rxjava:1.2.4'
    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'com.artemzin.rxjava:proguard-rules:1.2.7.0'
    compile 'com.trello:rxlifecycle:1.0'
    compile 'com.trello:rxlifecycle-components:1.0'
    compile 'com.jakewharton.rxbinding:rxbinding:1.0.0'

以下展示目前项目中最常用的几个例子:

1. View相关

点击事件
  • 使用
Rx.clicks(mBtn, this, v -> doAction());
  • 功能点
  1. 过滤500ms内的重复点击;
  2. 绑定当前activity或者fragment的生命周期,避免内存泄漏;
  • 封装
    //view点击事件,500毫秒过滤重复点击
    public static void clicks(View view, BaseActivity activity, final Action1<Void> onNext) {
        RxView.clicks(view)
                .throttleFirst(500, TimeUnit.MILLISECONDS)
                .compose(activity.bindToLifecycle())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(onNext, e -> e.printStackTrace());
    }
输入框文字变动
  • 使用
Rx.afterTextChangeEvents(this, mEditText, event -> doAction());
  • 功能点
  1. 过滤500ms内的请求,尤其是当输入框文字变动后需要进行网络请求时,可以有效避免产生大量请求;
  2. 绑定当前activity或者fragment的生命周期,避免内存泄漏;
  • 封装
    //TextView watcher,间隔500毫秒
    public static void afterTextChangeEvents(BaseActivity activity, TextView textView, Action1<TextViewAfterTextChangeEvent> onNext) {
        RxTextView.afterTextChangeEvents(textView)
                .throttleLast(500, TimeUnit.MILLISECONDS)
                .compose(activity.bindToLifecycle())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(onNext, e -> e.printStackTrace());
    }

2. 网络请求(划重点!)

  • 使用
 API.uploadImg(picInfo) //以上传图片为例
                .compose(RxTransformers.doApi(this))
                .subscribe((Result result) -> {
                    //to do

                });

简洁到爆炸有木有!!!

  • 功能点
  1. 保留原有的链式调用方式;
  2. subscribe()中可以仅仅传入 onNextonErroronComplete可选,所以再加上lamda加持,做到代码最简洁~(原先若只传入onNext而不传入onError,当网络异常或者onNext执行发生异常时,会抛出OnErrorNotImplementedException);
  3. 线程切换;
  4. 绑定当前页面生命周期,当onPause时,停止未完成的请求(抛掉已经请求来的response,不进行处理);
  5. 请求结果统一预处理:
    5.1 网络异常处理与上报;
    5.2 接口请求错误信息展示;
    5.3 loading UI的显示和隐藏;
    5.4 token失效后的统一处理;
  • 封装
    若想理解以下封装原理,请先通读compose()lift()两个操作符的源码;
API.uploadImg(picInfo)是基于retrofit的封装

此处省略

compose()操作符中传入的自定义Transformer
public class RxTransformers {

    public static <T> Observable.Transformer<T, T> io_main() {
        return (Observable<T> observable) -> observable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
    }

    public static <T> Observable.Transformer<T, T> doApi(BaseActivity activity, HttpResultInterceptor.Type type) {
        return (Observable<T> observable) -> observable
                .compose(io_main())//线程切换
                .compose(activity.bindToLifecycle())//生命周期
                .lift(HttpResultInterceptor.get(activity, type));//请求结果预处理
    }

    public static <T> Observable.Transformer<T, T> doApi(BaseActivity activity) {
        return doApi(activity, HttpResultInterceptor.Type.ALL);
    }

}

线程切换和页面生命周期绑定没啥好讲的,重点是lift()中传入的自定义operator

lift(HttpResultInterceptor.get(activity, type));

请求结果预处理

/**
 * Created by Jessewo on 2017/7/25.
 * <p>
 * 1.API链式调用
 * 2.lamda表达式(第一层subscriber,可以只实现onNext(),onError/onComplete非必须)
 */

public class HttpResultInterceptor {

    private static final String TAG = "HttpResultInterceptor";

    public enum Type {
        /**
         * 1. filter exception and show exception msg when onError<br/>
         * 2. show loading when onStart, and hide when onComplete or onError<br/>
         * 3. show error message when onNext
         */
        ALL,
        /**
         * 1. filter exception and show exception msg when onError<br/>
         * 2. show error message when onNext
         */
        ERROR_MSG,
        /**
         * 1. filter exception and show exception msg when onError<br/>
         * 2. show loading when onStart, and hide when onComplete or onError
         */
        LOADING,
        /**
         * 1. filter exception and show exception msg when onError
         */
        NONE
    }

    public static Observable.Operator get(HttpResultHandler handler, Type type) {
        return new OperatorHttpResult(handler, type);
    }

    private static class OperatorHttpResult<T> implements Observable.Operator<T, T>, Subscription {

        private SoftReference<HttpResultHandler> mHandler;

        private Type mType;

        OperatorHttpResult(HttpResultHandler httpResultHandler) {
            mHandler = new SoftReference<>(httpResultHandler);
        }

        OperatorHttpResult(HttpResultHandler httpResultHandler, Type type) {
            mHandler = new SoftReference<>(httpResultHandler);
            mType = type;
        }

        @Override
        public Subscriber<? super T> call(Subscriber<? super T> subscriber) {
            HttpResultSubscriber<? super T> parent = new HttpResultSubscriber<>(subscriber, mHandler, mType);
            //parent subscriber 独立控制 unSubscribe行为
            parent.add(this);
            return parent;
        }

        @Override
        public void unsubscribe() {
            mHandler.clear();
        }

        @Override
        public boolean isUnsubscribed() {
            return mHandler.get() == null;
        }
    }

    /**
     * 异常处理原则与safeSubscriber稍有不同:<br/>
     * 1. onNext所有异常抓取->onError;(同)<br/>
     * 2. onComplete所有异常抛出;(同)<br/>
     * 3. onError抛出除了OnErrorNotImplementedException之外的所有异常;(不同)
     */
    private static class HttpResultSubscriber<T> extends Subscriber<T> {

        private Subscriber<? super T> mChild;
        private SoftReference<HttpResultHandler> mHandler;
        private boolean showProgress;
        private boolean showError;

        private boolean done;

        HttpResultSubscriber(Subscriber<? super T> child,
                             SoftReference<HttpResultHandler> preHandler,
                             Type type) {
            mChild = child;
            mHandler = preHandler;
            switch (type) {
                case ERROR_MSG:
                    showProgress = false;
                    showError = true;
                    break;
                case LOADING:
                    showProgress = true;
                    showError = false;
                    break;
                case NONE:
                    showProgress = false;
                    showError = false;
                    break;
                default://all
                    showProgress = true;
                    showError = true;
                    break;
            }
        }

        @Override
        public void onStart() {
            //main thread
            showProgress();
        }

        @Override
        public void onCompleted() {
            if (done || isUnsubscribed() || mChild.isUnsubscribed())
                return;
            done = true;
            try {
                dismissProgress();
                mChild.onCompleted();
            } finally {
                try {
                    unsubscribe();
                } catch (Throwable e) {
                    RxJavaHooks.onError(e);
                    throw new OnCompletedFailedException(e.getMessage(), e);
                }
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done || isUnsubscribed() || mChild.isUnsubscribed())
                return;
            done = true;
            try {
                dismissProgress();

                if (e instanceof OnErrorNotImplementedException) {
                    e = e.getCause();
                }
                e.printStackTrace();
                String error = App.getInstance().getString(R.string.error_network);
                if (e instanceof SocketTimeoutException) {
                    ToastUtil.show(error + "(" + NETWORK_ERROR_TIMEOUT + ")");
                } else if (e instanceof ConnectException) {
                    ToastUtil.show(error + "(" + NETWORK_ERROR_INTERRUPTION + ")");
                } else if (e instanceof UnknownHostException
                        || (!TextUtils.isEmpty(e.getMessage()) && e.getMessage().contains("No address associated with hostname"))) {
                    ToastUtil.show(error + "(" + NETWORK_ERROR_UNKNOWN_HOST + ")");
                } else {
                    ToastUtil.show(error + "(" + NETWORK_ERROR_UNKNOWN + ")");
                    Analytics.getInstance().onError(e);//对于非常规异常上报后台监控
                }

                mChild.onError(e);
            } catch (OnErrorNotImplementedException e2) {
                //ignore
                LOG.d(TAG, "onError: OnErrorNotImplementedException");
            } finally {
                try {
                    unsubscribe();
                } catch (Throwable e3) {
                    RxJavaHooks.onError(e3);
                    throw new OnErrorFailedException(e3.getMessage(), e3);
                }
            }
        }

        @Override
        public void onNext(T t) {
            if (done || isUnsubscribed() || mChild.isUnsubscribed())
                return;
            try {
                if (showError) {
                    if (t instanceof Result) {
                        Result result = (Result) t;
                        checkResult(result);
                    } else if (t instanceof MixResult) {
                        MixResult mixResult = (MixResult) t;
                        Result result1 = mixResult.getResult1();
                        Result result2 = mixResult.getResult2();
                        if (result1 != null && result2 != null) {
                            checkResult(result1);
                            checkResult(result2);
                        }
                    }
                }
                mChild.onNext(t);
            } catch (Throwable e) {
                onError(e);
            }
        }

        private void checkResult(Result result) {
            int status = result.getStatus();
            switch (status) {
                case API.SUCCESS_CODE:
                case API.SKIP_CODE:
                    break;
                case API.RELOGIN_CODE:
                    //token失效
                    String msg = result.getMsg();
                    HttpResultHandler preHandler = mHandler.get();
                    if (preHandler != null) {
                        preHandler.onTokenInvalid(msg);
                    } else {
                        ToastUtil.show(msg);
                    }
                    break;
                default:
                    ErrorMsg.getInstance().show(result);
                    break;
            }
        }

        private void showProgress() {
            if (showProgress && mHandler.get() != null)
                mHandler.get().showProgress();
        }

        private void dismissProgress() {
            if (showProgress && mHandler.get() != null)
                mHandler.get().dismissProgress();
        }
    }

    //BaseActivity或者BaseFragment需要实现此接口
    public interface HttpResultHandler {

        void showProgress();

        void dismissProgress();

        void onTokenInvalid(String msg);

        //其他功能可扩展
        //        void showMessage(String msg);
        //        void lowerVersion();
    }
}

相关文章

网友评论

    本文标题:RxJava:封装和使用

    本文链接:https://www.haomeiwen.com/subject/oaderxtx.html