1.定义
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
总结:RxJava 是一个 基于事件流、实现异步操作的库
2.作用
实现异步操作
类似于 Android中的 AsyncTask 、Handler作用
3.特点
由于 RxJava的使用方式是:基于事件流的链式调用,所以使得 RxJava:
逻辑简洁
实现优雅
使用简单
更重要的是,随着程序逻辑的复杂性提高,它依然能够保持简洁和优雅
4.RxJava原理介绍
Rxjava原理 基于 一种扩展的观察者模式
Rxjava的扩展观察者模式中有4个角色:
被观察者 (Observable)一一>产生事件
订阅(Subscribe)一一>连接被观察者和观察者
事件(Event)一一>被观察者和观察者沟通的载体
观察者(Observer)一一>接收事件并给出响应
总结:被观察者(Observable)通过 订阅(Subscribe)按顺序发送事件 给观察者(Observer),观察者(Observer)按顺序接收事件 并 作出对应的响应动作
普通事件 :onNext() 接收被观察者发送的消息
特殊的事件:
onCompleted() 事件队列完结
onError () 事件队列异常
❤注意:
1)RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。
2)RxJava 规定,onNext() 接收被观察者发送的消息、可以执行多次;当不会再有新的 onNext () 发出时,需要触发 onCompleted () 方法作为标志。onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
3)在一个正确运行的事件序列中, onCompleted() 和 onError () 有且只有一个,并且是事件序列中的最后一个。
4)需要注意的是,onCompleted()和 onError () 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
5.基本使用
⑴依赖
//RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.2.4'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'com.squareup.retrofit2:retrofit:2.5.0'//retrofit 库
implementation 'com.squareup.retrofit2:converter-gson:2.3.0'//转换器,请求结果转换成Model
implementation 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'//配合Rxjava 使用
implementation 'com.google.code.gson:gson:2.6.2'//Gson 库
⑵步骤
创建被观察者并生产事件
创建观察者并 定义响应事件的行为
通过订阅(Subscribe)连接观察者和被观察者
①创建被观察者并生产事件
//创建被观察者 Observable 对象
//ObservableOnSubscribe<Integer>泛型指事件的类型
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 此处传入了一个 OnSubscribe 对象参数
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现被观察者调用了观察者的回调方法 & 由被观察者向观察者的事件传递,即观察者模式
//在复写的subscribe()里定义需要发送的事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通过 ObservableEmitter类对象产生事件并通知观察者
// ObservableEmitter类介绍
// a. 定义:事件发射器
// b. 作用:定义需要发送的事件 & 向观察者发送事件
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
});
②创建观察者并 定义响应事件的行为
//创建观察者 (Observer )对象
Observer<Integer> observer = new Observer<Integer>() {
//创建对象时通过对应复写对应事件方法 从而 响应对应事件
// 观察者接收事件前,默认最先调用复写 onSubscribe()
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 当被观察者生产Next事件 & 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onNext(Integer value) {
Log.d(TAG, "对Next事件作出响应" + value);
}
// 当被观察者生产Error事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
// 当被观察者生产Complete事件& 观察者接收到时,会调用该复写方法 进行响应
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
};
③通过订阅(Subscribe)连接观察者和被观察者
observable.subscribe(observer);
6.链式调用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("你好");
emitter.onNext("孩子");
emitter.onNext("加油");
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: "+d);
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext: "+s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: "+e);
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
7.Consumer简化观察者:实现简便式的观察者模式
Observable.just("hello").subscribe(new Consumer<String>() {
// 每次接收到Observable的事件都会调用Consumer.accept()
@Override
public void accept(String s) throws Exception {
System.out.println(s);
}
});
8.倒计时
Disposable.dispose() 切断观察者 与 被观察者 之间的连接
即观察者 无法继续 接收 被观察者的事件,但被观察者还是可以继续发送事件
// 参数1 = 第1次延迟时间;
// 参数2 = 间隔时间数字;
// 参数3 = 时间单位;
Observable.interval(1,1, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
private Disposable Dis;
@Override
public void onSubscribe(Disposable d) {
this.Dis=d;
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: "+(3-aLong));
if (3-aLong==0){
Dis.dispose();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
9.创建操作符

⑴基本创建
①create()
/ **
* 1. 通过creat()创建被观察者 Observable 对象
*/
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 传入参数: OnSubscribe 对象
// 当 Observable 被订阅时,OnSubscribe 的 call() 方法会自动被调用,即事件序列就会依照设定依次被触发
// 即观察者会依次调用对应事件的复写方法从而响应事件
// 从而实现由被观察者向观察者的事件传递 & 被观察者调用了观察者的回调方法 ,即观察者模式
/ **
* 2. 在复写的subscribe()里定义需要发送的事件
*/
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// 通过 ObservableEmitter类对象 产生 & 发送事件
// ObservableEmitter类介绍
// a. 定义:事件发射器
// b. 作用:定义需要发送的事件 & 向观察者发送事件
// 注:建议发送事件前检查观察者的isUnsubscribed状态,以便在没有观察者时,让Observable停止发射数据
if (!observer.isUnsubscribed()) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
emitter.onComplete();
}
});
// 至此,一个完整的被观察者对象(Observable)就创建完毕了。
⑵发送事件
①just()
•作用:快速创建1个被观察者对象(Observable)
•发送事件的特点:直接发送 传入的事件
•应用场景:快速创建 被观察者对象(Observable) & 发送10个以下事件
// 1. 创建时传入整型1、2、3、4
// 在创建后就会发送这些对象,相当于执行了onNext(1)、onNext(2)、onNext(3)、onNext(4)
Observable.just(1, 2, 3,4)
// 至此,一个Observable对象创建完毕,以下步骤仅为展示一个完整demo,可以忽略
// 2. 通过通过订阅(subscribe)连接观察者和被观察者
// 3. 创建观察者 & 定义响应事件的行为
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
// 默认最先调用复写的 onSubscribe()
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
}
②fromArray()
•作用
快速创建1个被观察者对象(Observable)
发送事件的特点:直接发送 传入的数组数据
•应用场景
快速创建 被观察者对象(Observable) & 发送10个以上事件(数组形式)
数组元素遍历
③fromIterable()
•作用
快速创建1个被观察者对象(Observable)
发送事件的特点:直接发送 传入的集合List数据
•应用场景
快速创建 被观察者对象(Observable) & 发送10个以上事件(集合形式)
集合元素遍历
④never()
该方法创建的被观察者对象发送事件的特点:不发送任何事件
⑤empty()
该方法创建的被观察者对象发送事件的特点:仅发送Complete事件,直接通知完成,观察者接收后会直接调用onCompleted()
⑥error()
该方法创建的被观察者对象发送事件的特点:仅发送Error事件,直接通知异常
⑶延迟创建
①defer()
•作用
直到有观察者(Observer )订阅时,才动态创建被观察者对象(Observable) & 发送事件
•应用场景
动态创建被观察者对象(Observable) & 获取最新的Observable对象数据
②timer()
•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:延迟指定时间后,发送1个数值0(Long类型)
本质 = 延迟指定时间后,调用一次 onNext(0)
•应用场景
延迟指定事件,发送一个0,一般用于检测
③interval
•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:每隔指定时间 就发送 事件
发送的事件序列 = 从0开始、无限递增1的的整数序列,需要手动停止
④intervalRange
•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:每隔指定时间 就发送 事件,可指定发送的数据的数量
a. 发送的事件序列 = 从0开始、无限递增1的的整数序列
b. 作用类似于interval(),但可指定发送的数据的数量
⑤range()
•作用
快速创建1个被观察者对象(Observable)
•发送事件的特点:连续发送 1个事件序列,可指定范围
a. 发送的事件序列 = 从0开始、无限递增1的的整数序列
b. 作用类似于intervalRange(),但区别在于:无延迟发送事件
⑥rangeLong()
•作用:类似于range(),区别在于该方法支持数据类型 = Long
具体使用 与range()类似,此处不作过多描述
10.转换操作符
⑴Map()
•作用
map对 被观察者发送的每1个事件都通过 指定的函数 处理,从而变换成另外一种事件,即 将被观察者发送的事件转换为任意的类型事件。
•应用场景
数据类型转换,开发中一般是处理服务器返回的数据
private void map() {
Observable.just(1, 2, 3, 4)///发送的是整数类型的事件
.map(new Function<Integer, String>() {
//new Function<Integer, Object>()
//泛型1:只的是被观察者发送的事件类型
//泛型2:我们最终转换完成后事件的类型
@Override
public String apply(Integer integer) throws Exception {
return "数据进行转换:" + integer;
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
⑵FlatMap()
•作用:将被观察者发送的事件序列进行 拆分 & 单独转换,再合并成一个新的事件序列,最后再进行发送
•原理
a.为事件序列中每个事件都创建一个 Observable 对象;
b.将对每个 原始事件 转换后的 新事件 都放入到对应 Observable对象;
c.将新建的每个Observable 都合并到一个 新建的、总的Observable 对象;
d.新建的、总的Observable 对象 将 新合并的事件序列 发送给观察者(Observer)
•应用场景
无序的将被观察者发送的整个事件序列进行变换
@SuppressLint("CheckResult")
private void flatmap() {
Observable.just(1, 2, 3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
//泛型1:被观察者发送的事件类型
//泛型2:ObservableSource<?> 转换数据的类,他里面的泛型是我们最终拆分合并
//之后的事件类型
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
//将每一个事件进行拆分
ArrayList<String> list = new ArrayList<>();
for (int i = 0; i < 4; i++) {
list.add("事件:" + integer + ",拆分之后的事件:" + i);
}
//快速创建一个被观察者,参数是list类型,一般是发送10个以上
return Observable.fromIterable(list);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.d(TAG, "accept: " + s);
}
});
}
⑶ConcatMap()
•作用:类似FlatMap()操作符
与FlatMap()的 区别在于:拆分 & 重新合并生成的事件序列 的顺序 = 被观察者旧序列生产的顺序
⑷Buffer()
•作用
定期从 被观察者(Obervable)需要发送的事件中 获取一定数量的事件 & 放到缓存区中,最终发送
•应用场景
缓存被观察者发送的事件
11.过滤操作符

filter返回true,事件发送到观察者,返回flase,事件拦截
12.功能性操作符

13.Rxjava+Retrofit
//在主线程进行网络请求会报NetworkOnMainThreadException的异常
//多次指定被观察者运行的线程只有第一次有效
//多次指定观察者运行的线程最后一次有效
new Retrofit.Builder().baseUrl(ApiService.url)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build().create(ApiService.class)
.getData(page)
.subscribeOn(Schedulers.io())//io操作线程,被观察者运行的线程,因为被观察者在运行网络请求,所以指定子线程
.observeOn(AndroidSchedulers.mainThread())//Android主线程,观察者运行的线程,一般刷新ui,所以在主线程
.subscribe(new Observer<Bean>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Bean bean) {
if (bean != null) {
list.addAll(bean.getResults());
adapter.notifyDataSetChanged();
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
14.AsyncTask与RxJava的区别
•RxJava 实现异步操作是通过一种扩展的观察者模式来实现的。
•异步、简洁(逻辑、代码读写)。
•RxJava 内部支持多线程操作
•AyncTask是采用线程池的形式实现的。
•出现错误的处理-rxjava 自身有错误的方法回调,aync无法做到。
•并发的请求,rxjava 通过操作符能够完成各种并发情况,而AyncTask不行。
网友评论