RxJava

作者: HOLLE_karry | 来源:发表于2020-04-26 18:51 被阅读0次

1.定义

一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
总结:RxJava 是一个 基于事件流、实现异步操作的库

2.作用

实现异步操作
类似于 Android中的 AsyncTask 、Handler作用

3.特点

由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:

逻辑简洁
实现优雅
使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁和优雅

4.RxJava原理介绍

Rxjava原理 基于 一种扩展的观察者模式

Rxjava的扩展观察者模式中有4个角色:
被观察者 (Observable)一一>产生事件­­­­
订阅(Subscribe)一一>连接被观察者和观察者
事件(Event)一一>被观察者和观察者沟通的载体
观察者(Observer)一一>接收事件并给出响应

总结:被观察者(Observable)通过 订阅(Subscribe)按顺序发送事件 给观察者(Observer),观察者(Observer)按顺序接收事件 并 作出对应的响应动作
普通事件 :onNext() 接收被观察者发送的消息
特殊的事件:
onCompleted() 事件队列完结
onError () 事件队列异常
注意:

1)RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
2)RxJava 规定,onNext() 接收被观察者发送的消息、可以执行多次;当不会再有新的 onNext () 发出时,需要触发 onCompleted () 方法作为标志。onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
3)在一个正确运行的事件序列中, onCompleted() 和 onError () 有且只有一个,并且是事件序列中的最后一个。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

5.基本使用

⑴依赖

  //RxJava
    implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 库
    implementation 'com.squareup.retrofit2:converter-gson:2.3.0'//转换器,请求结果转换成Model
    implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'//配合Rxjava 使用
    implementation 'com.google.code.gson:gson:2.6.2'//Gson 库

⑵步骤

创建被观察者并生产事件
创建观察者并 定义响应事件的行为
通过订阅(Subscribe)连接观察者和被观察者

①创建被观察者并生产事件

//创建被观察者 Observable 对象
//ObservableOnSubscribe<Integer>泛型指事件的类型
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 此处传入了一个 OnSubscribe 对象参数
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
    //在复写的subscribe()里定义需要发送的事件
    @Override
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
          // 通过 ObservableEmitter类对象产生事件并通知观察者
          // ObservableEmitter类介绍
          // a. 定义:事件发射器
          // b. 作用:定义需要发送的事件 & 向观察者发送事件
          emitter.onNext(1);
          emitter.onNext(2);
          emitter.onNext(3);
          emitter.onComplete();
     }
});

②创建观察者并 定义响应事件的行为

//创建观察者 (Observer )对象
Observer<Integer> observer = new Observer<Integer>() {
//创建对象时通过对应复写对应事件方法 从而 响应对应事件
       // 观察者接收事件前,默认最先调用复写 onSubscribe()
       @Override
       public void onSubscribe(Disposable d) {
            Log.d(TAG, "开始采用subscribe连接");
       }
       // 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
       @Override
       public void onNext(Integer value) {
            Log.d(TAG, "对Next事件作出响应" + value);
       }
       // 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
       @Override
       public void onError(Throwable e) {
             Log.d(TAG, "对Error事件作出响应");
       }
       // 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
       @Override
       public void onComplete() {
             Log.d(TAG, "对Complete事件作出响应");
       }
};

③通过订阅(Subscribe)连接观察者和被观察者

observable.subscribe(observer);

6.链式调用

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("你好");
                emitter.onNext("孩子");
                emitter.onNext("加油");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: "+d);
            }

            @Override
            public void onNext(String s) {
                Log.e(TAG, "onNext: "+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: "+e);
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        });

7.Consumer简化观察者:实现简便式的观察者模式

Observable.just("hello").subscribe(new Consumer<String>() {
     // 每次接收到Observable的事件都会调用Consumer.accept()
     @Override
     public void accept(String s) throws Exception {
           System.out.println(s);
     }
});

8.倒计时

Disposable.dispose() 切断观察者 与 被观察者 之间的连接
即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件

// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(1,1, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    private Disposable Dis;

                    @Override
                    public void onSubscribe(Disposable d) {
                        this.Dis=d;
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e(TAG, "onNext: "+(3-aLong));
                        if (3-aLong==0){
                            Dis.dispose();
                        }
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

9.创建操作符

创建操作符.png
⑴基本创建

①create()

/ **
   * 1. 通过creat()创建被观察者 Observable 对象
   */ 
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
          // 传入参数: OnSubscribe 对象
          // 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
          // 即观察者会依次调用对应事件的复写方法从而响应事件
          // 从而实现由被观察者向观察者的事件传递 & 被观察者调用了观察者的回调方法 ,即观察者模式
/ **
   * 2. 在复写的subscribe()里定义需要发送的事件
   */ 
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 通过 ObservableEmitter类对象 产生 & 发送事件
                // ObservableEmitter类介绍
                    // a. 定义:事件发射器
                    // b. 作用:定义需要发送的事件 & 向观察者发送事件
                   // 注:建议发送事件前检查观察者的isUnsubscribed状态,以便在没有观察者时,让Observable停止发射数据
                    if (!observer.isUnsubscribed()) {
                           emitter.onNext(1);
                           emitter.onNext(2);
                           emitter.onNext(3);
                }
                emitter.onComplete();
            }
        });
// 至此,一个完整的被观察者对象(Observable)就创建完毕了。
⑵发送事件

①just()

•作用:快速创建1个被观察者对象(Observable)
•发送事件的特点:直接发送 传入的事件
•应用场景:快速创建 被观察者对象(Observable) & 发送10个以下事件

// 1. 创建时传入整型1、2、3、4
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
        Observable.just(1, 2, 3,4)   
            // 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
            // 2. 通过通过订阅(subscribe)连接观察者和被观察者
            // 3. 创建观察者 & 定义响应事件的行为
         .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "开始采用subscribe连接");
            }
            // 默认最先调用复写的 onSubscribe()
            @Override
            public void onNext(Integer value) {
                Log.d(TAG, "接收到了事件"+ value  );
            }
            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "对Error事件作出响应");
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "对Complete事件作出响应");
            }
        });
    }

②fromArray()

•作用
快速创建1个被观察者对象(Observable)
发送事件的特点:直接发送 传入的数组数据
•应用场景
快速创建 被观察者对象(Observable) & 发送10个以上事件(数组形式)
数组元素遍历

③fromIterable()

•作用
快速创建1个被观察者对象(Observable)
发送事件的特点:直接发送 传入的集合List数据
•应用场景
快速创建 被观察者对象(Observable) & 发送10个以上事件(集合形式)
集合元素遍历

④never()
该方法创建的被观察者对象发送事件的特点:不发送任何事件
⑤empty()
该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成,观察者接收后会直接调用onCompleted()
⑥error()
该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常

⑶延迟创建

①defer()

•作用
直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
•应用场景
动态创建被观察者对象(Observable) & 获取最新的Observable对象数据

②timer()

•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
本质 = 延迟指定时间后,调用一次 onNext(0)
•应用场景
延迟指定事件,发送一个0,一般用于检测

③interval

•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:每隔指定时间 就发送 事件
发送的事件序列 = 从0开始、无限递增1的的整数序列,需要手动停止

④intervalRange

•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
a. 发送的事件序列 = 从0开始、无限递增1的的整数序列
b. 作用类似于interval(),但可指定发送的数据的数量

⑤range()

•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:连续发送 1个事件序列,可指定范围
a. 发送的事件序列 = 从0开始、无限递增1的的整数序列
b. 作用类似于intervalRange(),但区别在于:无延迟发送事件

⑥rangeLong()

•作用:类似于range(),区别在于该方法支持数据类型 = Long
具体使用 与range()类似,此处不作过多描述

10.转换操作符

⑴Map()

•作用
map对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件,即 将被观察者发送的事件转换为任意的类型事件。
•应用场景
数据类型转换,开发中一般是处理服务器返回的数据

    private void map() {
        Observable.just(1, 2, 3, 4)///发送的是整数类型的事件
                .map(new Function<Integer, String>() {
                    //new Function<Integer, Object>()
                    //泛型1:只的是被观察者发送的事件类型
                    //泛型2:我们最终转换完成后事件的类型
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "数据进行转换:" + integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

⑵FlatMap()

•作用:将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送
•原理
a.为事件序列中每个事件都创建一个 Observable 对象;

b.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
c.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
d.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
•应用场景
无序的将被观察者发送的整个事件序列进行变换

@SuppressLint("CheckResult")
    private void flatmap() {
        Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    //泛型1:被观察者发送的事件类型
                    //泛型2:ObservableSource<?> 转换数据的类,他里面的泛型是我们最终拆分合并
                    //之后的事件类型
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        //将每一个事件进行拆分
                        ArrayList<String> list = new ArrayList<>();
                        for (int i = 0; i < 4; i++) {
                            list.add("事件:" + integer + ",拆分之后的事件:" + i);
                        }
                        //快速创建一个被观察者,参数是list类型,一般是发送10个以上
                        return Observable.fromIterable(list);
                    }
                }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, "accept: " + s);
            }
        });
    }

⑶ConcatMap()

•作用:类似FlatMap()操作符
与FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序

⑷Buffer()

•作用
定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送
•应用场景
缓存被观察者发送的事件

11.过滤操作符

过滤操作符.png

filter返回true,事件发送到观察者,返回flase,事件拦截

12.功能性操作符

功能性操作符.png

13.Rxjava+Retrofit

//在主线程进行网络请求会报NetworkOnMainThreadException的异常
//多次指定被观察者运行的线程只有第一次有效
//多次指定观察者运行的线程最后一次有效
new Retrofit.Builder().baseUrl(ApiService.url)
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                .build().create(ApiService.class)
                .getData(page)
                .subscribeOn(Schedulers.io())//io操作线程,被观察者运行的线程,因为被观察者在运行网络请求,所以指定子线程
                .observeOn(AndroidSchedulers.mainThread())//Android主线程,观察者运行的线程,一般刷新ui,所以在主线程
                .subscribe(new Observer<Bean>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Bean bean) {
                        if (bean != null) {
                            list.addAll(bean.getResults());
                            adapter.notifyDataSetChanged();
                        }
                    }
                    @Override
                    public void onError(Throwable e) {

                    }
                    @Override
                    public void onComplete() {

                    }
                });

14.AsyncTask与RxJava的区别

•RxJava 实现异步操作是通过一种扩展的观察者模式来实现的。
•异步、简洁(逻辑、代码读写)。
•RxJava 内部支持多线程操作
•AyncTask是采用线程池的形式实现的。
•出现错误的处理-rxjava 自身有错误的方法回调,aync无法做到。
•并发的请求,rxjava 通过操作符能够完成各种并发情况,而AyncTask不行。

15.扩展

RxJava

相关文章

网友评论

      本文标题:RxJava

      本文链接:https://www.haomeiwen.com/subject/wrorwhtx.html