美文网首页
RxJava学习总结-进程控制

RxJava学习总结-进程控制

作者: 取了个很好听的名字 | 来源:发表于2019-07-01 16:53 被阅读0次

前言

上一篇文章简单介绍了RxJava的使用,本文将在前文的基础上针对一些细节进行说明。

链式编程

链式编程不是什么新鲜的东西,很多框架中的构造者模式都使用了链式编程,RxJava自然也不例外。代码如下:

   Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("onSubscribe", "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("onNext", integer+"");
            }

            @Override
            public void onError(Throwable e) {
               Log.e("onError",e.getMessage());
            }

            @Override
            public void onComplete() {
               Log.e("onComplete","onComplete");
            }
        });

结果如下:

06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onSubscribe: onSubscribe
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 1
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 2
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onNext: 3
06-11 14:11:14.134 21877-21877/com.zhqy.myrxjavademo E/onComplete: onComplete

这里需要说明的是subscribe()有多个重载方法

    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}

第一个重载:上游发送的事件下游都不接收
第二个重载:只关心上游onNext()方法发送的事件
第三个重载:只关心上游onNext()和onError()发送的事件
第四个重载:关心上游onNext(),onError(),onComplete()发送的事件
第五个重载:和例子中使用的Observer类似,只是需要分别创建对象来接收各自的事件

线程控制

RxJava作为一个异步库上下游自然能够在进程间进行切换,但需要注意的是线程间的切换也需要遵守一些规则:
默认情况下上下游在同一个进程中,所处进程有所在的进程决定,举例来说,如果你把上下游写在Activity中,那么上下游均处于主线程中。代码如下:

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("thread", "上游处在"+Thread.currentThread().getName() );
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

                Log.e("thread","下游处在"+ Thread.currentThread().getName() );
            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

结果如下:

06-11 14:26:04.576 23290-23290/? E/thread: 下游处在main
06-11 14:26:04.576 23290-23290/? E/thread: 上游处在main

结果说明了上下游处在同一线程内。但很多情况下上下游处于不同的线程下,比如说网络请求时请求数据需要在子进程下,而更新UI则需要在UI线程下,这又该如何做呢,其实很简单,只需要设置如下代码

.subscribeOn(Schedulers.newThread())                                              
.observeOn(AndroidSchedulers.mainThread())  

其中subscribeOn指定上游所在线程,observeOn指定下游所在进程,结果如下:

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.e("thread", "上游处在"+Thread.currentThread().getName() );
            }
        }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

                Log.e("thread","下游处在"+ Thread.currentThread().getName() );
            }

            @Override
            public void onNext(Integer integer) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

注意:如果想在Android下使用RxJava需要添加如下依赖

    //RxAndroid
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

结果如下

06-11 14:34:12.541 23802-23802/com.zhqy.myrxjavademo E/thread: 下游处在main
06-11 14:34:12.543 23802-23830/com.zhqy.myrxjavademo E/thread: 上游处在RxNewThreadScheduler-1

从结果可以看出上下游处于不同的线程中,从而完成不同的功能。

相关文章

  • RxJava学习总结-进程控制

    前言 上一篇文章简单介绍了RxJava的使用,本文将在前文的基础上针对一些细节进行说明。 链式编程 链式编程不是什...

  • Python daemon守护进程!

    Python daemon 守护进程学习与总结。 守护进程 定义 守护进程是生存期长的一种进程。它们独立于控制终端...

  • RxJava——线程控制切换/调度

    本篇代码见:RxJava_Demo_Translater 这里开始学习RxJava线程控制(切换/调度)。一、Rx...

  • APUE第9章 进程关系、进程组、会话

    9 进程关系 在第8章学习了进程的控制原语,通过各种进程原语可以对进程进行控制,包括新建进程、执行新程序、终止进程...

  • RxJava学习总结

    四个基本概念 水管模型 作用 简单使 回调事件 observable.subscribe的方法说明 操作符 1.j...

  • RxJava学习总结

    什么是RxJava 1. 定义 RxJava is a Java VM implementation of Rea...

  • Rxjava学习总结

    参考:给 Android 开发者的 RxJava 详解-扔物线深入浅出RxJava 基础 "a library f...

  • 5.进程控制

    进程控制 这一节主要介绍Unix系统的进程控制,包括创建新进程、执行进程和进程终止。由于前面Linux学习部分有了...

  • RxJava入门与提高(1)

    前言 本教程是我自己学习Rxjava的系列总结。帮你从浅入深的学习RxJava。该系列借鉴了很多网上的优秀文章,并...

  • Parser 中GCD的使用(8)

    PFMultiProcessFileLockController.m 多进程,文件锁控制器 总结:并发屏障,根据需...

网友评论

      本文标题:RxJava学习总结-进程控制

      本文链接:https://www.haomeiwen.com/subject/rofqfctx.html