rxjava线程切换也是一大亮点,还是以第一篇的下载图片并展示的例子来分析:
Observable.just("https://img1.sycdn.imooc.com/5b8ab353000181b204000284.jpg")
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(@NonNull String s) throws Exception {
//1.下载图片
URL url = new URL(s);
URLConnection urlConnection = url.openConnection();
InputStream inputStream = urlConnection.getInputStream();
return BitmapFactory.decodeStream(inputStream);
}
})
.subscribeOn(Schedulers.io())//io线程下载
.observeOn(AndroidSchedulers.mainThread())//android主线程接收图片bitmap
.subscribe(new Consumer<Bitmap>() {
@Override
public void accept(Bitmap bitmap) throws Exception {
//2.设置图片
ivImage.setImageBitmap(bitmap);
}
});
根据上一篇map()的经验,subscribeOn(Schedulers.io())和observeOn(AndroidSchedulers.mainThread())应该也是封装成了另外的一个形式的Observable,先来看看subscribeOn(Schedulers.io()),这个方法是指定被观察者运行的线程:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//又是这个方法,不用再说了吧,最终返回的还是入参ObservableSubscribeOn对象
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
看看ObservableSubscribeOn类:
//也是继承AbstractObservableWithUpstream,从命名规则也知道了这也是个observable,而且这种类跟
//之前的ObservableMap结构很像,只是换了个成员变量scheduler
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
// 也是新建了一个内部类对象,SubscribeOnObserver,入参是后面的Observer,
//SubscribeOnObserver一看就知道也是一个观察者
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//调用了Observer的onSubscribe()方法
s.onSubscribe(parent);
//SubscribeTask 是个Runnable,后面有贴出,run()方法就是执行了source(源observable)的subscribe()
//方法,就是在这里把之前的observable都放到了run里面执行,至于scheduler 就是Schedulers.io()
//创造出来的线程了,这里不深究,这里scheduleDirect()方法就会去执行runnable
//setDisposable()方法会在线程执行完的时候关掉它
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//静态内部类,实现了Observer
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
//构造函数,初始化了内部的一个Observer
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
//这个onNext()方法实际调用的是下一级observer的onNext()
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
//最终调用了这个方法
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//源observable执行subscribe()后会触发parent.onNext()
source.subscribe(parent);
}
}
}
总结下来,rxjava指定被观察者在特定线程执行的核心就是把上层的observable放在指定的线程执行subscribe()方法。
下面来看看指定下游观察者执行的线程,也就是observeOn(AndroidSchedulers.mainThread())方法:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//最终还是调用了onAssembly(),返回的依旧是入参ObservableObserveOn(可观察者被观察在xxx线程)
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
继续看ObservableObserveOn类:
//传入了主线程的Scheduler,也是被封装成了一个 Observable
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
//当他调用subscribe()方法的时候,执行这里
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//走这里
Scheduler.Worker w = scheduler.createWorker();
//调用上游的Observable的subscribe()方法,observer和Scheduler.Worker(线程)被封装
//在ObserveOnObserver里,这里按顺序第一是看ObserveOnObserver的onSubscribe()方法
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//静态内部类ObserveOnObserver,实现了Observer和Runnable
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//此案例的上个Disposable 不属于QueueDisposable,不走这条分支
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
//走这里
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//调用传进来的observer的方法,接下来执行onNext()
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
//执行schedule()
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
@Override
public boolean isDisposed() {
return cancelled;
}
void schedule() {
if (getAndIncrement() == 0) {
//worker是传进来的线程,最终执行了自身,因为它自己就是Runnable,看看他的run()方法
worker.schedule(this);
}
}
//
@Override
public void run() {
//outputFused背压相关,默认false,走drainNormal()
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//主要的代码在这里,这个a是传进来的observer,最终它的onNext()执行在了指定的线程
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
...
}
总结下来,rxjava指定观察者在特定线程执行的核心就是把下层的observer放在指定的线程执行onNext()方法。依然是用Observable子类去包装实现链式响应。一直要记得rxjava源码的那种套娃结构。
下篇分析分析rxjava封装的线程Scheduler相关。









网友评论