转载请以链接形式标明出处:
本文出自:103style的博客
本文基于 RxJava 2.x 版本
create操作符例子:
Observable
.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
}
})
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
首先我们看create 方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins 类的 onAssembly 方法:
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
在源码中查看引用可知 onObservableAssembly 只有在测试的时候才不为 null。
所以Observable.create(ObservableOnSubscribe<T> source)实际上就是返回了 ObservableCreate对象
ObservableCreate 类,可以看到 ObservableCreate 是 Observable 的子类,并实现了父类的 subscribeActual 方法。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {...}
...
}
然后我们看subscribe方法: 实际上是调用了 Observable 的抽象方法 subscribeActual(observer);
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
又因为 create操作符返回的 ObservableCreate 是 Observable 的子类,
所以实际上调用的是ObservableCreate 的 subscribeActual(observer);
ObservableCreate 的 subscribeActual(observer)方法:
- 首先创建了
CreateEmitter对象, - 然后调用了
subscribe方法传进来的Observer对象的onSubscribe()方法 - 然后调用了
create操作符 传进来的ObservableOnSubscribe对象的subscribe(ObservableEmitter<T> emitter)方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
因为 CreateEmitter 类实现了 ObservableEmitter<T> 和 Disposable 接口,
所以我们可以在 create 操作符 传进来的 ObservableOnSubscribe 对象的 subscribe(ObservableEmitter<T> emitter)方法里调用onNext、onError、onComplete等方法。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
...
}
ObservableEmitter 接口:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Disposable 接口:
public interface Disposable {
void dispose();
boolean isDisposed();
}
因为CreateEmitter 又重写了onNext、onError、onComplete等方法。
所以 create 操作符 传进来的 ObservableOnSubscribe 对象的 subscribe(ObservableEmitter<T> emitter)方法里调用onNext、onError、onComplete等方法实际上调用了 CreateEmitter 的 onNext、onError、onComplete等方法。
CreateEmitter 的 onNext、onError、onComplete方法:
- 对
onNext、onError传进来的值做了空判断。 - 如果
!isDisposed()则继续执行observer对象的onNext、onError、onComplete等方法。
(observer对象为create操作符 之后的subscribe()方法传进来的Observer<T>对象) - 并在
onComplete和onError方法最后执行dispose()方法。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("..."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("...");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
接下来我们来看 CreateEmitter 的 dispose() 和 isDisposed()方法
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
继续看 get()方法,看下面代码可知 get() 返回的是一个 Disposable 对象
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {...}
public class AtomicReference<V> implements Serializable {
private volatile V value;
public AtomicReference(V var1) {
this.value = var1;
}
public AtomicReference() {
}
public final V get() {
return this.value;
}
继续看 DisposableHelper 的 isDisposed(Disposable d) 和 dispose(AtomicReference<Disposable> field)方法
-
isDisposed(Disposable d)则是判断d是否和枚举值DISPOSED相等。 -
dispose(AtomicReference<Disposable> field)方法即是 将CreateEmitter的isDisposed()中调用get()获取的对象赋值为DisposableHelper的枚举值DISPOSED。
所以调用dispose(AtomicReference<Disposable> field)方法后,则理论上isDisposed(Disposable d)即返回true。
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
...
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
...
}
DisposableHelper 类中 dispose(AtomicReference<Disposable> field) ,
当 field.getAndSet(d);之后,如果 Disposable对象的值还不等于 DISPOSED ,
则会调用current.dispose();
current 为 以下例子的 disposableObserver对象。
DisposableObserver disposableObserver;
private void test() {
disposableObserver = Observable
.create(new ObservableOnSubscribe<Object>() {...})
.subscribeWith(new DisposableObserver<Object>() {...});
}
create 操作符 返回的是 ObservableCreate,因为 ObservableCreate未重写 subscribeWith 方法,所以调用的是 Observable 的 subscribeWith方法:
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
所以我们知到 disposableObserver 即为subscribeWith 传进来的 DisposableObserver 对象。
DisposableObserver类:
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();
@Override
public final void dispose() {
DisposableHelper.dispose(upstream);
}
}
我们可以看到 dispose()方法继续调用了DisposableHelper.dispose(AtomicReference<Disposable> field);
所以我们是否可以得出结论,
dispose(AtomicReference<Disposable> field)在设置值为DISPOSED 失败之后会一直重复调用直到设置成功为止?






网友评论