美文网首页
RxJava源码解析

RxJava源码解析

作者: kjy_112233 | 来源:发表于2018-09-13 14:20 被阅读0次

基本框架

  • Observable (可观察者,即被观察者)
  • Observer (观察者)
  • subscribe (订阅) 通过该方法,将 Observable 与 Observer 关联起来
  • 事件 (包括 onNext,onComplete,onError 等事件)
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {

            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

(1)创建Observable流程源码分析
create方法源码

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
        Objects.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
    }

返回值是Observable,参数是ObservableOnSubscribe
ObservableOnSubscribe源码

@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}

ObservableOnSubscribe是一个接口,里面就一个我们实现的那个方法。该方法的参数是ObservableEmitter
ObservableEmitter源码

public interface ObservableEmitter<@NonNull T> extends Emitter<T> {
    //code...
}

ObservableEmitter也是一个接口。它继承了 Emitter<T> 接口
Emitter<T>源码

public interface Emitter<@NonNull T> {

    void onNext(@NonNull T value);

    void onError(@NonNull Throwable error);

    void onComplete();
}

定义了 我们在ObservableOnSubscribe中实现subscribe()方法里最常用的三个方法
create()源码方法里就一句话return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
RxJavaPlugins.onAssembly方法源码

    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

RxJavaPlugins.onAssembly中返回source,即传入的对象,也就是new ObservableCreate<T>(source)
create需要返回的是Observable,而我现在有的是ObservableOnSubscribe对象,ObservableCreate将ObservableOnSubscribe适配成Observable
至此,创建流程结束,我们得到了Observable<T>对象,其实就是ObservableCreate<T>
(2)subscribe订阅流程分析

  • subscribe源码
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            //code...
            //真正的订阅处
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
  • 将第一节提到的ObservableCreate里的subscribeActual()方法拿出来看看
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //创建CreateEmitter,也是一个适配器
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        //onSubscribe()参数是Disposable ,所以CreateEmitter可以将Observer->Disposable 。还有一点要注意的是onSubscribe()是在我们执行subscribe()这句代码的那个线程回调的,并不受线程调度影响。
        observer.onSubscribe(parent);

        try {
            //将ObservableOnSubscribe源头与CreateEmitter联系起来
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
  • CreateEmitter
        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //code...
            //如果没有被dispose,会调用Observer的onNext()方法
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
            }
            //如果没有被dispose,会调用Observer的onError()方法
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose(); //一定会自动dispose()
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            //如果没有被dispose,会调用Observer的onComplete()方法
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();// //一定会自动dispose()
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public String toString() {
            return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
        }

Observer源码

public interface Observer<@NonNull T> {

    void onSubscribe(@NonNull Disposable d);

    void onNext(@NonNull T t);

    void onError(@NonNull Throwable e);

    void onComplete();

}

Observer是一个接口,里面就四个方法,我们在开头的例子中已经全部实现

  • Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
  • Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()
  • Observable和Observer关联时(订阅时),Observable才会开始发送数据
  • 在subscribeActual()方法中,源头和终点关联起来。

主要用到的设计模式:

  • 适配器模式
  • 观察者模式
  • 装饰者模式

相关文章

  • RxPermissions 源码解析之举一反三

    [toc] RxPermissions 源码解析 简介 RxPermissions 是基于 RxJava 开发的用...

  • RxJava2框架源码分析三(map篇)

    1.回顾 上篇已经讲了RxJava2创建操作符create源码解析,不清楚的可以查看RxJava2框架源码分析二(...

  • Rxjava2.x源码解析(二): 线程切换

    上一篇文章Rxjava2.x源码解析(一): 订阅流程中我们讲了 RxJava2 的订阅部分的源码。但 RxJav...

  • RxJava 2 源码解析之线程切换

    在分析RxJava2的线程切换源码之前,再看看在上一篇RxJava 2 源码解析之创建-订阅-变换-发布里总结的流...

  • rxjava2理解

    本文从建立模型的角度分析rxjava2的源码实现,适合看了众多rxjava2源码解析还是一头雾水的同学,附带少量代...

  • Rxjava源码解析

    先上代码: 上面是Rxjava最简单的实现模型。从链式调用的返回值来看: 所以最后的调用对象是 从上面的返回值可以...

  • RxJava源码解析

    基本框架 Observable (可观察者,即被观察者) Observer (观察者) subscribe (订阅...

  • Rxjava源码解析

    这边文章主要记录使用Rxjava过程中对map方法以及flatmap方法的源码理解,自认为也是RxJava的一个精...

  • Rxjava源码解析

    Rxjava本质上是一个异步操作库。是一个能让你用非常简单的逻辑 去处理那些繁琐复杂任务的 异步的操作库。 一、观...

  • rxjava源码解析

    线程切换原理 案例 subscribeOn切换子线程 先看subscribe的执行,最后会执行Observable...

网友评论

      本文标题:RxJava源码解析

      本文链接:https://www.haomeiwen.com/subject/jitwgftx.html