美文网首页
Rxjava2-create

Rxjava2-create

作者: CODERLIHAO | 来源:发表于2018-10-09 16:29 被阅读0次

所有代码的演示都是在RxJava 2.1.13版本上进行的

     // Source side: pushes 1, 2 and 3 through the emitter, then signals completion.
     ObservableOnSubscribe<Integer> source = emitter -> {
         for (int value = 1; value <= 3; value++) {
             emitter.onNext(value);
         }
         emitter.onComplete();
     };

     // Consumer side: prints every received item; the other callbacks are intentionally no-ops.
     Observer<Integer> printer = new Observer<Integer>() {
         @Override
         public void onSubscribe(Disposable d) {
         }

         @Override
         public void onNext(Integer value) {
             System.out.println(value);
         }

         @Override
         public void onError(Throwable e) {
         }

         @Override
         public void onComplete() {
         }
     };

     // Wire the two together; emission starts when subscribe(...) is called.
     Observable.create(source).subscribe(printer);

看看是如何运行的

/**
 * Wraps the given {@link ObservableOnSubscribe} in an {@link ObservableCreate} and passes it
 * through the {@code RxJavaPlugins.onAssembly} hook before returning it.
 *
 * @param <T> the element type
 * @param source the callback that will receive the emitter; rejected via
 *               {@code ObjectHelper.requireNonNull} with the message "source is null"
 * @return the value returned by {@code RxJavaPlugins.onAssembly}
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // Validate first so that no operator instance is built around a null source.
    ObjectHelper.requireNonNull(source, "source is null");

    final ObservableCreate<T> operator = new ObservableCreate<T>(source);
    return RxJavaPlugins.onAssembly(operator);
}

source是ObservableOnSubscribe类型,里面只有一个方法

/**
 * Callback type accepted by {@code Observable.create}; it declares a single method that
 * receives the emitter.
 *
 * @param <T> the element type of the emitter
 */
public interface ObservableOnSubscribe<T> {
    /**
     * Invoked with the emitter through which signals are pushed.
     *
     * @param emitter the emitter to push signals into (annotated as non-null)
     * @throws Exception implementations are allowed to throw checked exceptions
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. 
                    Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null.
              Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }
        ...

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}

看得出,先执行

 // Excerpt from ObservableCreate.subscribeActual: the emitter is created and handed to the
 // observer via onSubscribe before the source callback gets a chance to emit anything.
 CreateEmitter<T> parent = new CreateEmitter<T>(observer);
 observer.onSubscribe(parent);

observer.onSubscribe(parent)先执行,通知观察者调用public void onSubscribe(Disposable d)
之后才会执行source.subscribe(parent),也就是才会有emitter的onNext()和onComplete()
emitter每调用一次onNext(),observer就会收到一次onNext(),一直到emitter调用onComplete()(或onError())
此时CreateEmitter会调用dispose(),绑定关系才会解除

相关文章

  • Rxjava2-create

    所有代码的演示都在RxJava2.1.13版本上进行的 看看是如何运行的 source是ObservableOnS...

网友评论

      本文标题:Rxjava2-create

      本文链接:https://www.haomeiwen.com/subject/pmzeaftx.html