已经很久没有对Android开发知识点进行学习了。这半年来除了公司的正经事要忙,还抽空把flutter和小程序学习了一遍。现在是时候重新对Android温故而知新了
RxJava2是一个面试环节必问的一个框架。万变不离其宗,你把源码流程思想都嚼烂了,害怕能问出什么幺蛾子呢?希望本篇文章能够帮到大家更好的理解RxJava2的源码,理解其设计思路
前言
RxJava2与RxJava1相比,基本上没有太大变化,但是多了一个背压策略。所以在分析RxJava2源码的时候,一定会多带上它。
那么我们话不多说,直接开始吧
RxJava2源码分析(无背压)
先来看一下RxJava2无背压版使用范例。通过范例的回调结果,我们可以更加轻松的了解整个RxJava2的实现流程
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
Log.d("Demo", "subscribe")
emitter.onNext("Hello")
emitter.onComplete()
}
}).subscribe(object : Observer<String> {
override fun onComplete() {
Log.d("Demo", "onComplete")
}
override fun onSubscribe(d: Disposable) {
Log.d("Demo", "onSubscribe")
}
override fun onNext(t: String) {
Log.d("Demo", "onNext $t")
}
override fun onError(e: Throwable) {
Log.d("Demo", "onError")
}
})
运行之后得到的结果如下
onSubscribe
subscribe
onNext Hello
onComplete
因此,我们可以从结果推断出事件处理顺序为:
-
Observable被观察者的create()先被触发 -
Observable被观察者与Observer观察者通过subscribe()进行绑定关联 -
Observer观察者的onSubscribe()被触发 -
Observable被观察者中的ObservableOnSubscribe接口被触发,并执行了subscribe()。ObservableEmitter对象执行了onNext() -
Observer观察者的onNext()被触发 -
ObservableEmitter对象执行了onComplete() -
Observer观察者的onComplete()被触发
RxJava2执行流程
了解到这个顺序之后,我们再来看源码,看一下我们的推断与源码描述是否一致
1. 观察者Observable
Observable中包含RxJava2所有操作符的操作方法,但是最重要2个方法是create()和subscribe()。
先进入create()。create()虽然只有短短两行代码,但每一行都是阅读源码的关键
// create()将我们自定义的ObservableOnSubscribe对象作为参数传递进去
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// requireNonNull()仅仅是一个非空检查,其实并不关键,但之所以说它很关键,是因为RxJava2源码中有很多地方都调用了这个方法。
// 你只要记得这个写法就行,以后不需要太关注
ObjectHelper.requireNonNull(source, "source is null");
// 返回ObservableCreate对象
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
入参ObservableOnSubscribe正是串联Observable与Observer的关键,看名字就知道。
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
重点在return的对象了。先来看看RxJavaPlugins的onAssembly(),它的返回值即为我们传入的source对象,即Observable
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 我们并未调用过`setOnObservableAssembly()`对`onObservableAssembly`进行赋值
// 因此`onObservableAssembly`的值为null
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
小结一下,Observable被观察者通过create()创建出来ObservableCreate对象。持续的小结很重要,这是源码阅读的一个方法。ObservableCreate我们稍后再说,继续把Observable看完,接下来看下subscribe()。subscribe()就是观察者订阅被观察者的地方。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// 这里又出现requireNonNull了
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 包装一下,还是返回Observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 关键方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
} catch (Throwable e) {
}
}
这里有一个重要的抽象方法subscribeActual()。无论是操作符控制还是线程调度都会涉及到各种subscribeActual()。因此,当你看到subscribeActual()的时候都要打起十二分精神。这里还是先放一下,等稍后再说
protected abstract void subscribeActual(Observer<? super T> observer);
2. 桥梁ObservableOnSubscribe
刚才我们没有对ObservableOnSubscribe进行详细说明,现在补充一下。ObservableOnSubscribe起到的是一个桥梁的作用,由它串联Observable与Observer,从而将ObservableEmitter产生的事件交由Observer。我们看下ObservableEmitter,ObservableEmitter继承自Emitter
public interface ObservableEmitter<T> extends Emitter<T>
Emitter有几个我们很熟悉的方法,它们就是我们自定义ObservableOnSubscribe中所用到的方法
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
ObservableOnSubscribe在后面会有闪亮的动作,先到这里结束
3. ObservableCreate
接下来重头戏就在这了。刚才我们已经知道,create()方法最后返回的是ObservableCreate对象。其构造方法接收一个自定义的ObservableOnSubscribe对象
// 记住这个source,会有很多地方要用
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
该类里有一个重要的方法叫subscribeActual(),这个方法实现了之前我们所说Observable中的相应抽象方法。你可以回头再看下刚才的subscribe(),这里的Observer就是我们自定义的观察者。这样,ObservableCreate本身作为Observable,同时持有Observer与ObservableOnSubscribe的引用了
@Override
// 传入的是`Observer`观察者对象
protected void subscribeActual(Observer<? super T> observer) {
// 创建了CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 执行了observer中的onSubscribe(),也就是上文所说的步骤三
observer.onSubscribe(parent);
try {
// 执行ObservableOnSubscribe中的subscribe(),入参为CreateEmitter对象,进入上文所说的步骤四
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
该方法首先创建一个CreateEmitter对象,并将Observer传递进去。那我们势必要知道CreateEmitter的作用
4. CreateEmitter
CreateEmitter类也实现了ObservableEmitter接口,这就意味着它跟ObservableOnSubscribe有某种关联。Observer对象作为参数传递到其构造方法中。我们以onNext()方法为例来进行说明
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// CreateEmitter中的`onNext()`、`onError()`、`onComplete()`与`Observer`中的相应方法进行关联
// 这样,只要调用CreateEmitter中的上述三种方法,即可触发`Observer`中的同名方法
@Override
public void onNext(T t) {
// RxJava2不允许传递null类型数据
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 执行`Observer`中的`onNext()`
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
onNext()、onError()、onComplete()与Observer中的相应方法进行关联,这就意味着你触发ObservableEmitter相关方法,也会同步触发Observer的同名方法
小结一下。subscribeActual()创建CreateEmitter,将onNext()、onError()、onComplete()与Observer中的相应方法进行关联。之后就是调用Observer的onSubscribe()。随后通过source调用ObservableOnSubscribe的subscribe()。当触发Emitter的onNext()的时候,同时触发Observer的onNext(),onComplete()同理。这样,示意图中的第4、5、6、7步就有理可据,整个事件流程就被完整的串联起来了
5. 观察者Observer
`Observer的部分已经不是特别重要了,它具体的功能完全是我们自定义的
// `Observer`是一个接口,它有4个方法
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
最后,用一张UML图来总结一下RxJava2无背压版源码
1. Observable通过create()与subscribe()两个方法将ObservableOnSubscribe与Observer收入囊中
2. 通过Observable的create()得到ObservableCreate对象
3. 通过ObservableCreate的subscribeActual()方法得到CreateEmitter对象
4. 得到CreateEmitter对象的同时,触发Observer的onSubscribe(),并执行ObservableOnSubscribe中的subscribe,入参为CreateEmitter
5. 通过对CreateEmitter中onNext()、onError()与onComplete()的调用完成事件向Observer同名方法的传递
RxJava2源码分析(无背压)
RxJava2源码分析(有背压)
不支持背压的Observable与支持背压的Flowable事件流程基本一致,唯一不同之处在于后者多了背压策略。
我们依然从范例对事件流程进行回顾
Flowable.create(object : FlowableOnSubscribe<String> {
override fun subscribe(emitter: FlowableEmitter<String>) {
Log.d("Demo", "subscribe")
emitter.onNext("123")
emitter.onNext("456")
emitter.onNext("789")
emitter.onComplete()
}
}, BackpressureStrategy.DROP).subscribe(object : FlowableSubscriber<String> {
override fun onComplete() {
Log.d("Demo", "onComplete")
}
override fun onSubscribe(s: Subscription) {
Log.d("Demo", "onSubscribe")
s.request(2)
}
override fun onNext(t: String?) {
Log.d("Demo", "onNext $t")
}
override fun onError(t: Throwable?) {
Log.d("Demo", "onError")
}
})
运行之后得到的结果如下:
onSubscribe
subscribe
onNext 123
onNext 456
onComplete
看上去好像跟无背压版本差不多,但是细心的你肯定会发现:我明明传递了3个onNext()事件,怎么最后就打印出来了2个?
那我们就继续通过对源码的学习,来看看这个问题到底是如何产生的
1. 观察者Flowable
Flowable实现了Publisher接口
public abstract class Flowable<T> implements Publisher<T>
Publisher同样也有subscribe方法。这里同无背压版在原理上是一致的
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
同样,重点还是在create()与subscribe()。create()与之前无背压版不同,多了一个背压策略BackpressureStrategy
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
背压策略一共有5种,分别是MISSING、ERROR、BUFFER、DROP、LATEST。
public enum BackpressureStrategy {
MISSING,
ERROR,
BUFFER,
DROP,
LATEST
}
RxJavaPlugins.onAssembly这个就不用多说了,与之前一样。显然create()方法返回值就是FlowableCreate对象了。
我们依然稍后再说FlowableCreate,因为它与无背压版的ObservableCreate一样都很关键
看subscribe()方法,其实总的来说,这个跟无背压版subscribe()实际上也是一样的
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
// 将FlowableSubscriber进行包装
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 将包装好的对象传递到实现subscribeActual抽象方法的类
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}
2. 桥梁FlowableOnSubscribe
看看它们的命名,与ObservableOnSubscribe如出一辙,这里也不再多说了
public interface FlowableOnSubscribe<T> {
void subscribe(@NonNull FlowableEmitter<T> emitter) throws Exception;
}
3. FlowableCreate
这就是重点类,构造方法中传入FlowableOnSubscribe和BackpressureStrategy
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
重中之重依然是subscribeActual方法。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
// 根据不同的背压模式进行不同的处理
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
// 后面的部分与`ObservableCreate`一致,都是调用`Subscriber`的onSubscribe以及`FlowableOnSubscribe`的`subscribe`
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
emitter参数我们虽然知道它的大概意思,但是毕竟涉及到背压策略,所以我们还是进去看一下。以背压丢弃策略DropAsyncEmitter为例进行说明即可,因为流程上这5中策略差别不会很大
4. 背压丢弃策略DropAsyncEmitter
DropAsyncEmitter继承自NoOverflowBaseAsyncEmitter
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
// downstream即为传入的Subscriber对象
DropAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// nothing to do
}
}
NoOverflowBaseAsyncEmitter则继承自BaseEmitter
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T>
BaseEmitter又继承自FlowableEmitter
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription
而FlowableEmitter又继承自Emitter
public interface FlowableEmitter<T> extends Emitter<T>
最终都回到了Emitter,因此总的来说背压版流程与无背压版是一致的。来看看区别在哪里,在NoOverflowBaseAsyncEmitter类使用了AtomicLong来确定事件下发的数量并确保计数的准确性
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 这里是关键。因为BaseEmitter继承自AtomicLong,因此如果其数值为0,则不会触发downstream.onNext(t)
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
有get()的地方肯定就有add(),进入BaseEmitter类
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
// 这里就是设置可处理数量的地方
BackpressureHelper.add(this, n);
onRequested();
}
}
进入BackpressureHelper类
public static long add(AtomicLong requested, long n) {
for (;;) {
// 获取当前`AtomicLong`中尚可发射事件的总量
long r = requested.get();
// 如果当前量为`Long.MAX_VALUE`,则直接返回
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 通过`addCap`将当前量与新增量相加重新设置AtomicLong的值
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}
有加就有减。刚才NoOverflowBaseAsyncEmitter的onNext()就是事件发出的时候进行AtomicLong数量-1的操作
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
再次进入BackpressureHelper类查看produced()相关的代码
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 当前值-1
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
// 重新赋值完成更新
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
所以,我们的代码中request(2),前2个onNext()会执行downstream.onNext(t),同时剩余量将会-1。当总量为0的时候,执行onOverflow空操作
最后,用一张UML图来总结一下RxJava2有背压版源码
1. Flowable通过create()与subscribe()两个方法将FlowableOnSubscribe与FlowableSubscriber收入囊中
2. 通过Flowable的create()得到FlowableCreate对象
3. 通过FlowableCreate的subscribeActual()方法得到DropAsyncEmitter对象
4. 得到DropAsyncEmitter对象的同时,触发Subscriber的onSubscribe(),并执行FlowableOnSubscribe中的subscribe,入参为DropAsyncEmitter
5. 通过对DropAsyncEmitter中onNext()、onError()与onComplete()的调用完成事件向Subscriber同名方法的传递
6. 在DropAsyncEmitter触发onNext()之前必须对其自身进行原子操作,设置背压事件发送的总数。每成功发送一次,总数减1,当总数为0,停止向Subscriber发送onNext()
RxJava2源码分析(有背压)
操作符
还是先来看一个范例
Observable.create<String> {
it.onNext("1")
it.onNext("2")
}.map(object : Function<String, Int> {
override fun apply(t: String): Int {
Log.d("Demo", "Function $t")
return t.toInt() + 100
}
}).subscribe {
Log.d("Demo", "onNext $it")
}
注意一下事件的打印顺序,map操作符的参数Function中的apply(t)回调方法会比Observer中的onNext()提前返回
Function 1
onNext 101
Function 2
onNext 102
那操作符的执行流程是怎样的呢,我们又得从源码进行查看。所有操作符的执行流程应该是一致的,这里以map操作符为例进行说明
1. ObservableMap
进入map()方法。map()的入参为Function类,它的作用很简单,就是将T类型的数据通过apply()执行之后得到R类型
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
RxJava2中有很多以Function为前缀的类,它们的命名因入参的个数不同而有所差异,但是它们的功能是一样的
FunctionX
继续往下看。通过之前的学习我们知道,map()的返回值是ObservableMap对象
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
ObservableMap对象在初始化的时候完成了Observable对象以及Function对象的持有
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
}
注意这个source对象,它在ObservableMap初始化的时候传递给它的父类AbstractObservableWithUpstream类。AbstractObservableWithUpstream类是一个抽象类,它继承自Observable。之前传入原始ObservableSource对象可以通过source()方法获取
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
到这里我们就明白,通过操作符操作后原Observable就被转换成包含原Observable与Function的一个新对象ObservableMap
ObservableMap
后面subscribe()没啥好说的,跟上文描述无背压版都是一致的
重点来看看ObservableMap的subscribeActual()方法。在这里,当原始Observer对象t被送到ObservableMap对象的时候,会发生一次拆合的动作。t和function被整合成一个新的MapObserver对象,让这个新的Observer重新被source订阅
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
2. MapObserver
MapObserver对象继承自BasicFuseableObserver
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
}
注意入参actual,它是下游的Observer对象。在本文范例中,它是我们自定义的原始Observer对象,如果你下游还有其他操作符比如map(),那么它可能是MapObserver等其他类型的Observer
actual被传递到而其父类BasicFuseableObserver,BasicFuseableObserver实现了Observer接口
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
protected final Observer<? super R> downstream;
protected Disposable upstream;
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
}
注意这里downstream,它就是刚才我们所说的Observer。所以这里对MapObserver调用onSubscribe的时候,通过downstream.onSubscribe(this);调用的是原始Observer中的onSubscribe
那么我们现在回过头来再看ObservableMap中的subscribeActual()。这里同样会与之前无背压版中所描述的那样,在ObservableCreate中的subscribeActual()创建CreateEmitter对象,依次发射onNext()、onError()、onComplete(),但是不同的是接收者为MapObserver,所以我们要去MapObserver中去看看这些事件是怎么被处理的
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
首先看看onNext()
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// 注意一下这里,这里执行了mapper.apply(t)并得到了返回值
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 调用传入的原始Observer中的onNext()
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
这样一切就通了,将MapObserver进行拆分,先执行Function得到返回值,随后将返回值传递给原始Observer。剩下的就没啥好说的了,又直接调用原始Observer对象继续它的onNext()方法
总结一下操作符map的源码
1. 将原始Observable与Function包装成ObservableMap
2. 当执行ObservableMap的subscribe()方法时,会调用到其subscribeActual()
3. 对当前ObservableMap进行拆分,将其中的Function与Observer进行整合得到MapObserver,并交由Observable进行订阅
4. 继续向上对操作符包装好的Observable重复拆分直到原始Observable为止
5. Emitter触发的onNext()等操作将传递给MapObserver,MapObserver的onNext()中先执行Function的apply(),再将返回值通过MapObserver中的Observer向下发送直到原始Observer为止
线程调度
线程调度这个概念我们再熟悉不过了,在使用RxJava2进行网络请求、文件流读写等情况下我们都会使用到。RxJava2提供了5种线程调度器,我们使用Schedulers.x来声明我们所需要的线程类型
.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
这里就出现了一个关键字Schedulers
1. Schedulers
说到Schedulers,顾名思义就是多个Scheduler的意思。Schedulers中声明了5个静态变量,它们分别代表5种不同的线程调度器
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
这些静态变量在初始化的时候被赋值,他们本质上都是一个个的Task
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
你可能会问:“AndroidSchedulers.mainThread()到哪去了呢?”。别着急,它算Rx项目组专门为Android量身定做的一个线程调度器。我们稍后会讲到它
由于每种调度器实现原理都不同,所以我们选择相对容易理解的Schedulers.NEW_THREAD来进行分析。Schedulers.NEW_THREAD其实是NewThreadTask,它实现了Callable<Scheduler>接口。该接口有一个call()的回调方法,返回的是NewThreadHolder.DEFAULT对象
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
该对象也是Scheduler类型,即NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
最后就是要找到调用call()的地方了。回到Schedulers类中定义NEW_THREAD的地方。进入initNewThreadScheduler()
RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())
callRequireNonNull()的入参defaultScheduler是NewThreadTask类,不要忘记了
public static Scheduler initNewThreadScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
// 未给onInitNewThreadHandler赋值,f为null
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitNewThreadHandler;
if (f == null) {
// defaultScheduler是NewThreadTask类
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
最终我们在callRequireNonNull(defaultScheduler)发现了我们一直想要知道的s.call()。到这里NewThreadTask里的call()才被调用,NewThreadScheduler对象创建完毕。还真是绕了一大圈
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
到这里,一句话小结一下之前所描述的内容:我们所指定的线程调度器,其实都是不同的Scheduler对象
2. Scheduler
下面就是真正的线程切换部分了,切换线程也就是切换不同的Scheduler。我们分别对subscribeOn()与observeOn()进行学习。不过不要着急,我们先通过Scheduler类来了解一下线程是如何进行调度的
Scheduler是一个抽象类
public abstract class Scheduler
它有一个重要的抽象方法createWorker(),顾名思义它是创建Worker类的对象
public abstract Worker createWorker();
Worker是负责执行线程调度的关键类。它是一个抽象类,实现了Disposable接口
public abstract static class Worker implements Disposable
除此之外,Scheduler类还有一个重要的方法是scheduleDirect(),我们的事件就是在这里完成线程调度的
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 创建好Worker对象
final Worker w = createWorker();
// 选择好事件将要运行在的线程
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 对线程与Worker对象进行包装
DisposeTask task = new DisposeTask(decoratedRun, w);
// 执行线程调度
w.schedule(task, delay, unit);
return task;
}
3. scheduleDirect()解读
刚说了createWorker()是一个抽象方法,所以我们从实现类NewThreadScheduler的createWorker()开始看。这里返回的是NewThreadWorker对象
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
NewThreadWorker类继承Scheduler.Worker类并实现Disposable接口。这里面最重要的地方就是,在初始化的同时,创建了线程池ScheduledExecutorService
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
// 创建了线程池
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
}
回到Scheduler类继续往下。通过RxJavaPlugins.onSchedule(run);得到一个Runnable。这个对象是什么,我们稍后揭开谜底
再往下,这个Runnable对象与Worker对象又被包装成DisposeTask,但DisposeTask本质上依然是一个Runnable
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection
最后就是Worker调用schedule()。schedule()又是Worker中的一个抽象方法,我们来到实现类NewThreadWorker
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
跳转到scheduleActual()。这里很明显了,传进来的Runnable被包装为ScheduledRunnable,它本质上依然是一个Runnable。随后它被添加到线程池中进行执行
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
依然小结一下,线程调度器就是通过Scheduler中的createWorker创建的Worker对象后,在调用Worker中的schedule方法来完成线程调度的
这么看来,剩下的问题就是Runnable从哪里来了
4. subscribeOn()
我们先学习subscribeOn(),之后再学习observeOn()。毕竟两者有所差别
进入subscribeOn(),这里不用说我们也知道,该方法返回了一个ObservableSubscribeOn对象。ObservableSubscribeOn类有2个入参:当前Observable对象和线程调度器对象
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn继承自AbstractObservableWithUpstream,这个我们在操作符里面已经说过了,重点就是source对象
在ObservableSubscribeOn类中我们又看到熟悉的声影subscribeActual()。不过这个方法里面的内容有点复杂,我们细细的说
public void subscribeActual(final Observer<? super T> observer) {
// 得到包装后的Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 执行Observer的onSubscribe()方法
observer.onSubscribe(parent);
// 执行scheduler中的`scheduleDirect()`进行线程调度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
经过SubscribeOnObserver类包装observer得到的parent对象实际仍然是Observer对象。包装类SubscribeOnObserver里面都是对传入进来的观察者Observer的事件进行回调操作
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
// 此为传入的Observer
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
回到ObservableSubscribeOn类中的subscribeActual()继续说。这里出现了之前所说的scheduleDirect(),那么我们所需要的Runnable就是SubscribeTask对象了。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
最终在那个线程池中执行的就是SubscribeTask中的run()里的代码source.subscribe(parent);,即订阅关系的绑定。这样后续Observable中的subscribeActual()也就是上游事件都是在该线程中执行
总结一下subscribeOn()里线程调度的流程:
1. 通过传入的不同的Schedule来指定所使用的线程
2. 通过继承AbstractObservableWithUpstream得到ObservableSubscribeOn对象
3. 实现subscribeActual(),并将包装好的Observer通过scheduleDirect()传递给Scheduler
4. 使用已选择的Scheduler来创建Worker,再将被观察者的事件流通过schedule()放置在相应的线程中执行
5. observeOn()
看完了subscribeOn()我们再来看看observeOn()。总的来说,它们两的调度流程是类似的,毕竟传入的都是Scheduler对象
进入observeOn()。这个方法返回的对象是ObservableObserveOn类,默认入参是我们声明的Scheduler对象。其他参数跟线程配置有关系,不影响我们的主流程,我们可以暂时不管它
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
进入ObservableObserveOn。又是一波熟悉的节奏,与之前ObservableSubscribeOn如出一辙,那么真正做事情的地方自然而然又是subscribeActual()
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
subscribeActual()基本上都是相似的操作:先创建Worker,然后执行source.subscribe()。
这里有一个非常关键的地方,source.subscribe()绑定关系依旧会运行在之前subscribeOn()指定的线程里,因为这里的worker只跟observer有关联,并没有和source发生任何关系,也就是只有下游事件才会受到影响。
这个我将在最后用Demo进行详细的阐述
ObserveOnObserver类包装了Observer和Worker,说明我们定义的线程调度器将在这里发挥作用。
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver又实现了Observer和Runnable
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable
作为一个Observer,自然就要注意它那些常规方法,重点看onNext()、onError()和onComplete()。它们有一个共性:都有schedule()方法。所以schedule()就是线程调度的关键。
同时还要注意一下,事件发射的参数t,被加入到队列queue中了
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
果不其然,schedule()中出现了Worker,而调用的那个this,就是Runnable。worker.schedule()的出现说明线程切换在这里执行了。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
直接看run(),里面是一个判断。选择drainNormal()看看
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里面最重要的就是a.onNext(v);。a就是Observer,v就是我们发射的那个泛型值。数值从队列中被取出并向下发射这个操作就这样运行在我们自定义的线程调度器中执行
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
最后总结一下observeOn()里线程调度的流程:
1. 通过传入的不同的Schedule来指定所使用的线程
2. 通过继承AbstractObservableWithUpstream得到ObservableObserveOn对象
3. 实现subscribeActual()
4. 使用已选择的Scheduler来创建Worker,同时将Observer与Worker整合成一个新的ObserveOnObserver对象来重新订阅之前的Observable
5. 在调用观察者onNext()等方法时将相应的事件流通过schedule()放置在相应的线程中执行
6. AndroidSchedulers
我们重点关注一下为什么使用AndroidSchedulers.mainThread()可以实现切换到UI线程。这个线程调度器本质上是HandlerScheduler,里面有一个重要的入参Handler:new Handler(Looper.getMainLooper())。我想只要你懂android开发,应该都明白了个大概
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
进入HandlerScheduler类,直接看schedule。这里就不用多说了吧
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
// 包装handler跟run
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 创建Message对象
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
// 使用handler发送message,handler运行环境为主线程
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
7. 关于线程切换的Demo
请回答subscribe()与onNext()分别打印的是哪个线程?
Observable.create(ObservableOnSubscribe<String> { emitter ->
Log.d("Demo", "subscribe: " + Thread.currentThread().name)
emitter.onNext("2")
emitter.onComplete()
})
.subscribeOn(AndroidSchedulers.mainThread())
.map {
Log.d("Demo", "map: " + Thread.currentThread().name)
return@map "2$it"
}
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.d("Demo", "onNext: " + Thread.currentThread().name)
}
override fun onError(e: Throwable) {
}
})
给出答案
subscribe: main
map: main
onNext: RxNewThreadScheduler-2
就像我们之前所说的,subscribeOn()管的只是观察者与被观察者绑定的那个部分,所以自然而然是逐层向上,越靠近Observable的那个线程策略才会生效。而observeOn()面对的是观察者,自然而然是逐层向下,对下游产生影响。至于中间的那个map操作符,是因为上游的emitter运行在AndroidSchedulers.mainThread(),在没有遇到observeOn()改变策略前肯定是不会发生变化的
结合官网图片加深理解吧
RxJava2线程切换











网友评论