简介
ReactiveX是一个通过使用可观察序列来编写异步和基于事件的程序的库。
它扩展了观察者模式以支持数据和/或事件序列,并增加了运算符,使您可以声明性地组合序列,同时抽象出对低级线程,同步,线程安全性,并发数据结构和非线程等事物的关注阻塞I / O , Rx提供了一系列的操作符,你可以使用它们来过滤(filter)、选择(select)、变换(transform)、结合(combine)和组合(compose)多个Observable,这些操作符让执行和复合变得非常高效。
Rx模式
创建:Rx可以方便的创建事件流和数据流
组合:Rx使用查询式的操作符组合和变换数据流
监听:Rx可以订阅任何可观察的数据流并执行操作
简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码
异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制
轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题
创建操作符
Observables 被观察者事件源 ,SubScribers观察者
创建操作 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, Timer
Create操作符
create 操作符产生一个 Obserable 被观察者对象,通过上游 Observable 发射上游事件,观察者 Observer 接收下游事件。

//todo 操作符Create
Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
if(!e.isDisposed()){//在没有观察者的时候不发射节约资源
e.onNext("张飞");
e.onNext("赵子龙");
e.onNext("刘备");
}
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.print(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
//打印结果: 张飞 赵子龙 刘备
Defer 操作符
defer直到有订阅者订阅时,才通过Observable的工厂方法创建Observable并执行

Observable<String> observable = Observable.defer(new Callable<ObservableSource<String>>() {
@Override
public ObservableSource<String> call() throws Exception {
return Observable.just("张飞","赵子龙","刘备");
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String integer) {
System.out.print(integer);
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
//打印结果:张飞赵子龙刘备
Just 操作符
创建发送指定值的Observerble,just只是简单的原样发射,将数组或Iterable当做单个数据。如果传递的值为null,则发送的Observable的值为null。参数最多为9个

Observable.just("张飞", "赵子龙","刘备")
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
System.out.print(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
//打印结果:张飞赵子龙刘备
Range 操作符
Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。
RxJava将这个操作符实现为range函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。
range默认不在任何特定的调度器上执行。有一个变体可以通过可选参数指定Scheduler。

Observable .range(1, 6)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.print(integer);
}
});
//打印结果:123456
Timer 操作符
创建一个Observable,它延迟后发射数据。
Timer操作符创建一个在给定的时间段之后返回一个特殊值的Observable。

Observable.timer(2, TimeUnit.SECONDS)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
System.out.print(aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
网友评论