以下都是本人收集和总结的内容:
1. 什么是Rxjava
Rx含义
ReactiveX是Reactive Extensions的缩写,一般简写为Rx。微软给的定义是,Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序,使用Rx,开发者可以用Observables表示异步数据流,用LINQ操作符查询异步数据流, 用Schedulers参数化异步数据流的并发处理,Rx可以这样定义:Rx = Observables + LINQ + Schedulers。
Rxjava含义
简单而言:RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序
2. Rxjava理解与扩展(观察者模式)
简单概括就是,观察者(Observer)需要在被观察者(Observable)变化的一瞬间做出反应。
而两者通过注册(Register)或者订阅(Subscribe)的方式进行绑定。
观察者模式
其中这个Button就是被观察者(Observable),OnClickListener就是观察者(Observer),两者通过setOnClickListener达成订阅(Subscribe)关系,之后当Button产生OnClick事件的时候,会直接发送给OnClickListener,它做出相应的响应处理。
而RxJava的观察者模式呢,跟这个差不多,但是也有几点差别:
- Observer与Observable是通过 subscribe() 来达成订阅关系。
- RxJava中事件回调有三种:onNext() 、 onCompleted() 、 onError() 。
- 如果一个Observerble没有任何的Observer,那么这个Observable是不会发出任何事件的。
关于RxJava的回调事件:
onNext():基本事件。
onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
值得注意的是在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。如果在队列中调用了其中一个,就不应该再调用另一个。
好了,那我们也附一张图对比一下吧:
观察者模式
3. 如何实现RxJava
3.1创建Observer
在Java中,一想到要创建一个对象,我们马上就想要new一个。没错,这里我们也是要new一个Observer出来,其实就是实现Observer的接口,注意String是接收参数的类型:
//创建
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext ---> ", "Item: " + s);
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
}
@Override
public void onError(Throwable e) {
Log.i("onError ---> ", e.toString());
}
};
当然这里也要提另外一个接口:Subscriber ,它跟Observer接口几乎完全一样,只是多了两个方法,总结:
-
onStart(): 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。
-
unsubscribe(): 用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用 unsubscribe() 来解除引用关系,以避免内存泄露的发生。
虽然多了两个方法,但是基本实现方式跟Observer是一样的,所以暂时可以不考虑两者的区别。不过值得注意的是
实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。
3.2创建Observable
与Observer不同的是,Observable是通过 create() 方法来创建的。注意String是发送参数的类型:
//创建
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
});
3.3订阅(Subscribe)
在之前,我们创建了 Observable 和 Observer ,现在就需要用 subscribe() 方法来将它们连接起来,形成一种订阅关系:
//订阅
observable.subscribe(observer);
这里其实确实有点奇怪,为什么是Observable(被观察者)订阅了Observer(观察者)呢?其实我们想一想之前Button的点击事件:
Button.setOnClickListener(new View.OnClickListener())
Button是被观察者,OnClickListener是观察者,setOnClickListener是订阅。我们惊讶地发现,也是被观察者订阅了观察者,所以应该是一种流式API的设计吧,也没啥影响。
完整代码如下:
//创建Observable
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
}
});
//创建Observe
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext ---> ", "Item: " + s);
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
}
@Override
public void onError(Throwable e) {
Log.i("onError ---> ", e.toString());
}
};
//订阅
observable.subscribe(observer);
运行的结果如下,可以看到Observable中发送的String已经被Observer接收并打印了出来:
运行结果
3.4线程控制——Scheduler
Scheduler是RxJava的精髓之一了。
在RxJava中,Scheduler相当于线程控制器,可以通过它来指定每一段代码运行的线程。
RxJava已经内置了几个Scheduler,总结:
-
Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler。
-
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
-
Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler。行为模式和newThread()差不多,区别在于io()的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率。不要把计算工作放在io()中,可以避免创建不必要的线程。
-
Schedulers.computation(): 计算所使用的Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
-
AndroidSchedulers.mainThread(),Android专用线程,指定操作在主线程运行。
那我们如何切换线程呢?RxJava中提供了两个方法:subscribeOn() 和 observeOn() ,两者的不同点在于:
subscribeOn(): 指定subscribe()订阅所发生的线程,即 call() 执行的线程。或者叫做事件产生的线程。
observeOn(): 指定Observer所运行在的线程,即onNext()执行的线程。或者叫做事件消费的线程。
这里确实不好理解,没关系,下面我们在具体例子中观察现象。
//创建被观察者
Observable.create(new Observable.OnSubscribe<Bitmap>() {
/**
* 复写call方法
*
* @param subscriber 观察者对象
*/
@Override
public void call(Subscriber<? super Bitmap> subscriber) {
//通过URL得到图片的Bitmap对象
Bitmap bitmap = GetBitmapForURL.getBitmap(url);
//回调观察者方法
subscriber.onNext(bitmap);
subscriber.onCompleted();
Log.i(" call ---> ", "运行在 " + Thread.currentThread().getName() + " 线程");
}
})
.subscribeOn(Schedulers.io()) // 指定subscribe()发生在IO线程
.observeOn(AndroidSchedulers.mainThread()) // 指定Subscriber的回调发生在UI线程
.subscribe(new Observer<Bitmap>() { //订阅观察者(其实是观察者订阅被观察者)
@Override
public void onNext(Bitmap bitmap) {
mainImageView.setImageBitmap(bitmap);
Log.i(" onNext ---> ", "运行在 " + Thread.currentThread().getName() + " 线程");
}
@Override
public void onCompleted() {
mainProgressBar.setVisibility(View.GONE);
Log.i(" onCompleted ---> ", "完成");
}
@Override
public void onError(Throwable e) {
Log.e(" onError --->", e.toString());
}
});
现在来看一下运行的Log日志:
Log
可以看到,call方法(事件产生)执行在IO线程,而onNext方法(事件消费)执行在main线程。说明之前分析的是对的。
3.5操作符
所谓操作符(Operators),简单来说就是一种指令,表示需要执行什么样的操作。Rx中的每种编程语言实现都实现了一组操作符的集合。RxJava也不例外。
RxJava中有大量的操作符,比如创建操作符、变换操作符、过滤操作符等等,这些操作符要全部讲解完几乎是不可能也没必要的事情。所以我们只介绍常见的、有用的、重要的操作符。其他的如果用到直接到文档查找就行了。
下面就针对前篇文章的创建(create)来说明一下另外两种常见的创建操作符。
Observable.just()
首先给出定义:
Just操作符是创建一个将参数依次发送出来的Observable
具体一点来说就是, just() 中会接收1~9个参数,它会返回一个按照传入参数的顺序依次发送这些参数的Observable。
这样说可能还是不够清晰,所以画个图来看:
JUST流程图
从图中可以看出,其实就是依次发送单个数据,它的具体写法是这样的,非常简单:
Observable.just("Hello","world");
//其实就相当于依次调用:
//subscriber.onNext("Hello");
//subscriber.onNext("World");
但是这里要注意一点,如果你传递null给just,它会返回一个发送null值的Observable,而不是返回一个空Observable(完全不发送任何数据的Observable)。后面会讲到,如果需要空Observable应该使用 Empty 操作符。
现在来看完整的代码,代码本身很简单,注意看Log日志:
//创建Observable
Observable.just("Hello", "World", null) .subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
if (s == null) {
Log.i("onNext ---> ", "null");
}else {
Log.i("onNext ---> ", s);
}
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
}
@Override
public void onError(Throwable e) {
Log.i("onError ---> ", "出错 --->" + e.toString());
}
});
log
这里因为我们要打印字符串,所以不能为null,我就处理了一下,可以看到当发送 null 的时候,s确实等于null。
Observable.from()
尽管与just一样是创建操作符,但是from操作符稍微强大点。因为from操作符的作用是:
将传入的数组或 Iterable 拆分成具体对象后,依次发送出来。
注意,这里不再是发送单个对象,而是直接发送一组对象。为了与just对比,也来画个图描述一下:
from流程图
它的具体写法是这样的,也非常简单:
String[] str = new String[]{"Hello", "World"};
//创建Observable
Observable.from(str);
4. 结合Rxjava源码深度学习
基础源码
实现RxJava的代码,这里我打上了Log日志,来看一下每个方法执行的顺序。
//创建Observable
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("World");
subscriber.onCompleted();
Log.i("执行顺序 ---> ", " call ");
}
}).subscribe(new Observer<String>() {
@Override
public void onNext(String s) {
Log.i("onNext ---> ", s);
Log.i("执行顺序 ---> ", " subscribe onNext");
}
@Override
public void onCompleted() {
Log.i("onCompleted ---> ", "完成");
Log.i("执行顺序 ---> ", " subscribe onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i("onError ---> ", "出错 --->" + e.toString());
}
});
好了,来看一下Log日志:
执行log
从图中可以看到,subscribe方法先执行,等执行完成后再执行call方法。
好了,这就是结论。先在脑子里产生个印象,方便后面追溯。
4.1 create()
进入Observable的create()方法做了些什么:
public class Observable<T> {
.....省略代码......
static final RxJavaObservableExecutionHook hook =
RxJavaPlugins.getInstance().getObservableExecutionHook();
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
.....省略代码......
}
直接返回一个 Observable,接下里继续看看它的构造函数:
public class Observable<T> {
.....省略代码......
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
.....省略代码......
}
返回继续查看 hook.onCreate(f) 。 hook 是什么鬼?
hook是一个代理对象, 仅仅用作调试的时候可以插入一些测试代码。如单元测试
static final RxJavaObservableExecutionHook hook =
RxJavaPlugins.getInstance().getObservableExecutionHook();
继续查看hook.onCreate(f) :
public abstract class RxJavaObservableExecutionHook {
.....省略代码......
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
return f;
}
.....省略代码......
}
直接把OnSubscribe 这个对象返回了一下。
创建时做了三件事情:
返回了一个Observable(假设为ObservableA)
返回了一个OnSubscribe(假设为OnSubscribeA)
把返回的OnSubscribeA在ObservableA构造函数中保存为ObservableA的 .onSubscribe 属性
create创建流程图
create()方法创建了一个Observable,且在这个Observable中有个OnSubscribe。
所以就画个简图就如下图所示这样:
create简图
4.2 subscribe()
subscribe() 这个是将观察者(Observer)与被观察者(Observable)联系到一起的操作,也就是产生一种订阅(Subcribe)关系。
先查看源码:
public class Observable<T> {
.....省略代码......
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
return subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
observer.onCompleted();
}
@Override
public void onError(Throwable e) {
observer.onError(e);
}
@Override
public void onNext(T t) {
observer.onNext(t);
}
});
}
.....省略代码......
}
实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。
在这里就能够看出,首先 if 中的语句意思是如果这个Observer已经是Subscriber类型,那就直接返回。如果不是的话 new了一个Subscriber ,再点进去看看:
public abstract class Subscriber<T> implements Observer<T>, Subscription {
.....省略代码......
}
果然,它还是转成了Subscriber类型,刚好印证了之前的话。所以为了方便起见,之后文章中,所有的观察者(Observer)我都用Subscriber来代替。
继续看 subscribe 源码:
public class Observable<T> {
.....省略代码......
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe<T> onSubscribe;
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
.....省略代码......
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
.....省略代码......
}
.....省略代码......
}
把一些暂时无关的代码省略掉来看,其实就是执行了一句 hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber); 。
而这个 hook.onSubscribeStart 方法再点进去看看:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe) {
// pass through by default
return onSubscribe;
}
可以看到,竟然直接返回了一个 onSubscribe ,由于之前说过这个hook没什么作用,直接删掉,那就等于整个 subscribe 做了一件事就是 onSubscribe.call(subscriber) ,当然这个call里面的参数subscriber是我们代码中传递进去的。
而onSubscribe在create源码解析中我们已经知道是新建 ObservableA 的一个属性,所以总结来说,subscribe()方法做的事情就是这样:
ObservableA.onSubscribe.call(subscriber);
而调用 call方法,就是调用传入的参数subscriber的onNext/onCompleted/onError方法。这就是全部的过程。依然画个图来说,图中省略了create中的创建步骤:
使用过程
结合图我们最后再顺一下思路:
首先创建过程也就是create()方法中创建了一个Observable,并有一个onSubscribe属性;
其次在订阅过程也就是subscribe()方法中,调用了create()方法中创建的Observable的onSubscribe属性的call方法;
最后这个call回调的就是代码中创建的Subscriber的onNext/onCompleted/onError方法。
之前Log日志可以看出,将onNext与onCompleted方法执行完后,call方法才结束。这也印证了call方法回调Subscriber的方法这一说。
4.3 map
4.3.1map使用流程
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
并且回顾Observable.create过程
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
//直接返回
return f;
}
4.3.2map源码
这里比较神奇的地方是这个 map,其实 map 实际上做了两件大事:
- (第一件)new 了一个 变形函数, 保存在了
OperatorMap.transform中
查看map源码
public class Observable<T> {
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
}
点击继续查看OperatorMap
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
}
可以看出就是把new Func1
...省略代码...
map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
...省略代码...
保存在了transformer中去。
- (第二件)new了一个新的 Observable. 这个 Observable 的构造函数中, 传入了一个新的 OnSubscribe. 整个
lift函数的难点就在于这个 OnSubscribe 对象中. 我们仔细看一下它做了什么. 它其实也做了两件大事儿:
进入lift函数
public class Observable<T> {
.....省略代码.......
final OnSubscribe<T> onSubscribe;
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
//新的Observable
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//hook.onLift(operator).call(o)创建了一个新的 Subscriber
//(实际上是一个 proxy)并调用了OperatorMap中的Subscriber.onNext
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
.....省略代码.......
}
继续进入OnSubscribe.call 函数中, 看一下源码:
public final class OperatorMap<T, R> implements Operator<R, T> {
....省略代码.....
final Func1<? super T, ? extends R> transformer;
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
//函数transform执行对参数t进行变形然
//后将变形结果转发给o.onNext
@Override
public void onNext(T t) {
try {
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
....省略代码.....
}
-
hook.onLift(operator).call(o)
创建了一个新的 Subscriber (实际上是一个 proxy), 并在Subscriber.onNext中调用transform函数对参数t进行变形, 然后将变形结果转发给o.onNext`. 这么上面的变量o是哪里的, -
OnSubscribe.call 调用了 4.3.1中
create创建出来的 Observable.onSubscribe 函数!
17.png
很简单, 该变形函数保存在了 OperatorMap.transform 中.
**总结一下 map 的行为: **
- 创建了一个新的 Observable,
- 创建了一个新的 OnSubscribe: 其中的
call方法是整个调用链的关键. 它调用了上一级Observable.onSubscribe.call, 同时, 还将结果通过transform对 4.3.1处理后的结果进行变形。
3.subscribe 触发整个回调流程. 我们来看一下主要流程
refactor.png
这一步也很简单, 就是通过 Observable.subscribe 调用该对象的 Observable.onSubscribe.call 方法, 然后经过一系列调用, 最终由该对象内部临时创建的 Subscriber 对象(上文中的 proxy 对象) 调用用户目标 Subscriber (即代码中 .subscribe(…) 中的参数) 的方法.
4.4 Schedulers
.subscribeOn(Schedulers.io())和.observeOn(Schedulers.computation())应用后的原理.
4.4.1基本使用
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
}
});
我们来拆解一下 .subscribeOn 和 .observeOn 的作用范围:
24.png
-
subscribeOn将作用于create中的OnSubscribe.call()方法. -
observeOn作用于其语法中下一语句的Subscriber.onNext等函数中.
首先分析 subscribeOn
public class Observable<T> {
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//创建了一个 Observable 来转发 OnSubscribe.call 请求
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
}
与 map 一样, 是通过创建了一个 Observable 来转发 OnSubscribe.call 请求(代码中的 OperatorSubscribeOn 继承自 OnSubscribe. 来看看具体实现
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
subscriber.add(inner);
//这里使用了Worker.schedule方法改变了source.call()方法执行的线程
inner.schedule(new Action0() {
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
subscriber.onError(e);
} finally {
inner.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
subscriber.onCompleted();
} finally {
inner.unsubscribe();
}
}
@Override
public void setProducer(final Producer p) {
subscriber.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread()) {
p.request(n);
} else {
inner.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
};
source.unsafeSubscribe(s);
}
});
}
}
}
可见, 该函数中做了如下两件事:
- 创建一个用于在不同线程执行的
Worker对象(代码中的 inner) - 使用上述
inner在该对象所代表的线程中执行Observable.onSubscribe.call方法(代码中的source.unsafeSubscribe(s);
再来分析 observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
//返回一个新Observable对象
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
}```
继续查看OperatorObserveOn对象及OperatorObserveOn.call` 方法是如何生成 `st` 对象的
public final class OperatorObserveOn<T> implements Operator<T, T> {
private final Scheduler scheduler;
private final boolean delayError;
private final int bufferSize;
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
if (scheduler instanceof ImmediateScheduler) {
// avoid overhead, execute directly
return child;
} else if (scheduler instanceof TrampolineScheduler) {
// avoid overhead, execute directly
return child;
} else {
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init();
return parent;
}
}
}
该方法会根据 `scheduler` 的类型决定返回什么样的`Subscriber` 对象. 可见, 如果 child 类型为 `ImmediateScheduler` 或者 `TrampolineScheduler` 等以当前线程为执行环境的类型, 则直接返回 `child` 对象. 本例中, `child` 为 `NewThreadScheduler`, 因此将通过 `ObserveOnSubscriber` 对 `child` 进行包装. 生成一个 proxy subscriber 对象.
返回来继续查看
public class Observable<T> {
final OnSubscribe<T> onSubscribe;
//这段代码熟悉吧
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
}
继续查看OnSubscribeLift
```public final class OnSubscribeLift<T, R> implements OnSubscribe<R> {
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
//调用并且切换线程
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
st.onStart();
//熟悉.....
parent.call(st);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
o.onError(e);
}
}
}
至此, 我们可以知道 observeOn 是通过以下方法对其后面的 Subscriber 进行控制的:
-
lift->OnSubscribe.call->proxy subscriber = new Subscriber(original subscriber)创建了一个新的Subscriber(实际上是个代理) - 在上述
proxy subscriber中对original subscriber对象的执行进行转发. 转发过程中,proxy subscriber完全可以自由的控制original subscriber执行的线程.
l1.png










网友评论