美文网首页
RxJava浅析

RxJava浅析

作者: 程序员要多喝水 | 来源:发表于2019-12-09 21:10 被阅读0次

测试用例:

     new Thread( new Runnable() {
            @Override
            public void run() {
                rxjavaTest();
            }
        },"rxjava_test").start();
    

    @SuppressLint("CheckResult")
    private void rxjavaTest() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
            }
        }).subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LOG_TAG("onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        LOG_TAG("onNext");
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

    private void LOG_TAG(String prefix){
        Log.d(TAG,prefix + " ;"+Thread.currentThread().getName());
    }

显示结果,3个方法在一个线程中:

2019-12-02 17:05:21.843 18402-18434/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;rxjava_test
2019-12-02 17:05:21.843 18402-18434/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;rxjava_test
2019-12-02 17:05:21.843 18402-18434/com.mi.learn.rxjava D/Rxjava-Study: onNext ;rxjava_test

分析代码:
ObservableOnSubscribe

public interface ObservableOnSubscribe<T> {

    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}

Observable#create

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }

发现,经过Observable.create方法,会将ObservableOnSubscribe封装成ObservableCreate类型;

然后看Observable#subscribe方法干了啥?

public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
    }
    
@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
            ...
            subscribeActual(observer);
            ...
            throw npe;
        }
    }   

关键就是调用subscribeActual方法,此时回头看上方在执行到subscribe方法,其实就是过程是
ObservableCreate#subscribeActual的方法:
来看其源码:

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

其会subscribeActual(observer)的observer是subscribe(new Observer)这里面的new出来的;
CreateEmitter<T> parent = new CreateEmitter<T>(observer);会将这个new Observer封装成CreateEmitter,之后直接执行observer.onSubscribe方法;因此日志打印最开始执行的是

2019-12-02 17:05:21.843 18402-18434/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;rxjava_test

;之后看下 source.subscribe(parent)方法;其中source是回头
Observable#create

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
 }

这个source就是new ObservableOnSubscribe,因此source.subscribe会调用到ObservableOnSubscribe#subscribe方法,所以第二个打印的会是:

2019-12-02 17:05:21.843 18402-18434/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;rxjava_test
 @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
            }

调用 emitter.onNext("test");时候,因为 observer.onSubscribe(parent)这里面的parent是将ObservableEmitter封装成CreateEmitter,所以在调用onNext时候会回调到CreateEmitter#onNext方法,代码如下:

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
    
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
   }     

所以在打印

2019-12-02 17:05:21.843 18402-18434/com.mi.learn.rxjava D/Rxjava-Study: onNext ;rxjava_test

流程图大概如下:

graph TD
  id1(ObservableOnSubscribe)
  id2(ObservableCreate)
  id3(Observer)
  id1 --> id2
  id2 --subscribe--> id3

现在添加一些事件切换过程,比如map,flatMap

   @SuppressLint("CheckResult")
    private void rxjavaTest() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
            }
        })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LOG_TAG("test map");
                        return "test map";
                    }
                })
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        return new ObservableSource<String>() {
                            @Override
                            public void subscribe(Observer<? super String> observer) {
                                LOG_TAG("test flatMap");
                                observer.onNext("test flatMap");
                            }
                        };
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LOG_TAG("onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        LOG_TAG("onNext:"+s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

打印日志结果如下:

2019-12-02 17:31:54.740 19012-19042/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;rxjava_test
2019-12-02 17:31:54.740 19012-19042/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;rxjava_test
2019-12-02 17:31:54.740 19012-19042/com.mi.learn.rxjava D/Rxjava-Study: test map ;rxjava_test
2019-12-02 17:31:54.741 19012-19042/com.mi.learn.rxjava D/Rxjava-Study: test flatMap ;rxjava_test
2019-12-02 17:31:54.741 19012-19042/com.mi.learn.rxjava D/Rxjava-Study: onNext:test flatMap ;rxjava_test

看下Observable#map/flatMap:

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    
  public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }  

分析源码发现,其实也就是将事件封装,从ObservableCreate封装成ObservableMap/ObservableFlatMap;
来看ObservableMap源码:

  @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
 }
 
  static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }
            ...
            downstream.onNext(v);
        }
    }

其中当调用subscribe方法时候,会调用到ObservableCreate#subscribeActual(observer)方法,而ObservableCreate#subscribeActual中的observer是封装后的ObservableMap/ObservableFlatMap,
因此会调用到ObservableMap/ObservableFlatMap#subscribeActual,所以会显按顺序打印出来日志,流程如下;

graph TD
  id1(ObservableOnSubscribe)
  id2(ObservableCreate)
  id3(ObservableMap)
  id4(ObservableFlatMap)
  id5(Observer)
  id1 --> id2
  id2 --> id3
  id3 --> id4
  id4 --subscribe--> id5

切断事件联系:Disposable

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
                emitter.onNext("test1");
                emitter.onNext("test2");
                emitter.onNext("test3");
            }
        }).subscribe(
                new Observer<String>() {
                    private Disposable mDisposable = null;
                    @Override
                    public void onSubscribe(Disposable d) {
                        LOG_TAG("onSubscribe");
                        mDisposable = d;
                    }

                    @Override
                    public void onNext(String s) {
                        LOG_TAG("onNext:" + s);
                        if (s.equals("test2")){
                            mDisposable.dispose();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

在发送事件时候,最后是调用到ObservableCreate#subscribeActual方法,执行ObservableEmitter封装的CreateEmitter方法发送onNext事件,代码如下:
CreateEmitter#onNext

  public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
    }
        
   public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
    }
        
   public static boolean isDisposed(Disposable d) {
            return d == DISPOSED;
    }
        
   public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }

这里会判断是否isDisposed,如果是,则不在执行后续的onNext方法,事件也就被抛弃;

事件发送是串型发送,非并行

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
                emitter.onNext("test1");
                emitter.onNext("test2");
                emitter.onNext("test3");
            }
        }).subscribe(
                new Observer<String>() {
                    private Disposable mDisposable = null;
                    @Override
                    public void onSubscribe(Disposable d) {
                        LOG_TAG("onSubscribe");
                        mDisposable = d;
                    }

                    @Override
                    public void onNext(String s) {
                        if (s.equals("test2")){
                            SystemClock.sleep(10000);
                        }
                        LOG_TAG("onNext:" + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

打印结果:

2019-12-03 19:58:59.485 6200-6222/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;rxjava_test
2019-12-03 19:58:59.485 6200-6222/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;rxjava_test
2019-12-03 19:58:59.485 6200-6222/com.mi.learn.rxjava D/Rxjava-Study: onNext:test ;rxjava_test
2019-12-03 19:58:59.486 6200-6222/com.mi.learn.rxjava D/Rxjava-Study: onNext:test1 ;rxjava_test
2019-12-03 19:59:09.487 6200-6222/com.mi.learn.rxjava D/Rxjava-Study: onNext:test2 ;rxjava_test
2019-12-03 19:59:09.487 6200-6222/com.mi.learn.rxjava D/Rxjava-Study: onNext:test3 ;rxjava_test

结果显示onNext:test1和onNext:test2 事件差10S;

前面说的都没设计到线程,其实设计线程也就是多2个封装类而已,就是ObservableObserveOn/ObservableSubscribeOn

@SuppressLint("CheckResult")
    private void rxjavaTest() {
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
            }
        })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LOG_TAG("test map");
                        return "test map";
                    }
                })
                .flatMap(new Function<String, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(String s) throws Exception {
                        return new ObservableSource<String>() {
                            @Override
                            public void subscribe(Observer<? super String> observer) {
                                LOG_TAG("test flatMap");
                                observer.onNext("test flatMap");
                            }
                        };
                    }
                }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LOG_TAG("onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        LOG_TAG("onNext:"+s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

打印日志如下:

2019-12-02 17:59:40.959 19257-19279/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;rxjava_test
2019-12-02 17:59:40.986 19257-19284/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;RxCachedThreadScheduler-1
2019-12-02 17:59:40.986 19257-19284/com.mi.learn.rxjava D/Rxjava-Study: test map ;RxCachedThreadScheduler-1
2019-12-02 17:59:40.986 19257-19284/com.mi.learn.rxjava D/Rxjava-Study: test flatMap ;RxCachedThreadScheduler-1
2019-12-02 17:59:41.060 19257-19257/com.mi.learn.rxjava D/Rxjava-Study: onNext:test flatMap ;main

来分析一波,subscribeOn/observeOn

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }    

将上面返回的observable封装成ObservableSubscribeOn/ObservableObserveOn;
还是分析subscribeActual方法:
ObservableSubscribeOn#subscribeActual

 @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

首先会将上面返回的ObservableFlatMap封装成SubscribeOnObserver,然后调用onSubscribe方法,注意ObservableCreate#subscribeActual方法也有observer.onSubscribe:

  protected void subscribeActual(Observer<? super T> c) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

注意这里打印的onSubscribe日志是ObservableSubscribeOn#subscribeActual打印来出的,
原因是因为Override了onSubscribe方法,当ObservableCreate#subscribeActual中调用observer.onSubscribe时候,调用到SubscribeOnObserver#onSubscribe就没了;

static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        @Override
        public void onSubscribe(Disposable d) {
            DisposableHelper.setOnce(this.upstream, d);
        }
}

如果有多个onSubscribe(xxx)方法,那么调用的也会是最后一次调用到ObservableSubscribeOn#subscribeActual中的observer.onSubscribe(parent);

到这里还没有出现过线程切换问题,因此onSubscribe日志打印就在当前方法执行的线程中,也会是第一个执行的方法,日志如下:

19279/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;rxjava_test

接下来会来看subscribe是如何执行的,以及执行在哪个线程中,
onSubscribe完后
会执行parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

先分析下SubscribeTask源码

  final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

发现其是继承Runnable,Run方法中就会执行subscribe方法,其中,source就是封装传递过来的Observable,直到调用到ObservableCreate#subscribeActual执行 source.subscribe(parent);打印ObservableOnSubscribe#subscribe中回调方法日志;
因此第二个打印日志是

2019-12-02 17:59:40.986 19257-19284/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;RxCachedThreadScheduler-1

接下来分析下其执行线程,

observable.subscribeOn(Schedulers.io())

@NonNull
public static Scheduler io() {
     return RxJavaPlugins.onIoScheduler(IO);
}
    
IO = RxJavaPlugins.initIoScheduler(new IOTask());

static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
}

static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
}

public IoScheduler() {
   this(WORKER_THREAD_FACTORY);
}

public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
}

然后回来看线程执行:

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

scheduler是IoScheduler;

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
   public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

接下来看Worker是如何创建的,代码如下

    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
        
        
   public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        ...  
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } 
        ...
        return sr;
    }     

来看这个executor是从哪里来的:

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
}


ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }
        
static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
 }

public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
 }
 
 public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
 }

到这就看出来,其实整个过程,就是往线程池中submit/schedule添加线程去执行,所以执行过程是在线程池指定线程中;如果指定多个

   Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
            }
        }).subscribeOn(Schedulers.io())
                .subscribeOn(Schedulers.single())
                .subscribeOn(AndroidSchedulers.mainThread())

打印的日志subscribe也只是运行在第一次指定的线程中,因此subscribe方法调用递归往上执行,最下方subscribeOn(AndroidSchedulers.mainThread())会先执行,最后才执行subscribeOn(Schedulers.io()),因此执行到ObservableCreate#subscribeActual时候是运行在第一次执行的线程中;

解析来看observeOn过程:
先分析下AndroidSchedulers.mainThread()如何指定运行到主线程的:

 public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    
 private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });   
            
private static final class MainHolder {
        static final Scheduler DEFAULT
            = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
    }

HandlerScheduler(Handler handler, boolean async) {
        this.handler = handler;
        this.async = async;
    }

    @Override
    @SuppressLint("NewApi") // Async will only be true when the API is available to call.
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        ...
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        Message message = Message.obtain(handler, scheduled);
        if (async) {
            message.setAsynchronous(true);
        }
        handler.sendMessageDelayed(message, unit.toMillis(delay));
        return scheduled;
    }

    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler, async);
    }

到这里其实以及知道,切换主线程是根据Android的Handler+Message方法去切换的;

然后分析下observable.observeOn(AndroidSchedulers.mainThread())过程

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn继续分析其subscribeActual方法

  @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //指定当前线程,直接执行
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //创建Worker
            Scheduler.Worker w = scheduler.createWorker();
            //这里注意,subscribe并没有执行运行线程哦
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

我们知道大概流程是一直调用到subscribe直到ObservableCreate#subscribeActual执行ObservableOnSubscribe#subscribe方法,调用emitter#onNext过程与subscribe相反;
来看ObserveOnObserver代码:

final Observer<? super T> downstream;
final Scheduler.Worker worker;

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

这里创建的work传递的是HandlerScheduler#HandlerWorker,然后执行看其回调的onNext方法:

 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
         void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

至此,这里OnNext执行线程已经确定了;

 public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            ...
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; 
            if (async) {
                message.setAsynchronous(true);
            }
            handler.sendMessageDelayed(message, unit.toMillis(delay));
            ...

            return scheduled;
        }

这样就完成了线程切换过程,其实纵观全局,发现ObservableSubscribeOn#subscribeActual指定的是从下往上subscribe过程运行在哪个线程,而ObservableObserveOn#subscribeActual是指定从上往下的onNext过程运行在哪个线程;

总结:
1.每执行一个过程,其实都是对Observable进行一次封装;
2.subscribe是从下往上执行,其执行最终是执行subscribeActual方法,简称上游;
3.onNext/onComplete/onError是回调过程,从上往下执行,简称下游;
4.指定上游线程使用subscribeOn(xxx),指定下游使用observeOn(xxx)
5.切断事件发送使用Disposable#dispose;
6.onNext事件是串型发送,非并行发送,前一个耗时久,后面就得等;

盗用网上一张图:


image.png

最后来一个练习:

//主线程执行:
private void rxjavaTest() {
  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                LOG_TAG("subscribe");
                emitter.onNext("test");
            }
        })
                .subscribeOn(Schedulers.single())
                .observeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LOG_TAG("map1");
                        return "test map1";
                    }
                })
                .lift(new ObservableOperator<String, String>() {
                    @Override
                    public Observer<? super String> apply(Observer<? super String> observer) throws Exception {
                        LOG_TAG("lift");
                        return observer;
                    }
                })
                .subscribeOn(Schedulers.io())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LOG_TAG("map2");
                        return "test map2";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        LOG_TAG("map3");
                        return "test map1";
                    }
                })
                .subscribe(
                new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        LOG_TAG("onSubscribe");
                    }

                    @Override
                    public void onNext(String s) {
                        LOG_TAG("onNext:" + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }               

ObservableLift#subscribeActual

    public void subscribeActual(Observer<? super R> observer) {
        Observer<? super T> liftedObserver;
        try {
            liftedObserver = ObjectHelper.requireNonNull(operator.apply(observer), "Operator " + operator + " returned a null Observer");
        } 
         ...
         source.subscribe(liftedObserver);
    }

lift中apply方法是在source.subscribe之前执行,因此属于上游时候执行;

ObservableMap#subscribeActual

    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }

MapObserver.java

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        
        
          @Override
        public void onNext(T t) {
            ...
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } 
            ...
            downstream.onNext(v);
        }

可以看出map的apply方法是在downstream.onNext方法之前执行,因此属于下游方法;

打印日志:

2019-12-03 20:36:30.013 7486-7486/com.mi.learn.rxjava D/Rxjava-Study: onSubscribe ;main
2019-12-03 20:36:30.014 7486-7510/com.mi.learn.rxjava D/Rxjava-Study: lift ;RxCachedThreadScheduler-1
2019-12-03 20:36:30.016 7486-7514/com.mi.learn.rxjava D/Rxjava-Study: subscribe ;RxSingleScheduler-1
2019-12-03 20:36:30.017 7486-7515/com.mi.learn.rxjava D/Rxjava-Study: map1 ;RxCachedThreadScheduler-2
2019-12-03 20:36:30.017 7486-7515/com.mi.learn.rxjava D/Rxjava-Study: map2 ;RxCachedThreadScheduler-2
2019-12-03 20:36:30.038 7486-7486/com.mi.learn.rxjava D/Rxjava-Study: map3 ;main
2019-12-03 20:36:30.038 7486-7486/com.mi.learn.rxjava D/Rxjava-Study: onNext:test map1 ;main

相关文章

  • Rxjava2浅析

    Rxjava2浅析 最近在Android开发阵营中RxJava被经常提起,似乎不看看RxJava马上就会被Out一...

  • RxJava2 源码浅析

    RxJava2 源码浅析 ReactiveX 历史:ReactiveX是Reactive Extensions的缩...

  • RxJava源码浅析———订阅流程、map与Filter操作符实

    RxJava源码浅析———订阅流程、map与Filter操作符实现原理 RxJava是一个非常流行的基于观察者模式...

  • RxJava浅析

    扩展后的观察者模式。 OS模式的不足: 1、不知道事件何时结束。 2、缺少错误通知机制。 RxJava对以上的改进...

  • RxJava浅析

    测试用例: 显示结果,3个方法在一个线程中: 分析代码:ObservableOnSubscribe Observa...

  • RxJava 源码浅析

    本文只分析 RxJava 的基本原理与流程,不深入探讨具体操作符的实现细节。 背景 为什么使用 RxJava? 解...

  • RxJava 源码浅析

    https://www.jianshu.com/p/9e3a0bc5680a 如果遇到问题请到这里讨论 背景 本来...

  • RxJava2线程调度源码分析(二)

    在RxJava2源码浅析(一) 里我们分析RxJava2最简单的用法,实际上就是复杂一点的回调.今天一起来看看线程...

  • RxJava源码浅析(一)

    源码地址:RxJava 1. 功能介绍 1.1 RxJava RxJava is a Java VM implem...

  • Android rxjava源码浅析

    下面谈谈我的理解 rxjava原理说白了就是三条流: API构建流 事件订阅流 事件回调流 这里结合retrofi...

网友评论

      本文标题:RxJava浅析

      本文链接:https://www.haomeiwen.com/subject/wgvigctx.html