当你看到这篇文章的时候,想必对rxjava的使用已经非常熟练了。我将从常用的调用出发,带你分析一下内部的实现原理。废话不多说,从最简单的开始。
一、最简单的调用
val observer = object : Observer<String>{
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.e("tag", t)
}
override fun onComplete() {
}
override fun onError(e: Throwable) {
}
}
Observable.just("1")
.subscribe(observer)
运行后控制台会输出1。开始分析:
just方法源码如下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
先判空,最终返回RxJavaPlugins#onAssembly。
/**
* Calls the associated hook function.
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
注释写得很清楚,这是一个hook函数,hook翻译为钩子,其实就是预埋方法,对参数做一些额外的操作。此处预埋的是Function接口,就是我们平时Observable#map使用的参数,作用为把参数转化成另外一种类型的对象并返回。此处onObservableAssembly默认为null,也既返回参数本身。
那么,just方法返回的就是hook函数的入参也就是ObservableJust对象,我们来看一下。
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
代码很简单,构造函数把我们的字符串“1”传了进来。subscribeActual方法看名字像是subscribe方法的真实实现,我们先放一边。
到这里总结一下,Observable#just方法其实就是创建了一个Observable的子类,子类的名字叫ObservableJust,里面的实现相当简单。
接着看Observable#subscribe:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
代码很简单,就是对入参observer判空,然后hook函数转化ovserver(这里默认是不转化的),然后再判空observer,最后调用subscribeActual方法,继续:
protected abstract void subscribeActual(Observer<? super T> observer);
这是个抽象方法,实现在子类。这里的子类就是前面new出来的ObservableJust,再贴一下代码:
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
很明显这里的s就是我们传入的Observer对象。然后调用了他的onSubscribe方法。接着调用了sd#run:
@Override
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
调理很清晰,先后调用了observer#onNext,observer#onComplete。至此就结束了。细心的同学要问了,那onError在哪里调用的呢?这里并没有调用,也没有捕获异常,所以一旦你的onNext实现中抛出异常,程序将直接挂掉。
二、切换发射线程
Observable.just("1")
.subscribeOn(Schedulers.io())
.subscribe(observer)
没错,Observable#subscribeOn,就能切换发射线程,我们来看看具体的实现。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
代码是不是看着眼熟,没错,跟just方法差不多,这里也只是new了一个ObservableSubscribeOn并返回,猜名字这应该也是个Observable的子类。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
需要注意的是构造函数传入的2个参数。第一个source传的是this,也就是前面just方法生成的Observable,这里是典型的装饰者模式。第二个scheduler就是subscribeOn传入的参数。通过之前的分析,我们知道最终会走到subscribeActual方法。
首先调用observer#onSubscribe,注意,当前的线程还没有切换,而一般当前的线程都是主线程,所以说onSubscribe一般运行在主线程。
接下来,就是真正的线程切换操作了。
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
createWorker方法是个抽象方法,具体实现在我们传入的子类,去看下Schedulers#io生成的Scheduler。由于代码比较简单就不贴了,生成的是IoScheduler。那我们看一下IoScheduler的createWorker方法的实现。
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
我们来看一下他的schedule方法,这个是核心方法必须牢记,他又调用了threadWorker#scheduleActual:
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
ScheduledRunnable实际上既是runnable又是callable。看代码:
public final class ScheduledRunnable extends AtomicReferenceArray<Object>
implements Runnable, Callable<Object>, Disposable {
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;
this.lazySet(0, parent);
}
@Override
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
}
}
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();
return null;
}
}
我们先回到NewThreadWorker#scheduleActual方法,new了ScheduledDirectTask以后,用executor去执行他,那么executor是什么呢?
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (PURGE_ENABLED && exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
搞了半天,这就是一个系统提供的线程池,他能处理延时任务。这样一来就很好理解了,其实就是把ScheduledDirectTask这个任务交给线程池去执行。注意,现在开始线程已经切换了,接下来的代码都将执行在线程池。由于是调用submit方法,所以将会转到ScheduledDirectTask#call,回去看代码,实际又调用了run方法,run方法又调用了actual#run。actual是什么呢?actual是我们创建ScheduledDirectTask的时候在构造函数传入的。再往上追溯,是通过scheduleDirect方法传进来的。再往上追溯,EventLoopWorker#schedule通过参数传入的。再往上追溯,这里再贴一次代码:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
没错w.schedule(task, delay, unit);这里的w就是EventLoopWorker,原来在这里调用的,那么我们一直在找的runnable就是这个task了,他是DisposeTask。那么再来看下代码:
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
}
他的run方法实际调用了decoratedRun#run,decoratedRun又是通过构造函数传进来的,出去看。原来他就是schedule#scheduleDirect的第一个参数。那么再往前回溯,ohyeah,又回来了subscribeActual方法,再贴一次:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
抓到他了,这个runnable叫SubscribeTask。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
看下run方法的实现,这里的source最开始说过,是通过构造函数传入的,就是Observable#just生成的Observable的子类,然后就会去调用这个Observable的subscribeActual方法。前面我们分析过,这里面会去调用observer#onNext,这个observer是SubscribeOnObserver:
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
这里以onNext为例,实际调用了 actual#onNext,这个actual就是我们自己的Observer。好激动,红军会师终于调用到了我们自己的方法。
至此就结束了。细心的同学会发现一个问题,subscribeOn方法生成的Observable的核心方法subscribeActual除了把我们自己的代码切入线程池,其他什么具体的业务也没做。why?因为他就是切换线程的啊。
整个过程有点长,可能有些同学看得有点懵逼。我再大体梳理下流程。
其实就是调用了创建ObservableSubscribeOn的Observable的subscribeActual,本质是这样,然后为了切换线程,就把这个调用封装进了runnable,这当中层层封装了好几层runnable,这里用到了标准的装饰者模式。然后用线程池去执行。
由于篇幅原因,下一篇继续。








网友评论