使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");
e.onComplete();
}
}).map(new Function<String, String>(){
@Override
public String apply(String s) throws Throwable {
return s+"<<<";
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.e("onSubscribe","onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.e("onNext","onNext:"+s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.e("onError","onError");
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete" );
}
});
发射一个字符串,经过map转化,然后发到观察者onNext输出数据
查看源码
1、Observable.create(..)
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
接受一个ObservableOnSubscribe参数,返回一个ObservableCreate对象,并把参数传了进去,RxJavaPlugins 装配这个new的对象,返回Observable,但实质是ObservableCreate
RxJavaPlugins是一个hook类,主要就是把你new出来的对象,使用的操作符转换成RxJava具体操作的一个类
2、.map(new Function<String, String>(){..})
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
因为create已经返回了一个Observable对象,直接使用该对象的map方法。同样也是装配了一个ObservableMap(this,mapper)
this就是ObservableCreate对象
mapper是需要转换的方法
这时候ObservableCreate和ObservableMap已经连起来了
3、.subscribeOn(Schedulers.io())
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
}
装配返回一个Observable对象实质是ObservableSubscribeOn(this, scheduler)
this是ObservableMap对象
scheduler是Schedulers.io()线程调度对象,订阅放到了io线程 或者是newThread、single...
这时候 上游的ObservableMap已经和ObservableSubscribeOn连起来了,并且指定了订阅的地方是在io线程
4、.observeOn(AndroidSchedulers.mainThread())
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
}
装配返回Observable对象,实质是ObservableObserveOn
this传入的是ObservableSubscribeOn
scheduler是Android的主线程
这时候ObservableSubscribeOn和ObservableObserveOn连起来了
5、.subscribe(new Observer<String>(){onSubscribe、onNext...等方法})
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer); //关键方法
} catch (NullPointerException e) {
} catch (Throwable e) {
}
}
主要就是subscribeActual(observer);当前对象是ObservableObserveOn,也就是ObservableObserveOn调用了subscribeActual(observer) 并把观察者的对象传了进来,也就是 onSubscribe、onNext...等方法 的对象
到这里创建、订阅、关联已经基本完成
前面的类名比较乱,什么ObservableObserveOn,又是什么ObservableSubscribeOn,还有Observable,后面还有ObserveOnObserver等等 这些我第一次看也比较晕,绕了半天,后来想想还是根据英文意思来看
比如Observable 中文 可观察的 也就是 被观察者
Observe 中文 观察的
ObservableObserveOn 中文 被观察者 观察 在...
ObservableSubscribeOn 被观察者 订阅 在...
接下来是关键
订阅
理一下 ObservableCreate -> ObservableMap -> ObservableSubscribeOn -> ObservableObserveOn -> 执行subscribeActual()
在上面的方法中 执行subscribeActual()对象是最后一个ObservableObserveOn
ObservableObserveOn.subscribeActual()
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
第一个判断是判断scheduler对象是否是TrampolineScheduler,这个类里面注释说 在当前线程上工作,内容放入队列,但不会立即执行,暂时忽略
下面创建了一个 Worker ,new了一个对象ObserveOnObserver,这个静态类,继承了Observer<T>(Observer<T>是为了下游对象)和Runnable()(run方法中有个判断默认是false,暂时不知道第一个方法是做什么的暂时忽略,网上说与backpressure有关 不是背后的压力,是后面的压力)。
ObserveOnObserver这个类里面有upstream,有queue,有worker,是用来最后执行,你定义的Observer里面的方法
source订阅了ObserveOnObserver,这个source对象是ObservableSubscribeOn,执行了他的subscribeActual()
ObservableSubscribeOn.subscribeActual()
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这个类是 被观察者订阅在 定义的是io线程
首先new了一个对象SubscribeOnObserver 传入了 observer 这个是具体实现最后发射的对象
接着就调用了observer的onSubscribe(parent)方法
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
//去掉其他代码
}
queue = new SpscLinkedArrayQueue<>(bufferSize);
downstream.onSubscribe(this);
}
}
这里做一个保留,第二个if判断if (d instanceof QueueDisposable) 判断是否是 一次性队列
接着创建了一个SpscLinkedArrayQueue这个类里面有一个 AtomicReferenceArray 原子的引用集合,保证发射出去的数据是原子操作不可被打断。
最后调用了下游的方法,回调到了我们定义的onSubscribe方法中打印log
回到前面 最后一行
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
分部来看
开始new了一个SubscribeTask这一个线程,并把parent传了进去,这个parent可以理解为当前实际操作对象 握着 下游 实例的对象
既然是个线程肯定有run方法
@Override
public void run() {
source.subscribe(parent);
}
这个source对象 实际是前一个结点的对象也就是map 也就是ObservableMap,这里只定义了还没使用
接着上面的Task交给了scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
createWorker()是IoScheduler中初始化的也就是指定 被观察者 在那个线程执行操作Schedulers.io()
public Worker createWorker() {
return new EventLoopWorker(pool.get()); //这里的get是返回pool的对象
}
//EventLoopWorker的构造函数
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get(); //这里的get是返回pool里一个ThreadWorker对象
}
final AtomicReference<CachedWorkerPool> pool;
static final class CachedWorkerPool implements Runnable {
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
}
这里有 一个pool.get()对象,这是个原子性的缓存工作池
CachedWorkerPool是个static的线程类,里面有个队列的数据结构,pool.get();是返回一个ThreadWorker对象,循环找,找到就返回,找不到就new一个然后添加到pool里再返回,这个类也会定期清理里面的worker
回到scheduleDirect(),第三行 new DisposeTask 一次性的任务 并传入 线程 和 “工作者”对象 。DisposeTask也是一个线程里面的run方法,执行了传入的线程。
w.schedule(task, delay, unit);在IoScheduler中实现
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime,@NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
ThreadWorker继承NewThreadWorker 打开scheduleActual()
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
把线程和一个一次性的工作对象放入ScheduledRunnable,在塞入线程池executor.schedule(),前面的判断也很明显是否要延迟。
再回到前面scheduler.scheduleDirect返回一个 DisposeTask,再回到前面ObservableSubscribeOn.subscribeActual()
//这里复制是方法最开始的代码
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
parent.setDisposable( 把DisposeTask设置了一下 )
DisposableHelper.setOnce(this, d);
public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
Objects.requireNonNull(d, "d is null");
if (!field.compareAndSet(null, d)) {
d.dispose(); //主要是这个方法
if (field.get() != DISPOSED) {
reportDisposableSet();
}
return false;
}
return true;
}
这个d.dispose() d对象是CompositeDisposable,这个dispose()里面主要就是把一个boolean disposed取反,主要为了 和之前绑定的线程 只能执行一次操作
到这里基本上前期的准备工作已经做好
回顾一下,开始create() 创建了ObservableCreate -> ObservableMap -> ObservableSubscribeOn -> ObservableObserveOn 这些都是连起来的,并且都有一个source对象,source指向的上游对象
最后subscribe(),执行ObservableObserveOn的subscribeActual()方法, 通过对source的subscribe(),在ObservableSubscribeOn中定义了Task线程并放到线程池中
,等待执行
SubscribeTask执行
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这个类是写在ObservableSubscribeOn中,所以source是ObservableMap对象subscribe() 实际执行还是subscribeActual(..)
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这个source 实际对象是 ObservableCreate,调用了subscribe传入了new MapObserver
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这里创建了一个发射器对象CreateEmitter,传入了一个observer,这个observer就是上面传过来的MapObserver
下面这个MapObserver订阅了这个发射器对象,也就是说这个发射器对象交给了下游的MapObserver。
再下面的source就是MainActivity(这个名字无所谓主要是后面的对象 具体是在什么地方new的)的ObservableOnSubscribe。
开始source.subscribe(parent)回调到了我们实例化ObservableOnSubscribe中,重写的方法,也就是这里
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("hello world");//这个e是之前定义的发射器对象CreateEmitter
e.onComplete();
}
})
发射器onNext("hello world")进入CreateEmitter的onNext()
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
先是一个判空,然后一次性检查,最后observer.onNext,这个observer是初始化传入的,之前说了是MapObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
第二个判断看变量注解是 保持上游已建立融合模式 ,这个也不是非常明白,看代码也明白这里肯定是false
下一步 mapper.apply(t),这个mapper是一个Function对象,初始化的时候被赋值的,也就是创建的时候,也就是我们需要一个map操作符的时候,创建赋值的
这个apply会回调到map操作符 里面的apply(方法)。map是一个数据转换的,比如发射了int用map转换成String
转换后调用了下游的onNext,也就是下游的具体操作对象,SubscribeOnObserver
@Override
public void onNext(T t) {
downstream.onNext(t);
}
这里什么也没干 直接调用下游onNext,ObserveOnObserver
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
onNext 第二个判断不知道是我AS抽风了还是什么原因 Alt+单击 看到的一直是false但是真正的确实 true,这里面是把值 原子性的 放到队列中
继续schedule()
这里判断保证只做一次。worker.schedule(this) 传入了一个线程也就是本身this
这里的worker是HandlerWorker是AndroidSchedulers.mainThread(),我们定义在android的主线程执行内容。
我们知道线程执行是执行run方法,直接跳到run(),暂时忽略Android里面handler发送消息 handler.sendMessageDelayed(message, unit.toMillis(delay));只要知道run是在主线程执行的就ok
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里的if 也是和融合相关,暂时忽略
drainNormal()
关键的来了
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
乍一看。。两无限循环。。一看就好麻烦。。。
看到开始声明了两个变量一个q和a,q是队列,a是下游对象,再看到a.onNext()基本胜利一半了,里面内容无非就是从队列中获取数据,然后发射到下游去。
检查是否中止,不中止进入下一个循环,这个循环就比较简单了,从队列中取一个值,然后判空,不为空就发射,这个a对象就是我们定义的Observer中的onNext方法
再后面是一个原子性操作,这个我不确定作用具体是什么,忙猜是防止漏掉数据,为外面的循环多一层跳出的方式。希望有人能解决我的疑惑。我也不知道为什么这里会有for的无限循环。。这可读性太差了(应该是我太菜了🤣)
看一下他是怎么检查中止的
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (disposed) {
queue.clear();
return true;
}
if (d) {
Throwable e = error;
if (delayError) {
if (empty) {
disposed = true;
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
worker.dispose();
return true;
}
} else {
if (e != null) {
disposed = true;
queue.clear();
a.onError(e);
worker.dispose();
return true;
} else
if (empty) {
disposed = true;
a.onComplete();
worker.dispose();
return true;
}
}
}
return false;
}
判断是否是disposed,接着d是个boolean是 传进来的done,影响这个done是有在onError()或onComplete()的时候
如果 手动 或者try cratch 抛出了异常 这个delayError会为true 这里会调用a.onError(e);否则调用结束
这里结束的时候有个 worker.dispose(); 在最开始初始化的时候 一顿封装后 把DisposeTask交给了worker对象
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);
}
这里官方的注释已经写明了 释放pool,将工作线程放回pool并刷新过期时间,还调用了dispose()处理一些一次性的标记
到这里基本一整个的流程基本上结束了
总结
个人觉着这个RxJava就是一个个的结点+链式调用,创建一个方法,它帮你用一个实际操作的类包起来,每个结点都有一个源结点,这个源结点指向的前一个对象,最后 subscribe 订阅时候 开始执行,一步步调用 源 结点,并一步步在结点中初始化,推到最前端,调用定义的方法,e.onNext() 继续调用下游的onNext() 最后调用到Observer的观察者方法,从而形成一个完整的 链。
我画了一个过程的图
最开始使用的结点图
create() 创建了
ObservableCreate -> ObservableMap -> ObservableSubscribeOn -> ObservableObserveOn -> Observer 每个对象都像一个结点,结点之间进行连接,当如果把map这个结点去掉,或者增加其他结点也是没问题的
感觉有些不足比如java的一些原子性对象的使用,准备来全面学习一下
如果内容有错误请及时联系我,我并加以改正









网友评论