美文网首页
Rxjava2.0 切换线程过程解析

Rxjava2.0 切换线程过程解析

作者: Android程序员老鸦 | 来源:发表于2021-07-07 17:36 被阅读0次

rxjava线程切换也是一大亮点,还是以第一篇的下载图片并展示的例子来分析:

      Observable.just("https://img1.sycdn.imooc.com/5b8ab353000181b204000284.jpg")
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(@NonNull String s) throws Exception {
                        //1.下载图片
                        URL url = new URL(s);
                        URLConnection urlConnection = url.openConnection();
                        InputStream inputStream = urlConnection.getInputStream();
                        return BitmapFactory.decodeStream(inputStream);
                    }
                })
                .subscribeOn(Schedulers.io())//io线程下载
                .observeOn(AndroidSchedulers.mainThread())//android主线程接收图片bitmap
                .subscribe(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        //2.设置图片
                        ivImage.setImageBitmap(bitmap);
                    }
                });

根据上一篇map()的经验,subscribeOn(Schedulers.io())和observeOn(AndroidSchedulers.mainThread())应该也是封装成了另外的一个形式的Observable,先来看看subscribeOn(Schedulers.io()),这个方法是指定被观察者运行的线程:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //又是这个方法,不用再说了吧,最终返回的还是入参ObservableSubscribeOn对象
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

看看ObservableSubscribeOn类:

//也是继承AbstractObservableWithUpstream,从命名规则也知道了这也是个observable,而且这种类跟
//之前的ObservableMap结构很像,只是换了个成员变量scheduler
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
      //  也是新建了一个内部类对象,SubscribeOnObserver,入参是后面的Observer,
      //SubscribeOnObserver一看就知道也是一个观察者
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
      //调用了Observer的onSubscribe()方法
        s.onSubscribe(parent);
      //SubscribeTask 是个Runnable,后面有贴出,run()方法就是执行了source(源observable)的subscribe()
      //方法,就是在这里把之前的observable都放到了run里面执行,至于scheduler 就是Schedulers.io()
      //创造出来的线程了,这里不深究,这里scheduleDirect()方法就会去执行runnable
      //setDisposable()方法会在线程执行完的时候关掉它
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    //静态内部类,实现了Observer
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;
        //构造函数,初始化了内部的一个Observer
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
        //这个onNext()方法实际调用的是下一级observer的onNext()
        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
      //最终调用了这个方法
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //源observable执行subscribe()后会触发parent.onNext()
            source.subscribe(parent);
        }
    }
}
总结下来,rxjava指定被观察者在特定线程执行的核心就是把上层的observable放在指定的线程执行subscribe()方法。

下面来看看指定下游观察者执行的线程,也就是observeOn(AndroidSchedulers.mainThread())方法:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

   public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //最终还是调用了onAssembly(),返回的依旧是入参ObservableObserveOn(可观察者被观察在xxx线程)
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

继续看ObservableObserveOn类:

//传入了主线程的Scheduler,也是被封装成了一个 Observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    //当他调用subscribe()方法的时候,执行这里
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //走这里
            Scheduler.Worker w = scheduler.createWorker();
            //调用上游的Observable的subscribe()方法,observer和Scheduler.Worker(线程)被封装
            //在ObserveOnObserver里,这里按顺序第一是看ObserveOnObserver的onSubscribe()方法
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    //静态内部类ObserveOnObserver,实现了Observer和Runnable 
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Scheduler.Worker worker;
        final boolean delayError;
        final int bufferSize;

        SimpleQueue<T> queue;

        Disposable s;

        Throwable error;
        volatile boolean done;

        volatile boolean cancelled;

        int sourceMode;

        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                //此案例的上个Disposable 不属于QueueDisposable,不走这条分支
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }
                //走这里
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //调用传进来的observer的方法,接下来执行onNext()
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //执行schedule()
            schedule();
        }

        @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }

        @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }

        @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }

        @Override
        public boolean isDisposed() {
            return cancelled;
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                //worker是传进来的线程,最终执行了自身,因为它自己就是Runnable,看看他的run()方法
                worker.schedule(this);
            }
        }

       //
        @Override
        public void run() {
            //outputFused背压相关,默认false,走drainNormal()
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainNormal() {
            int missed = 1;

            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;

            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }

                for (;;) {
                    boolean d = done;
                    T v;

                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;

                    if (checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        break;
                    }
                    //主要的代码在这里,这个a是传进来的observer,最终它的onNext()执行在了指定的线程
                    a.onNext(v);
                }

                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
      ...
    }
总结下来,rxjava指定观察者在特定线程执行的核心就是把下层的observer放在指定的线程执行onNext()方法。依然是用Observable子类去包装实现链式响应。一直要记得rxjava源码的那种套娃结构。

下篇分析分析rxjava封装的线程Scheduler相关。

相关文章

  • Rxjava2.0 切换线程过程解析

    rxjava线程切换也是一大亮点,还是以第一篇的下载图片并展示的例子来分析: 根据上一篇map()的经验,subs...

  • RxJava2.0线程切换原理

    前言 理解线程切换原理有什么意义? 1、可以清楚的知道这个线程切换操作影响到哪些代码的执行线程,不会影响到哪些代码...

  • RxJava源码解析(二)

    前言 本篇主要解析RxJava的线程切换的原理实现 subscribeOn 首先, 我们先看下subscribeO...

  • 并发编程的挑战

    1、线程上下文切换:任务从保存到再加载的过程就是一次上线文切换,线程上下文切换是需要开销的。 2、引起线程上下文切...

  • Rx中的线程切换

    初学者在使用RxJava的过程中,经常搞不清Observable的事件序列和每次操作应该怎样切换线程,切换哪个线程...

  • 2018技术栈总结

    rxjava源码解析 线程切换https://www.jianshu.com/p/a36e5d257b03 ht...

  • Rxjava2.0 map变换过程解析

    接着上一篇,还是那个组装abc字符串的例子: 上篇分析过了just()方法之后就是新建了一个Observable对...

  • 2-RxJava源码分析之 --- 订阅过程和线程切换

    RxJava订阅过程和线程切换原理 1 - Observable.just("hello world").subs...

  • Rxjava 过程分析三之 subscribeOn

    Rxjava 过程分析三之 subscribeOn 说明 只分析 Rxjava 线程切换的大致过程和思想。 以弄明...

  • java基础_线程池

    为什么要有线程池 线程的创建和销毁为什么会消耗性能呢?线程的创建和运行是从用户态切换到核心态的过程,这个过程比较耗...

网友评论

      本文标题:Rxjava2.0 切换线程过程解析

      本文链接:https://www.haomeiwen.com/subject/pvroultx.html