美文网首页
Rx转换操作符

Rx转换操作符

作者: gczxbb | 来源:发表于2019-04-06 14:55 被阅读0次

map操作符

被观察者数据源泛型,当发射器的数据类型和观察者数据类型不同时,通过map操作符转换,可以将上游发射的类型转换成任意对象类型。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onComplete();
    }
}).map(new Function<Integer, String>() {
    @Override
    public String apply(Integer s) throws Exception {
        String newStr = s + "_";
        Log.d(TAG, "int apply s " + newStr);
        return newStr;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String response) throws Exception {
        Log.d(TAG, "Observer : " + response);
    }
});

发射数据类型是Integer类,通过map操作符,将类型转换成String类。Function是一个类型转换接口,Function<T, R>,将T转换R,解决被观察者和观察者数据类型不匹配问题。

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

返回一个被观察者ObservableMap,封装原始被观察者ObservableCreate和转换接口Function,调用ObservableMap的subscribe注册方法。

@Override
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

创建观察者MapObserver,封装自定义观察者和转换接口Function。source源即内部ObservableCreate,调用它的subscribe方法。

被观察者链

ObservableCreate的#subscribeActual方法,创建CreateEmitter数据发射器,通知观察者已经注册。
调用数据源source(ObservableOnSubscribe)的subscribe方法,将发射器暴漏给外部。外部通过发射器发射数据,如onNext方法。
发射器CreateEmitter持有观察者MapObserver,当onNext事件发射后,通知观察者MapObserver的onNext方法,传参发射的数据类型Integer类。

public void onNext(T t) {
    ...    
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    actual.onNext(v);
}

根据MapObserver内部转换接口Function,apply方法,将T类型转换成U类型,再调用自己定义观察者Observer的onNext方法,入参数据类型转换成String。
发射器onNext方法和观察者accept方法按照通知顺序执行。

Rx的map操作符

flatMap操作符

flatMap操作符和map类似,Function接口实现类型转换,转换的对象是一个被观察者ObservableSource。

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }
}).flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer s) throws Exception {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //将上游Integer类型数据,在新发射器中改造发射。
                String newStr = s + "_gc1";
                String newStr2 = s + "_gc2";
                Log.d(TAG, "int apply s " + newStr);
                Log.d(TAG, "int apply s " + newStr2);
                e.onNext(newStr);
                e.onNext(newStr2);
            }
        });
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String response) throws Exception {
        Log.d(TAG, "Observer : " + response);
    }
});

将上游发射器每个Integer类型的数据转换成Observable类型,再由每个转换的被观察者发射目标类型数据。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
                                       boolean delayErrors, int maxConcurrency, int bufferSize) {
    ...
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

返回一个被观察者ObservableFlatMap。封装原始被观察者ObservableCreate和转换接口Function,调用ObservableFlatMap的subscribe注册方法。

public void subscribeActual(Observer<? super U> t) {
    ...
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

创建观察者MergeObserver,封装自定义观察者和转换接口Function,source源即内部ObservableCreate,调用它的subscribe方法。当发射器onNext方法发射时,调用发射器内部MergeObserver的onNext方法。

@Override
public void onNext(T t) {

    ObservableSource<? extends U> p;
    try {
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        return;
    }
    //调用的是新建ObservableSource的注册方法。
    subscribeInner(p);
}

通过Function接口方法,将Integer类型转换成ObservableSource类型,转换对象是一个被观察者,外部创建,ObservableCreate类型,将Integer类型的数据暴露在新被观察者的数据源发射器中,处理转换成新发射器支持String类型,subscribeInner方法,新被观察者订阅。

void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        if (p instanceof Callable) { 
            ...
        } else {
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner);
            }
            break;
        }
    }
}

调用Observable的subscribe方法,订阅InnerObserver观察者,外部调用发射器onNext方法,可以获取apply方法中上层发射的Integer数据,按照String类型,触发两个onNext方法再次发射数据,两次调用观察者InnerObserver的onNext方法,每次,调用它引用MergeObserver的onNext方法,最终,通知到外部观察者。

flatMap最初的onNext顺序,在Function转换成新Observable后,根据收到的数据,包装重新发射一批新数据。在观察者到的onNext顺序不一定是按照最初的onNext顺序调用的。
上面发送的1,2,3,在观察者中看到的不一定是1,2,3的排序,加一个延迟就能看到,即1_gc1,1_gc2,3_gc1,3_gc2,2_gc1,2_gc2。

flatMap操作符数据流程

总结

flatMap不保证数据发射流的通知顺序。
concatMap和flatMap功能相同,可以保证按照发射顺序通知。


任重而道远

相关文章

  • RxJava学习之转换型操作符

    RxJava学习之转换型操作符 标签(空格分隔): RX系列 转换型操作符 下面展示了可用于Observable发...

  • Rx转换操作符

    map操作符 被观察者数据源泛型,当发射器的数据类型和观察者数据类型不同时,通过map操作符转换,可以将上游发射的...

  • RxJS 使用手册

    手册 基础的 Rx 使用非常直观。 转换成 Observable 使用构造操作符能够轻松创造出一个 Observa...

  • Rx操作符

    RxJava的操作符太多了,详细的各个操作符的解释可以直接看文档ReactiveX/RxJava文档中文版,这里记...

  • rx操作符

    按功能进行分类 Operators By Category 1.创建Observable Create —以编程方...

  • Rxjava2的操作符 三

    概述:RX的操作符确实不少,本篇将介绍剩余的操作符。 5. 过滤操作符 5.1 filter()通过一定逻辑来过滤...

  • RxSwift文档六(单元测试)

    单元测试 测试自定义操作符 RxSwift用RxTest为所有操作符测试,位于Rx.xcworkspace项目内的...

  • Android架构师RX响应式编程——Rxjava实战项目教学

    RX定义 Rx是一个函数库,让开发者可以利用可观察序列和LINQ风格查询操作符来编写异步和基于事件的程序 Rx是微...

  • js学习

    变量的声明 赋值 isNaN(n) 数值转换 parseInt 转换字符串 bool转换 比较操作符 三元操作符 ...

  • RxJS系列教程(九) 操作异步流

    Rx,不管你是JS,Java,Python还是Swift,玩的就是操作符。每个操作符怎么用,官方文档写得不能再清楚...

网友评论

      本文标题:Rx转换操作符

      本文链接:https://www.haomeiwen.com/subject/jraliqtx.html