基本框架
- Observable (可观察者,即被观察者)
- Observer (观察者)
- subscribe (订阅) 通过该方法,将 Observable 与 Observer 关联起来
- 事件 (包括 onNext,onComplete,onError 等事件)
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
(1)创建Observable流程源码分析
create方法源码
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
返回值是Observable,参数是ObservableOnSubscribe
ObservableOnSubscribe源码
@FunctionalInterface
public interface ObservableOnSubscribe<@NonNull T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Throwable;
}
ObservableOnSubscribe是一个接口,里面就一个我们实现的那个方法。该方法的参数是ObservableEmitter
ObservableEmitter源码
public interface ObservableEmitter<@NonNull T> extends Emitter<T> {
//code...
}
ObservableEmitter也是一个接口。它继承了 Emitter<T> 接口
Emitter<T>源码
public interface Emitter<@NonNull T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
定义了 我们在ObservableOnSubscribe中实现subscribe()方法里最常用的三个方法
create()源码方法里就一句话return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
RxJavaPlugins.onAssembly方法源码
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
RxJavaPlugins.onAssembly中返回source,即传入的对象,也就是new ObservableCreate<T>(source)
create需要返回的是Observable,而我现在有的是ObservableOnSubscribe对象,ObservableCreate将ObservableOnSubscribe适配成Observable
至此,创建流程结束,我们得到了Observable<T>对象,其实就是ObservableCreate<T>
(2)subscribe订阅流程分析
- subscribe源码
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
//code...
//真正的订阅处
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
- 将第一节提到的ObservableCreate里的subscribeActual()方法拿出来看看
@Override
protected void subscribeActual(Observer<? super T> observer) {
//创建CreateEmitter,也是一个适配器
CreateEmitter<T> parent = new CreateEmitter<>(observer);
//onSubscribe()参数是Disposable ,所以CreateEmitter可以将Observer->Disposable 。还有一点要注意的是onSubscribe()是在我们执行subscribe()这句代码的那个线程回调的,并不受线程调度影响。
observer.onSubscribe(parent);
try {
//将ObservableOnSubscribe源头与CreateEmitter联系起来
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
- CreateEmitter
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//code...
//如果没有被dispose,会调用Observer的onNext()方法
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
}
//如果没有被dispose,会调用Observer的onError()方法
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose(); //一定会自动dispose()
}
return true;
}
return false;
}
@Override
public void onComplete() {
//如果没有被dispose,会调用Observer的onComplete()方法
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();// //一定会自动dispose()
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
@Override
public String toString() {
return String.format("%s{%s}", getClass().getSimpleName(), super.toString());
}
Observer源码
public interface Observer<@NonNull T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
Observer是一个接口,里面就四个方法,我们在开头的例子中已经全部实现
- Observable和Observer的关系没有被dispose,才会回调Observer的onXXXX()方法
- Observer的onComplete()和onError() 互斥只能执行一次,因为CreateEmitter在回调他们两中任意一个后,都会自动dispose()
- Observable和Observer关联时(订阅时),Observable才会开始发送数据
- 在subscribeActual()方法中,源头和终点关联起来。
主要用到的设计模式:
- 适配器模式
- 观察者模式
- 装饰者模式
网友评论