正文
本文将通过一段实例代码的实际执行顺序来分析RxJava2的源码.这样梳理一遍之后,思路会清晰很多.RxJava版本为2.0.8.
//代码片段一
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
})
.subscribeOn(Schedulers.io())
.map(new Function<String, Integer>() {
@Override
public Integer apply(@NonNull String s) throws Exception {
return Integer.parseInt(s);
}
})
.observeOn(Schedulers.computation())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
print("onSubscribe called");
}
@Override
public void onNext(@NonNull Integer integer) {
print("onNext called " + integer);
}
@Override
public void onError(@NonNull Throwable e) {
print("onError called");
}
@Override
public void onComplete() {
print("onComplete called");
}
});
上面就是实例代码,我们接下来以上面的代码为引线,分析RxJava2源码.首先从create()开始.
//代码片段二
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//判空,后面很多类似的代码,后面就不在赘述
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins这个类都是hook相关的,本例中不涉及hook,所以可以暂时忽略相关代码,认为onAssembly()传入什么就返回什么就可以了.这里传入的是ObservableCreate,暂且先记着这个类吧,后面回调的时候我们又会回来的
//代码片段三
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
ObservableCreate将传入的ObservableOnSubscribe source参数先保存起来.source通过字面意思我们可以把它理解为事件源,也就是事件流中的被观察者.先不管这个回调,我们直接往下看subScribeOn()这个方法
//代码片段四
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
这个方法跟上面的create()方法结构相似,不过这里保存的是ObservableSubscribeOn
//代码片段五
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
这里也是把事件源先保存起来,同时把传进来的线程调度也保存起来.接下来看map()
//代码片段六
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
还是跟上面类似的结构,我们直接看ObservableMap
//代码片段七
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
这里保存事件源,并且把转换事件流的方法先保存起来.接下来看observeOn()
//代码片段八
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这里暂时不管delayError和bufferSize,先看ObservableObserveOn
//代码片段九
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
保存了事件源即其他的相应参数.总结以下,到目前为止,代码只是创建了很多被观察者,并保存起来.初看源码的时候会被这些类的命名搞的晕头转向,但是等你真正理通了整个流程,会发现这些命名还是很有规律的.接下来就是比较关键的subscribe()方法
//代码片段十
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribe()里面做了一些判空操作之后,调用了最关键的subscribeActual(),这里面就是观察者和被观察者真正建立订阅关系的地方.由于subscribe()是基类Observable里面的抽象方法,所以类似一个模板方法,如果子类没有@Override这个方法,那么subscribe()都将执行这段代码,所以后文中我们会从subscribe()方法直接跳到subscribeActual().这里调用subscribeActual()的是observeOn()返回的ObservableObserveOn对象
//代码片段十一
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里我们用的不是TrampolineScheduler,则进入else的流程.里面创建了一个Worker对象, 然后调用了subscribe()方法.这里的source就是之前map()保存的事件源ObservableMap(见代码片段六).我们去看看它的subscribeActual()
//代码片段十二
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这里的source是之前subscribeOn()里面保存的事件源ObservableSubscribeOn(见代码片段四),我们去看看它的subscribeActual()
//代码片段十三
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这里我们重点关注最后一句,这里的scheduler就是subscribeOn()(见代码片段五)传进来的线程参数,也就是我们指定的被观察者需要运行的线程.我们看一下`SubscribeTask的定义
//代码片段十四
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这里在run()方法里面调用了subscribe()方法,也就是说,后面的操作都是在我们指定的线程中运行的,直到下一次切换线程.这里的source就是create()里面我们创建的ObservableOnSubscribe(见代码片段一),而它的subscribe()已经在create()里面定义的匿名内部类@Override了,无需调用subscribeActual()了,直接调用定义好的subscribe().直到这里,终于轮到我们的被观察者,也就是事件源运行了.这里又总结以下,运行到这里,代码通过subscribe()方法一步一步往上游回溯,并将上下游之间建立了订阅关系.并且已经完成了subsribeOn()的线程切换.
看到这里忍不住说一下,之前看@扔物线大神的文章的时候,里面说subscribeOn()可以调用多次,但只有最上游的,也就是离事件源最近的subscribeOn()才有效.之前对于这一点不是特别理解,看了这里的源码之后就明白了.这时候的事件源还没有开始发送,所以不管你在后面调用多少次subscribeOn()只是指定了事件应该在哪个线程运行,却不是真正的运行.也就是说,只有最后一次的subscribeOn()才有最终的决定权.所以才有了上面的结论.
被观察者操作也很简单,就在onNext()发送了一个"1"字符串
//代码片段十五
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
})
这里的e就是前面SubscribeTask(见代码片段十四)中传入的parent,parent就是之前创建的SubscribeOnObserver,我们看下它的onNext()
//代码片段十六
public void onNext(T t) {
actual.onNext(t);
}
onNext()里面调用了actual.onNext(),actual一直往上回溯,其实就是subscribeOn()的观察者,也就是MapObserver(见代码片段十二).这里的t就是一开始传进来的"1".
//代码片段十七
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
MapObserver中的onNext()主要做了一件事件,就是把之前存下来的转换事件流方法mapper执行了,将字符串"1"转成了Integer型的1,并调用了其观察者的onNext(),也就是ObserveOnObserver(见代码片段十一),这里的t已经是经过map转换后的Ingeter型的1了.
//代码片段十八
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
那么observeOn()如何做到切换线程的呢?我们继续往下看,首先看schedule()方法
//代码片段十九
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker.schedule()参数是Runnable,而ObserveOnObserver实现了Runnable接口,直接直接去看run()方法
//代码片段二十
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
我们先关心正常流程,看看drainNormal()
//代码片段二十一
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
现在这里已经是在onserveOn()里面指定的线程运行了.首先定义了一个队列用来存放上游发来的事件流,然后一个一个取出来,经过一系列的判空处理,调用了a.onNext(v).这里的a由上面的actual赋值而来,而actual就是我们代码中在subscribe()中定义的最终的观察者.看到这里我们就能看出observeOn()和subscribeOn()的区别了.observeOn()是在事件发送之后切换线程的,也就是会影响下游的运行线程,并且每次切换都能影响下一个观察者所运行的线程.下图是一个大体的流程图,它表明了整个事件执行的过程.
流程图
总结
之前只是知道怎么用RxJava,觉得用起来很爽,但是一直处于一种知其然不知其所以然的状态.这次梳理了一下源码之后思路清晰了很多.果然啃源码才是学习一个框架最直接的方式











网友评论