前言
欢迎来到深入理解 RxJava2 系列第三篇。在上一篇中,我们详细地介绍了 Scheduler 与 Worker 的概念,并分析了ComputationScheduler与IoScheduler的实现,以帮助大家加深理解。本篇文章将基于 Scheduler ,来和大家分享 RxJava2 非常重要的概念:线程操作符。顺带提一下,本系列文章所有内容如不特别说明,均是指 Flowable相关的概念,因为这是 RxJava2 遵循 RS 的实现。
定义
Scheduler 相关操作符
RxJava 有很多基于 Scheduler 的操作符,如timer、interval、debounce等,但是笔者认为这些操作符与subscribeOn、unsubscribeOn、observeOn有本质上的区别。
其他的操作符,把 Scheduler 当做了计时工具,而 Scheduler 的调度导致线程切换是其附带属性,其核心是操作符本身的特性,如:
- buffer / window 按照时间段缓存数据
- throttle / debounce / throttle / skip 按照时间段采样数据
- timer/interval 按照时间段产生数据
- delay 延迟数据
- ...
线程操作符
因此笔者定义狭义上的线程操作符,其目的是为了改变上下游的某些操作所在的线程。更严格的说法是,其目的是将上下游的某些操作由目标 Scheduler 调度执行,因为某些 Scheduler 的调度并不一定会切换线程,如Schedulers.trampoline()。虽然如此,但是我们还是称之为线程操作符,因为通常我们的本意是为了切换线程。
以下是所有的线程操作符:
- subscribeOn:调度上游的
Flowable的subscribe方法,可能会调度上游Subscription的request方法 - unsubscribeOn:调度上游的
Subscription的cancel方法 - observeOn:调度下游
Subscriber的onNext / onError / onComplete方法
详解
通常subscribeOn与observeOn更受大家关注一些,因为unsubscribeOn使用的场景很少。因此本文就不会再花费过多笔墨在unsubscribeOn上,而且这个操作符本身的实现就非常简单,诸位一览便知。
subscribeOn
subscribeOn顾名思义,改变了上游的subscribe所在的线程。在传统的 Observable 中,只是改变了Observable.subscribe所在的线程,而在 Flowable 中不仅如此,还同样的改变了Subscription.request所在的线程。
这里就涉及到subscribeOn设计的用途,它最主要的目标是改变发射数据源的线程。因此在 Observable中数据的发射,也就是耗时操作一般在subscribe所在的线程(这里不考虑在onSubscribe后内部开线程异步回调的情况)。
而在 RS 的规范中数据的回调是由消费者主动调用Subscription.request 来触发的,因此在Flowable的实现中也要处理request的情况。
Asynchronous 数据源
上面我们提到 RS 的规范中由消费者主动调用Subscription.request 来触发回调数据,但是有些数据是异步产生的,可能在subscribe的一刻或者在那之前,譬如下面 2 个 API:
create
create 方法接受FlowableOnSubscribe作为真正的数据源。这个方法其实相比 RxJava1 已经做了很大的限制,通过封装了一层来支持 Backpressure。
关于此方法的细节,不再详细介绍,笔者之前有写过一篇文章分析过这个方法《Rx2:小create,大文章》,有兴趣的读者可以去看看。
但是即便封装后支持了 Backpressure,背压的逻辑更多的还是隐藏在操作符内部了,对外部的使用者还是尽量屏蔽了这些细节。FlowableEmitter 唯一能与 Backpressure 交互的接口仅是long requested();,并不能实时的响应Subscription.request。
unsafeCreate / fromPublisher
这两者是几乎一致的,接受一个Publisher作为数据源,外面封了一层Flowable代理该Publisher对象,通过这种方式来提供Flowable的丰富的操作符。
换种角度来看,其实这两个方法更像 RxJava1.x 中的 create 方法。因为数据源是来自Publisher,因此使用更加自由与随意。
强与弱
基于上述原因,在subscribeOn还提供了第二个参数来控制request的调度。
我们看一下方法的签名:
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn)
再看一眼唯一使用该参数的地方:
void requestUpstream(final long n, final Subscription s) {
if (nonScheduledRequests || Thread.currentThread() == get()) {
s.request(n);
} else {
worker.schedule(new Request(s, n));
}
}
注意这里nonScheduledRequests = !requestOn,该参数的作用就很明显了。
如果requestOn = true,确保Subscription.request方法一定在目标线程执行。反之requestOn = false,则直接在当前线程执行request。
我们再看一下重载的单一参数的方法:
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}
这里解释一下 FlowableCreate 是Flowable.create方法返回的类名,也就是说除了create作为上游的 Flowable,其他都推荐用强调度的方式。为什么单单create不可以用强调度呢。
我们用一个例子演示一下:
举例
Flowable.<Integer>create(t -> {
t.onNext(1);
Thread.sleep(500);
t.onNext(2);
t.onNext(3);
t.onComplete();
}, BackpressureStrategy.DROP)
// 注释 1 .map(i -> i + 1)
// 注释 2 .subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
new Thread(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
s.request(2);
}).start();
}
@Override
public void onNext(Integer integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("complete");
}
});
我们在create中发射了一个 1,延时 500ms,再次发射 2、3,随后结束,但是我们在订阅的时候先请求了 1 个数据,随后延时 100ms 再次请求 2个数据。
按照正常的流程,虽然数据请求延迟 100ms,但是数据发射延迟了 500ms,因而Subscriber能正确的收到3个数据:
1
2
3
complete
非常棒,一切都很美好。此时我们把注释 2 处给取消掉,再次执行结果依然同上。
此时我们应该清楚,重载的函数传入的参数是 false。好我们再试一下,但是这次把注释 2 处的代码换成:
.subscribeOn(Schedulers.io(), true)
结果:
1
complete
很意外,2 和 3 去哪了?其实原因很简单,因为我们把参数改成 true 以后,request方法要被 worker 调度后执行。
我们在《深入理解 RxJava2:Scheduler(2)》中强调过, Worker 有一个职责,保证入队的任务是串行执行的,换言之,我们的
t -> {
t.onNext(1);
Thread.sleep(500);
t.onNext(2);
t.onNext(3);
t.onComplete();
}
是在 Worker 中执行的,因为这里的函数没有执行完,就无法执行后续的 request 任务。因此在数据发射过程中,上游自始至终都认为下游一开始只请求了一次数据,所以多发射的 2 与 3 就被丢弃了。
不仅如此,我们再把注释 1 与 2 同时取消掉:
.map(i -> i + 1)
.subscribeOn(Schedulers.io())
结果:
2
complete
如果读者能理解笔者上面分享的内容,就能知道是为什么,奥秘就在:
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}
在subscribeOn前面增加了map操作符后,对象就不再是FlowableCreate了,而被map封了一层。所以导致requestOn错误的判别为true,最终导致线程锁住了request的个数。
因此subscribeOn看起来简单,使用起来还是有不少道道的,望大家留心。
线程影响
上面我们提过subscribeOn会影响发射数据的线程,从而间接的影响了消费者的消费的线程。
但是,消费线程和生产线程依然是同一个线程,这里从官网取一张示意图:
image
数据产生后在传递给下游的过程中,是不会发生线程切换的,请大家谨记。
结语
笔者本想一起介绍subscribeOn与observeOn的,奈何洋洋洒洒地一写便收不住,为了避免文章过长导致读者厌倦,observeOn以及这两者的结合与对比留待下篇分享。
感觉大家的阅读,欢迎关注笔者公众号,可以第一时间获取更新,同时欢迎留言沟通。
image












网友评论