美文网首页
RxJava的一些理解

RxJava的一些理解

作者: 北雁南飞_8854 | 来源:发表于2017-11-19 21:35 被阅读0次

一、Observable的创建

//Observable的创建, 这里的Object类可以替换为任意类型。
Observable<Object> observable =
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> e) throws Exception {
                e.onNext(/*Object*/value1);
                e.onNext(/*Object*/value2);
                ...
                e.onComplete();
            }
        });

//Observer订阅事件
observable.subscribe(new Observer<Object>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Object o) {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

1.1 被观察者(Observable)接口及实现类:

(1) 接口ObservableSource

public interface ObservableSource<T> {
    void subscribe(@NonNull Observer<? super T> observer);
}

(2) 接口ObservableSource的抽象实现类Observable

public abstract class Observable<T> implements ObservableSource<T> {

    /* 实现ObservableSource接口。*/
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }

    protected abstract void subscribeActual(Observer<? super T> observer);

    /* 静态方法,用于创建Observable实例。*/
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

(3) Observable的具体实现类ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable>
            implements ObservableEmitter<T>, Disposable {
        private static final long serialVersionUID = -3434801548987643227L;
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) { this.observer = observer;}
         /* Emitter接口的实现。*/
        @Override
        public void onNext(T t) {
            ...
            observer.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            ...
            observer.onError(t);
        }
        @Override
        public void onComplete() {
            ...
            observer.onComplete();
        }

        /* Disposable接口的实现。*/
        @Override
        public void dispose() { ...}
        @Override
        public boolean isDisposed() {...}
    }
}    

1.2 被观察者(Observable)和观察者(Observer)之间的桥梁

被观察者(Observable)持有ObservableOnSubscribe实例的引用, 参数ObservableEmitter持有观察者(Observer)实例的引用,这样通过ObservableOnSubscribe的subscribe(ObservableEmitter<T> e)方法,Observable和Observer之间就建立了联系。
(1) 接口ObservableOnSubscribe

/**
 * A functional interface that has a {@code subscribe()} method that receives
 * an instance of an {@link ObservableEmitter} instance that allows pushing
 * events in a cancellation-safe manner.
 *
 * @param <T> the value type pushed
 */
public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

二、总结

RxJava的使用可以概括为:

  1. 通过Observable.create(ObservableOnSubscribe<T> source)创建一个Observable实例,实际类型为ObservableCreate, 它持有对source的引用;
  2. 通过已创建的Observable实例,调用subscribe(Observer<? super T> observer)方法,在该方法中完成的主要工作:
  • 使用observer作参数构建CreateEmitter类的实例, 记为parent,parent持有对observer的引用;
  • 调用observer.onSubscribe(parent);
  • 通过ObservableCreate类的source成员, 调用source.subscribe(parent),该方法由用户自定义,一般包含:
    parent.onNext(Object);
    parent.onError(Throwable);
    parent.onComplete();
    最终分别会调用observer的onNext(Object),onComplete()和onError(Throwable)方法。

三、操作符

concatMap

@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

说明:
对源Publisher发射的每一个元素应用一个转换函数(transformation function),来生成一个新的Flowable实例,然后将新的Flowable实例按原序发给订阅者。
举例:

private void timeoutWithRetry() {
        Flowable
                .just("red", "dark", "yellow", "green", "black", "blue")
                .concatMap(new Function<String, Publisher<? extends String>>() {
                    @Override
                    public Publisher<? extends String> apply(String color) throws Exception {
                        Log.d(TAG, "applying " + color + ", thread: " + Thread.currentThread().getName());
                        return Flowable.just(color)
                                .delay(color.length(), TimeUnit.SECONDS)
                                .timeout(5, TimeUnit.SECONDS) //超时时间, 如果超时、且没有添加重试,则抛出TimeoutException.
                                .retry(2); //Publisher在出现onError()时重试的次数, 重试之后再决定是否调用onError().
                    }
                })
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, String.format("Received {%s} delaying for {%d}, thread: %s", s, s.length(), Thread.currentThread().getName()));
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.e(TAG, "accept exception:  ", throwable);
                    }
                });
    }


四、RxJava的线程控制

  1. 产生事件的代码(ObservableOnSubscribe接口)和doOnSubscribe()分别在它们后面最近的一个subscribeOn() 指定的Scheduler上执行,如果后面没有找到subscribeOn(),则在subscribe()的调用者所在的线程执行;
  2. 普通操作(map、filter等)和消费事件的代码(Consumer、Observer接口)在它们前面最近的一个observeOn指定的Scheduler上执行;如果它们前面没有observeOn了,那么它们就在整个调用链的第一个subscribeOn指定的Scheduler上执行;如果没找到subscribeOn调用,则在subscribe()的调用者所在的线程执行。

相关文章

网友评论

      本文标题:RxJava的一些理解

      本文链接:https://www.haomeiwen.com/subject/wvppvxtx.html