美文网首页Android-RxJavaAndroid-Rxjava&retrofit&daggerAndroid 入门进阶
这可能是最好的RxJava 2.x 入门教程(四)

这可能是最好的RxJava 2.x 入门教程(四)

作者: nanchen2251 | 来源:发表于2017-06-26 18:02 被阅读10749次

这可能是最好的 RxJava 2.x 入门教程系列专栏
文章链接:
这可能是最好的 RxJava 2.x 入门教程(完结版)【重磅推出】
这可能是最好的 RxJava 2.x 入门教程(一)
这可能是最好的 RxJava 2.x 入门教程(二)
这可能是最好的 RxJava 2.x 入门教程(三)
这可能是最好的 RxJava 2.x 入门教程(四)
这可能是最好的 RxJava 2.x 入门教程(五)
GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples
为了满足大家的饥渴难耐,GitHub 将同步更新代码,主要包含基本的代码封装,RxJava 2.x 所有操作符应用场景介绍和实际应用场景,后期除了 RxJava 可能还会增添其他东西,总之,GitHub 上的 Demo 专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

前言

最近很多小伙伴私信我,说自己很懊恼,对于 RxJava 2.x 系列一看就能明白,但自己写却又写不出来。如果 LZ 能放上实战情景教程就最好不过了。也是哈,单讲我们的操作符,也让我们的教程不温不火,但 LZ 自己选择的路,那跪着也要走完呀。所以,也就让我可怜的小伙伴们忍忍了,操作符马上就讲完了。

正题

Single

顾名思义,Single 只会接收一个参数,而 SingleObserver 只会调用 onError() 或者 onSuccess()

Single.just(new Random().nextInt())
                .subscribe(new SingleObserver<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {

                    }

                    @Override
                    public void onSuccess(@NonNull Integer integer) {
                        mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
                        Log.e(TAG, "single : onSuccess : "+integer+"\n" );
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
                        Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
                    }
                });

输出:


distinct

去重操作符,简单的作用就是去重。


Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("distinct : " + integer + "\n");
                        Log.e(TAG, "distinct : " + integer + "\n");
                    }
                });

输出:



很明显,发射器发送的事件,在接收的时候被去重了。

debounce

去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。


Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("debounce :" + integer + "\n");
                        Log.e(TAG,"debounce :" + integer + "\n");
                    }
                });

输出:



代码很清晰,去除发送间隔时间小于 500 毫秒的发射事件,所以 1 和 3 被去掉了。

defer

简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> call() throws Exception {
                return Observable.just(1, 2, 3);
            }
        });


        observable.subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                mRxOperatorsText.append("defer : " + integer + "\n");
                Log.e(TAG, "defer : " + integer + "\n");
            }

            @Override
            public void onError(@NonNull Throwable e) {
                mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
                Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                mRxOperatorsText.append("defer : onComplete\n");
                Log.e(TAG, "defer : onComplete\n");
            }
        });

输出:


last

last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

Observable.just(1, 2, 3)
                .last(4)
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("last : " + integer + "\n");
                        Log.e(TAG, "last : " + integer + "\n");
                    }
                });

输出:


merge

merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("merge :" + integer + "\n");
                        Log.e(TAG, "accept: merge :" + integer + "\n" );
                    }
                });

输出:


reduce

reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

Observable.just(1, 2, 3)
               .reduce(new BiFunction<Integer, Integer, Integer>() {
                   @Override
                   public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                       return integer + integer2;
                   }
               }).subscribe(new Consumer<Integer>() {
           @Override
           public void accept(@NonNull Integer integer) throws Exception {
               mRxOperatorsText.append("reduce : " + integer + "\n");
               Log.e(TAG, "accept: reduce : " + integer + "\n");
           }
       });

输出:



可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

scan

scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

Observable.just(1, 2, 3)
                .scan(new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
                        return integer + integer2;
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                mRxOperatorsText.append("scan " + integer + "\n");
                Log.e(TAG, "accept: scan " + integer + "\n");
            }
        });

输出:



看日志,没毛病。

window

按照实际划分窗口,将数据发送给不同的 Observable

mRxOperatorsText.append("window\n");
       Log.e(TAG, "window\n");
       Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
               .take(15) // 最多接收15个
               .window(3, TimeUnit.SECONDS)
               .subscribeOn(Schedulers.io())
               .observeOn(AndroidSchedulers.mainThread())
               .subscribe(new Consumer<Observable<Long>>() {
                   @Override
                   public void accept(@NonNull Observable<Long> longObservable) throws Exception {
                       mRxOperatorsText.append("Sub Divide begin...\n");
                       Log.e(TAG, "Sub Divide begin...\n");
                       longObservable.subscribeOn(Schedulers.io())
                               .observeOn(AndroidSchedulers.mainThread())
                               .subscribe(new Consumer<Long>() {
                                   @Override
                                   public void accept(@NonNull Long aLong) throws Exception {
                                       mRxOperatorsText.append("Next:" + aLong + "\n");
                                       Log.e(TAG, "Next:" + aLong + "\n");
                                   }
                               });
                   }
               });

输出:


写在最后

至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符,不是说它们不重要,而是 LZ 也要考虑大家的情况,接下来就会根据实际应用场景来对 RxJava 2.x 发起冲锋。如果想看更多的数据,请移步 GitHub:https://github.com/nanchen2251/RxJava2Examples

做不完的开源,写不完的矫情。欢迎扫描下方二维码或者公众号搜索「nanchen」关注我的微信公众号,目前多运营 Android ,尽自己所能为你提升。如果你喜欢,为我点赞分享吧~


nanchen

相关文章

网友评论

  • 風的記憶:写的很不错,很适合入门看
  • 达则兼济天下:debounce补充: 两个相邻数据发射的时间间隔决定了前一个数据是否会被丢弃,然而demo代码中5是最后一个数据,所以后面设置的510ms并不影响它是否被丢弃,也仅仅起一个线程等待时间的作用吧。代码分析如下:
    emitter.onNext(1); // skip 先收到一个1
    Thread.sleep(400);
    emitter.onNext(2); // deliver 过了400ms收到一个2,小于设定时间500ms,把前一个丢掉,现在只有一个2
    Thread.sleep(505);
    emitter.onNext(3); // skip 过了505ms收到一个3,符合设定时间,保存,现在是2、3
    Thread.sleep(100);
    emitter.onNext(4); // deliver 过了100ms收到一个4,小于设定时间,把前一个丢掉,丢掉3,保存4,现在是2、4
    Thread.sleep(605);
    emitter.onNext(5); // deliver 过了605ms收到一个5,符合设定时间,保存,现在是2、4、5
    Thread.sleep(510);
    emitter.onComplete();
    V_Ballack:感谢解释
    达则兼济天下:@ezfantasy 😄
    ezfantasy:这个解释看得懂,博主贴出来的反而让我理解答案应该是1,3,5
  • luo2016:写的挺好的,清晰易懂
  • 追风筝的boy:我好懊恼,我突然变成了一个爱哭鼻子的傻瓜:joy:
    追风筝的boy:@nanchen2251 就是看到你前言里面的第一句话,很多同学私信给你,说自己很懊恼。看到这个词,突然就想起了《夏洛特烦恼》里面这句台词。没什么特别的含义:sweat:
    nanchen2251:@追风筝的boy 什么鬼
  • 30035123f1bc:嗯 这篇有点急了
  • thinkerzhangyan:debounce v发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射。
  • leilifengxingmw:文章感觉文笔叙述有的地方不是很详细,但还是可以的:smile:
  • b09dbea7de6a:defer是延迟订阅的意思。在订阅的时候,执行ObservableOnScubsribe call方法里面的代码。
  • ImTudou:关于last的建议:
    通过Observable只发出最后一个项目(或符合某些条件的最后一个项目)

    重点在发出(emit)一词,而不是取出。
    nanchen2251:这个可以。
  • 6f9626cdc6c5:scan操作符的例子,为什么会把1给打印出来呢?
    Prom_Jual:因为 1和另外一个没有赋值的(默认0)相加,所以sum=1
  • 间歇性丶神经病患者:楼主超厉害
    nanchen2251:@间歇性丶神经病患者 0 0.
  • SZhua:不孬,讲的很好
    nanchen2251:@SZhua 谢谢认了
  • 黑白咖:签到
  • Alsomail:last好像里面传任何数值,都是返回最后一个item啊,而且里面只能写入一个int类型参数,那它的参数有什么用呢
  • 3a1451eb7fb4:defer到底是怎么回事,感觉没讲清楚啊,博主
    hubery_:感觉其实就懒加载,只有具体的使用时才会创建
    seraphzxz:defer 的意思是,直到有订阅,才会创建 Observable ,具有延时的效果。比如:int a = 1;

    Observable.just(a); 此时 just 中的 a 已经初始化了,此后在修改 int a = 2;在订阅时依然会发送 a=1。

    使用 defer 就可以将初始化延迟到订阅时,大概就是这个意思吧。
    28744fc330e1:同样表示没看懂
  • Davisxy:window这个没懂
    喝绿茶的考拉:@相公无爱 我理解window的功能和buffer类似 不过buffer 把数据源分割成list 而window把数据源分割成了observable
  • icoo:distinct 在第三篇里写过了,要去重啊:stuck_out_tongue_closed_eyes:
  • _大洲:从来只有听过RxJava的我竟然全看懂了:joy:
  • a953b6bd860e:整体不错,就是有些操作符没解释清楚
  • 飞奔的小马:debounce操作符 发射的最后一项即使把sleep时间改为小于500ms的值,还是会输出的,这个是怎么解释呢
    达则兼济天下:@zilch1974 你解释的很到位,两个相邻数据发射的时间间隔决定了前一个数据是否会被丢弃,然而demo代码中5是最后一个数据,所以后面设置的510ms并不影响它是否被丢弃,也仅仅起一个线程等待时间的作用吧。代码具体分析如下:
    emitter.onNext(1); // skip 先收到一个1
    Thread.sleep(400);
    emitter.onNext(2); // deliver 过了400ms收到一个2,小于设定时间500ms,把前一个丢掉,现在只有一个2
    Thread.sleep(505);
    emitter.onNext(3); // skip 过了505ms收到一个3,符合设定时间,保存,现在是2、3
    Thread.sleep(100);
    emitter.onNext(4); // deliver 过了100ms收到一个4,小于设定时间,把前一个丢掉,丢掉3,保存4,现在是2、4
    Thread.sleep(605);
    emitter.onNext(5); // deliver 过了605ms收到一个5,符合设定时间,保存,现在是2、4、5
    Thread.sleep(510);
    emitter.onComplete();
    e145da3c954d:我一开始也有这个疑惑,后来看了源码的解释后终于理解了。
    比如我按照以下顺序发送数据:1--->2--->3--->4--->5--->6,间隔时间依次为200,350,250,280,290ms,而我的debounce设定的时间为285,那么mirro的observable首先收到了一个1,200ms后收到了2,因为200小于285,所以前面的1会被丢弃掉,现在只有一个2,然后再过350ms后来了3,因为350大于设定时间,所以3会被存储,之后的4是250ms之后来的,所以它会替换掉4...
    就是这么一个流程。
  • 一步一年华:23的为啥会输出scan 1??
  • Xdjm:distinct在第三章讲过啦

本文标题:这可能是最好的RxJava 2.x 入门教程(四)

本文链接:https://www.haomeiwen.com/subject/aspjcxtx.html