前言
讲到RxJava不得不提到他的线程切换,前面本来打算是按字母顺序把操作符一个个介绍,现在看来其实很多操作符的源码看过去都是差不多的,我这样一个个介绍难免显得过于重复,啰嗦,所以还是想把几个最重要的,最常用的挑出来,然后给大家详细的分析,这样应该更好。
用法+解析
subscribeOn其实是很通用的方法。用来切换线程。他的参数呢是Scheduler对象。我们可以直接看RxJava中为我们提供了多少种Scheduler。
前面几篇文章的写法呢,直接用语言来概括用法,我觉得不太好,还是应该用例子来说明,这样会比较好。
图片.png
Schedulers中定义了许多 返回不同Scheduler的方法。
由于篇幅有限,这里我介绍一个最常用的Schedulers.io(),如果反响比较好,可以再介绍其他的,其实搞懂了一个,其他也是一样的。
首先了解下用法,然后深入源码。
public class MainActivity extends AppCompatActivity {
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
findViewById(R.id.btClick).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.e("aaaaa", "" + Thread.currentThread().getName());
}
});
}
});
}
}
快速点击按钮
图片.png
看日志,始终是复用了一个叫
RxIoScheduler-2的线程。我们都知道线程的创建和销毁是很消耗资源的。所以我们尽量避免使用Schedulers.newThread()每次都去创建一个线程,而是去使用Schedulers.io()可以去复用已有的线程。
等待2分钟以上,我们再次去快速点击按钮。
图片.png
发现这一次的所有的线程名称变为了
RxIoScheduler-3的线程。
初步推断RxIoScheduler-2长时间未使用,被销毁了。那么我们可以用工具去验证下,查看下当前存在的线程,查看RxIoScheduler-2是否是被销毁了。
工具查看RxIoScheduler-2是否被销毁
-
进入debug模式
图片.png
-
查看线程状态
图片.png
-
间隔2分钟,进行2次快速点击,再查看线程状态
图片.png
图片.png
很显然,
RxIoScheduler-2线程已经被销毁了
深入源码
- Schedulers.io()
Schedulers
...
Scheduler io = hook.getIOScheduler();
if (io != null) {
ioScheduler = io;
} else {
ioScheduler = RxJavaSchedulersHook.createIoScheduler();
}
...
public static Scheduler io() {
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}
...
hook.getIOScheduler()==null所以走下面分支。
hook其实是提供了一个全局的钩子,一般都不会自己去实现,所以直接是使用默认的hook,返回为null
RxJavaSchedulersHook
...
public Scheduler getIOScheduler() {
return null;
}
...
public static Scheduler createIoScheduler() {
return createIoScheduler(new RxThreadFactory("RxIoScheduler-"));
}
...
public static Scheduler createIoScheduler(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory == null");
}
return new CachedThreadScheduler(threadFactory);
}
...
这里我们可以看出来Schedulers.io()返回的其实是CachedThreadScheduler 。
- RxThreadFactory
其实在上面代码中,我们看到了一个比较熟悉的东西,就是RxIoScheduler-这么一个字符串,在前面的例子其实我们已经有所提到,就是线程的名称是RxIoScheduler-2,RxIoScheduler-3...这样的。
RxThreadFactory
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
...
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, prefix + incrementAndGet());
t.setDaemon(true);
return t;
}
...
这里就是真正来创建线程的地方,prefix + incrementAndGet()就是线程的name。
RxThreadFactory继承了AtomicLong来实现线程安全,不会出现2个同名的情况。
- subscribeOn
我们已经知道Schedulers.io的返回和真正创建线程的地方。
那么重新回到subscribeOn操作符下,一步步跟入。
Observable
...
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}
...
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}
...
this显然不是ScalarSynchronousObservable所以走下面分支。给Observable外面再套一层Observable。这不是关键。关键的是OperatorSubscribeOn是一个OnSubscribe。
其实前面我讲解了几篇操作符的文章,操作符主要就是对应了一个OnSubscribe,我们直接看里面的具体实现。
今天这篇文章主要介绍线程切换,可能对RxJava整个链式函数的流程说的不是很清楚。
可以参考RxJava操作符源码解析
因为每个操作符的整体流程其实是不变的,最重要的是操作符对应的OnSubscribe具体做了什么,这里就不重复去讲整个流程了。
- OperatorSubscribeOn
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
//最重要的一句,切换线程
inner.schedule(parent);
}
scheduler其实在前面已经介绍,就是CachedThreadScheduler。
我们来分析下上面的代码。
- 创建一个
Worker对象
创建一个Worker,这个worker也是一个Subscription - 创建
Subscriber对象,并绑定parent,inner
绑定后,Subscriber对象unsubscribe的时候,parent,inner一起unsubscribe - 使用
Worker对象切换线程
- SubscribeOnSubscriber
首先这是一个Subscriber,对我们自己实现的Subscriber做了一次包装,先执行这里的onNext等方法然后再执行我们自己实现的Subscriber的onNext等方法。
SubscribeOnSubscriber
...
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}
@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}
...
onNext 其实不作任何处理
onError,onCompleted都会调用worker.unsubscribe();
这一句是关键。
简单的理解就是执行完毕后,就会把Worker对象解除订阅。
- Worker.schedule
inner.schedule是用来切换线程的,我们可以深入看看。
CachedThreadScheduler
...
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
所以这里的inner就是EventLoopWorker
EventLoopWorker
...
@Override
public Subscription schedule(Action0 action) {
return schedule(action, 0, null);
}
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}
ScheduledAction s = threadWorker.scheduleActual(new Action0() {
@Override
public void call() {
if (isUnsubscribed()) {
return;
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
return s;
}
...
threadWorker是什么?一步步往前推
CachedThreadScheduler
...
final AtomicReference<CachedWorkerPool> pool;
...
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
...
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.once = new AtomicBoolean();
this.threadWorker = pool.get();
}
...
这里其实看的比较迷糊,因为用了2个相同名字的变量,并且有2个相同名字的方法。
第一个pool.get()获取的是CachedWorkerPool对象
第二个pool.get()获取的是ThreadWorker对象
我们直接看第二个pool.get()也就是 CachedWorkerPool的`get`方法。
...
ThreadWorker get() {
if (allWorkers.isUnsubscribed()) {
return SHUTDOWN_THREADWORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
...
这里我们就可以清楚的看出,CachedThreadScheduler是如何复用的。先从expiringWorkerQueue取,没有再去创建ThreadWorker,这里传入的参数threadFactory就是前面介绍的RxThreadFactory。
-
如何维护
expiringWorkerQueue
既然知道了使用expiringWorkerQueue来复用线程。那么我们看看具体是如何维护的。
带着几个问题。- 如何添加ThreadWork到
expiringWorkerQueue
2.如何移除ThreadWork
首先我们要知道,在使用的
ThreadWorker显然是不会放入expiringWorkerQueue,然后给其他地方复用的。所以肯定是,当ThreadWorker执行完毕之后。 - 如何添加ThreadWork到
这里其实我们可以在原来的demo基础之上稍微修改一下,救能很快的证明这一点。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.io())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.e("aaaaa", "" + Thread.currentThread().getName());
try {
//添加sleep
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
快速点击多次,看输出。
图片.png
由于添加了Thread.sleep后,你快速点击多次,点击第二次的时候,上一个ThreadWorker未执行完成,所以不能复用,而是重新创建了。但是当点击第8次的时候,第一个 ThreadWorker其实是已经执行完毕了,所以RxIoScheduler-2线程被复用。
在看源码之前,我们先了解下CachedThreadScheduler,CachedWorkerPool,EventLoopWorker这3者的关系。
CachedThreadScheduler:第一次使用Schedulers类方法的时候创建,单例
CachedWorkerPool:创建CachedThreadScheduler的时候创建,单例
EventLoopWorker:每次调用subscribeOn操作符的时候创建,非单例
3个类的代码都在CachedThreadScheduler.java文件中。
下面我们直接看代码
CachedThreadScheduler
...
static final class CachedWorkerPool {
void release(ThreadWorker threadWorker) {
...
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
...
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
...
@Override
public void call() {
pool.release(threadWorker);
}
...
可以看出,调用EventLoopWorker的call方法,就会给ThreadWorker设置过期时间,然后加入expiringWorkerQueue队列。
那么在哪里调用EventLoopWorker的call方法呢?
在前面介绍SubscribeOnSubscriber的最后,提到了onError,onCompleted都会调用worker.unsubscribe();
...
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
...
@Override
public void unsubscribe() {
if (once.compareAndSet(false, true)) {
// unsubscribe should be idempotent, so only do this once
// Release the worker _after_ the previous action (if any) has completed
threadWorker.schedule(this);
}
innerSubscription.unsubscribe();
}
@Override
public void call() {
pool.release(threadWorker);
}
...
EventLoopWorker本身也是个Action0,
threadWorker.schedule(this);其实就是直接异步调用Action0的call方法。
这么看来其实就是当Subscriber的onComplete或onError方法调用后,线程就会放在队列中复用。
那么第二个问题,什么时候从队列中移除这个线程?
CachedThreadScheduler
void release(ThreadWorker threadWorker) {
...
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
大家可以看出来当空闲的ThreadWorker添加到队列前设置了一个ExpirationTime,也就是超时时间。
这里可以告诉大家keepAliveTime其实是60s,可以配置的。
...
static final class CachedWorkerPool {
...
CachedWorkerPool(final ThreadFactory threadFactory, long keepAliveTime, TimeUnit unit) {
...
if (unit != null) {
//创建了一个ScheduledExecutorService
evictor = Executors.newScheduledThreadPool(1, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
Thread thread = threadFactory.newThread(r);
thread.setName(thread.getName() + " (Evictor)");
return thread;
}
});
...
//开启一个新的Thread每隔60s一次调用evictExpiredWorkers
task = evictor.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
evictExpiredWorkers();
}
}, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS
);
}
...
//遍历expiringWorkerQueue,移除已经过期的ThreadWorker
void evictExpiredWorkers() {
if (!expiringWorkerQueue.isEmpty()) {
long currentTimestamp = now();
for (ThreadWorker threadWorker : expiringWorkerQueue) {
if (threadWorker.getExpirationTime() <= currentTimestamp) {
if (expiringWorkerQueue.remove(threadWorker)) {
allWorkers.remove(threadWorker);
}
} else {
// Queue is ordered with the worker that will expire first in the beginning, so when we
// find a non-expired worker we can stop evicting.
break;
}
}
}
}
...
}
已经在关键代码上添加了注释,整体来看,就是在创建CachedWorkerPool的时候开启了一个线程每隔60s去遍历移除过期的ThreadWorker
下面我们可以再来看下刚才的图片,看过源码之后,我们可以再去理解一下为什么会这么输出。
图片.png
大家可以前面我让大家等待2分钟以上,然后线程变成了RxIoScheduler-3,因为RxIoScheduler-2显然已经被移除了。
那么这里为什么会是2分钟呢,刚才不是介绍了keepAliveTime是60s么。
- 这里会给
ThreadWorker设置为过期时间为60s - 然后每60s遍历并移除过期
ThreadWorker
如果运气不好的话,当你ThreadWorker的工作刚刚完成,设置了过期时间为60s之后,这一次的evictExpiredWorkers已经执行过了,只能等待下一次。所以等待了60s左右进行下一次的evictExpiredWorkers,显然这时候ThreadWorker的过期时间还没到,还差一点。只能等待下一次evictExpiredWorkers。所以最好是2分钟。
总结
其实呢已经把最关键的点都过了一遍,但是大家对整个流程到底哪里切换了线程不是非常理解,我会在有空的时候更新一张图,画出来。
Observable.create 其实是最简单的,如果使用Observable.just,Observable.from等方法的话整个流程会更加复杂,简单去理解subscribeOn或者其他操作符的话,建议大家都用Observable.create,这样就避免调用setProducer,request等方法,这样对理解操作符来说会有很大的帮助。
有空再更新下,其实还没有写完。












网友评论