美文网首页
Rxjava1.0_事件订阅和线程调度流程解析

Rxjava1.0_事件订阅和线程调度流程解析

作者: lzt橘子 | 来源:发表于2018-03-05 23:11 被阅读107次

前言

            Rxjava从15年底开始,热度直线上升,自此之后,基本Android端的项目开发,多多少少都会有它的身影。Rxjava出现后,1.相对于以往使用Handler或者AsyncTask作为线程转换和通信的手段,Rxjava实在是太方便了,可以在你想在的地方,任意的切换线程。2.相对于EventBus,Rxjava可以通过单例和ConcurrentHashMap简单的构造一个RxBus,就能实现类似EventBus的事件传递的功能。3.网络请求,各种操作符可以轻松应对api接口的各种事前处理和事后处理,可以合并,过滤,多段,循环请求等,和Retrofit配合,更是现在很多项目开发的标配。 4.Android控件的异步响应,黄油刀作者的RxView库能够极大的简化各种控件的事件监听和事前处理。

            Rxjava故名思意,异步是它的最大特点,通过Scheduler(调度器)可以自由的切换线程,实际使用中,一般使用Observable(被观察者)的subscribeOn()和observeOn()方法即可实现线程切换,确实很神奇,此外,很多开发人员也用它作为链式编程的一大利器,丰富而强大的操作符,可以明显的减少if else 和 for 循环的 使用和嵌套,降低代码的逻辑层次,可读性大大提高.


基本构成

            在分析Rxjava的工作流程之前,先说说它的基本构成:

Rxjava主要由6个极为紧密的对象和概念在一起运作:

1.Observable: 被观察者;

2.OnSubscribe:订阅行为;

3.subcribe: 订阅/取消订阅;

4.Observer:观察者(Subscriber是其的一个抽象实现,而Subscriber同时实现了Subscription接口);

5. 事件:(指onCompleted onError onNext);

6. hook: 挂钩(参见后面的内部流程解析)。

外部流程-无线程切换

基本流程:

1.四者基本运作

由图_1 可以很清晰的看到,Observable 和 Observer 分别创立后,需要通过 subscribe方法将两者联系起来,然后Observable可 间接向订阅的Observer 发送三种事件,事件在到达Observer 时被消费,不管事件是否消费完毕,都可以通过Subscription的unsubscribe取消订阅,此时Observer将不再接收Observable发送的事件.

线程切换

一般线程切换的流程:

2-线程切换

图_2中,Observable 和 Observer,subscribe 和事件四者的位置呈一个链式分布,很清晰的流程,而通过各种调度器Scheduler,可以随意的切换操作所在的线程,如该图:                                                                                                                                 1.事件发送前,事件发送(onNext,onCompleted),第一个filter操作符都被 第一个subscribeOn指定在io线程执行,直到第一个observeOn出现前;                                                                                                                                                                   2.第二个subscribeOn是无效的,在上述情况中subscribeOn在整个流程中只执行一次有效操作                                                     3.第二个filter操作符被第一个observeOn指定在主线程执行;                                                                                                     4.事件的消费(即subscribe后的操作),被第二个observeOn指定在io线程执行.                                                                     

特殊线程切换

需要额外的准备操作和临时事件的线程切换流程:

3-特殊线程切换

图_3中,除了一般线程切换的调度器外,还插入了两个新的操作:doOnSubscribe和doOnNext,如该图

1.第二个subscribeOn的线程切换是有效的,doOnSubscribe里的操作会在新线程执行;

2.doOnNext中的操作已指定在io线程执行,故此事件不会对create中的事件顺序产生影响,是独立的。

其余调度器作用同上.

内部流程解析-无线程切换

1.通过 Observable(被观察者)的静态方法: create() 创建一个 Observable 对象: 

4-创建被观察者对象

2.此时会先调用hook(挂钩)的 onCreate() 方法,明显这个hook是一个单例对象,在Observable类中搜索这个hook,可以找到如下图-5的声明:

5-hook(挂钩)单例

3.我们继续延着 hook的 onCreate() 方法点进去:

6-通过hook构造一个订阅行为对象

返回的是一个 OnSubscribe(订阅行为) 对象,这一步hook并没有对OnSubscribe做什么特殊处理,传进来后便直接返回了。

4. 重新回到图-4,我们找到Observable所用到的构造函数:

7-使用到的构造函数

可以看到:第一,将OnSubscribe对象赋值给新创建的Observable;第二,这个OnSubscribe对象是final修饰的。

以上1-4步,简单的说,就是创建了一个Observable(被观察者)对象,同时创建该Observable的一个订阅行为对象OnSubscribe。

5. 先创建一个Subscriber(观察者):

8-创建观察者1 9-创建观察者2

SubscriptionList是Subscription的一个实现类,所以Subscriber具备取消订阅和获知自身是否已经取消订阅的两个功能.

10-Subscription接口

6.然后调用刚创建的Observable对象 的: subscribe()方法,这一步会解释订阅是如何产生的,我们先看下图:

11-开始订阅1 11-开始订阅2

里面是Observable的一个静态方法,将前5步创建的Subscriber(观察者)和Observable(被观察者)作为参数传递进去。

先是:subscriber.onStart(), 执行Subscriber的onStart()函数,这一步是先于前面的OnSubscribe(订阅行为)执行的。

然后:hook.onSubscribeStart(observable,observable.onSubscribe).call(subscriber),这句代码分前后两部分来分析。

第一步:是调用hook的onSubscribeStart()方法,传入Observable和OnSubscribe,内部也没有做任何处理,直接返回OnSubscribe(订阅行为)对象。

12-订阅行为

这个OnSubscribe(订阅行为)对象就是Observable所持有的OnSubscribe,即我们在第3步创建的那个OnSubscribe。

第二步:调用OnSubscribe的call()方法,此时将第5步创建的Subscriber(观察者)作为参数传进去。这里就会触发先前我们在OnSubscribe(订阅行为)的call()方法中编写好的各种操作和Subscriber的事件操作(如onNext,onCompleted等).

最后:hook.onSubscribeReturn(subscriber),返回一个Subscription(即第5步创建的那个Subscriber(观察者)),可以用于随时取消订阅.

13-返回观察者

7.取消订阅的事前处理,在subscriber.onStart()和hook.onSubscribeStart()方法之间,会将Subscriber(观察者进行处理)转成SafeSubscriber,如下图:

14-自动取消订阅

SafeSubscriber重写了onComplete()和onError()方法:

15-onComplete 16-onError1 16-onError2

在onComplete()和onError()方法末尾,都会调用unsubscribe()方法来取消订阅,所以只要调用这两个方法之一,都会取消Subscriber的订阅.

以上5-7步,就是在Observable的subscribe()方法中,通过先前创建的OnSubscribe对象,将ObservableSubscriber联通起来。

内部流程解析-线程切换

subscribeOn工作流程分析

subsscribeOn流程解析

1.创建Observable1号和OnSubscribe1号后,调用subscribeOn(Schedulers.io())方法.

2.获取一个io线程的Schedulers(线程调度):

17-获取io线程的线程调度1 18-获取io线程的线程调度2

同时创建io线程集

19-创建io线程池

3.Rxjava中是如何限制subscribeOn()方法对线程切换的有效性的:

20-线程切换io线程1 21-线程切换io线程2 22-线程切换io线程3

顺序观察以上三步可以看到,一般情况下,第一次调用subscribeOn()方法后,Observable会转为ScalarSynchronousObservable,此后再调用subscribeOn()方法,都不会执行Observable的nest()和lift()方法,而线程切换的操作则在lift()方法中.

4.第1步:创建了Observable1号,OnSubscribe1号,参数Subscriber1号(即subscriber);                                                        第4步:通过Observable1号的nest()方法,创建了Observable2号和OnSubscribe2号,参数Subscriber2号(即s),Subscriber2号订阅的onNext()事件以Observable1号(即t)为参数。

23-创建新的观察者1 24-创建新的观察者2 24-创建新的观察者3

5.第5步:通过Observable2号的lift()方法,创建了Observable3号和OnSubscribe3号,参数Subscriber3号(即o).

26-线程切换1

6.第6步:调用Observable3号的subscribe()进行订阅,然后执行OnSubscribe3号的call()方法,显然Subscriber3号(即o)就是我们最后在subscribe传进去的那个Subscriber_Last.

7.那么如何实现线程切换呢,我们继续看hook.onLift(operator).call(o)这句:

hook.onLift(operator)也没做任何处理,直接返回OperatorSubscribeOn(本身也是Function接口的实现类,类似OnSubscribe),OperatorSubscribeOn内部持有了第2步创建的Schedulers(线程调度)对象,

   第7步:图-27中,调用OperatorSubscribeOn的call()方法,根据subscribeOn()传进来的线程调度类型,创建一个工作线程,然后将这个工作线程赋值给subscriber(即上图的Subscriber3号/Subscriber_Last),并新创建一个Subscriber4号返回(即上图-26中的st)。

   第8步:再看图-26,调用onSubscribe(即OnSubscribe2号,不记得的可以看前面的分析)的call()方法,回到图-24中的call()方法里面有一句: s.onNext(t), s即Subscriber4号(图-26中的st),t即我们最初创建的Observable1号.

   第9步:回到图-27,Subscriber4号的onNext()方法里面, 执行了inner.schedule(),显然线程调度就在这里,因为inner在第1步中指定为io线程,所以schedule()方法自然也在io线程中执行.

  第10步:最后在图-28中,Action0的call()方法执行后,执行o.unsafeSubscribe(),将o(即Observable1号)和新创建的Subscriber5号关联,执行OnSubscribe1号的call()方法,最后通过Subscriber5号执行Subscriber_Last的订阅事件.

第9步决定了第10步的所有操作都是在io线程中执行的。

27-线程切换1 28-线程切换2 29-执行订阅

observeOn工程流程分析

加入observeOn()

30-线程切换2

1.Rxjava中observeOn()为什么可以多次切换线程:

31-线程切换的有效性

明显,observeOn()内部没有使用Observable的nest()方法,所以无论调用多少次observeOn(),都会走lift()方法,即可以多次切换线程。

2.对比图-26和图32,observeOn()内部变换线程的原理和subscribeOn()是一样的,只不过subscribeOn()内部先调用的nest()方法,nest()方法中又可以回调最初的Observable1号,所以可以指定最初创建的OnSubscribe1号(订阅行为)的call()方法在指定的线程执行,而observeOn()直接调用lift()方法,只能回调上一级的Observable,因而只能指定上一级的OnSubscribe的call()方法的工作所在的线程。

32-线程切换3

结语

            上面的分析有点绕,不过建议可以自己写一小段代码,然后按上面的步骤,一步步的点进去,大概三两次,就会清楚Rxjava的事件是如何订阅的,事件订阅的行为又是如何在不同的线程中变换的,关于Rxjava的操作符,那更多就是如何使用上的问题了,否则一个个来分析,那将是一项很庞大的工程。此次分析,如果觉得不错,可以给我一个赞!

相关文章

网友评论

      本文标题:Rxjava1.0_事件订阅和线程调度流程解析

      本文链接:https://www.haomeiwen.com/subject/ligixftx.html