上一篇文章中定义了Rx = Observable + Operator + Scheduler。Rx以经典观察者模式为骨架、并扩展之使得我们能够以类似使用Iterable的方式使用Observable。
Rx最为重要的两个要素是:数据流和异步(实际上Rx把数据流都视作异步的)。今天的主角便是数据流——Observable。根据上下文语义的需要,本系列文中可能另称之为数据序列、事件流、被观察者。
观察者
在Rx的世界中,(几乎)每一个故事都从“观察者订阅了数据流”开始。观察者——Observer——好比哨兵,时刻监视着数据流的动静,一旦有数据发射或通知发送便立即响应。观察者实现了以下三个方法的子集:
-
onNext-- 当数据流发射流中任意一个数据时会调用观察者的onNext方法,并将发射的数据作为参数。 -
onError-- 当数据流产生数据失败或发生其他异常时会调用观察者的onError方法,并将失败原因(Throwable)作为参数。 -
onComplete-- 当数据流中的所有数据全部正常发射完会调用观察着的onComplete方法。
当数据流调用观察者的onError/onComplete时,我们称它发送了错误/完成通知。观察者只能收到来自某个数据流的一个通知,也就是说如果收到了流的错误通知,就不可能再收到该流的完成通知,反之亦然。一旦观察者收到了通知,便不能接收任何由该流发射的数据。
注:数据流可以发送多个通知,也可以在发送通知之后继续发送数据,只是观察者收到通知后就单方面把该流“拉黑”了而已。有时候为了实现一些特殊功能,我们不得不允许Observer不受限制地接收数据和通知(RxJava2的源码中也存在着这样的实现,比如:ObservableConcatMap.SourceObserver.InnerObserver就可以多次接收onComplete通知)。
Rx编程模型
我们先看一个常规的方法调用过程,程序会按照代码书写的顺序逐步地执行指令并返回结果,以同步的方式完成任务:
- 先调用某个方法。
- 把方法的返回值赋值给某个变量。
- 使用该变量执行后续指令以完成任务。
在Rx中,数据流用于定义产生、处理数据的机制,一旦有观察者订阅(subscribe)了该流,其预定义的机制立即生效,观察者等待数据发射或通知发送并响应:
- 定义一个数据流,该流定义了一个异步操作,可以产生一个或多个数据。
- 定义一个观察者,并为它定义一个方法(
onNext),该方法用来消费第一步的异步操作发射的数据。 - 观察者订阅数据流(于是故事开始了),数据流的异步操作被触发,然后生产发射数据,或发送通知(以结束整个故事)。
如果程序需要完成多个不存在互相依赖的任务,由于Rx中指令可以异步并发地执行,我们可以同时启动多个任务,而不用依次地等待某个任务完成再启动下一个。
Observable操作符
掌握数据流和观察者之后,我们能比以前更好地处理数据序列(而不限于单个数据)。然而Rx真正的核武器是操作符Operator。我们先了解一下Rx有哪些操作符。
-
创建型
Create,Defer,Empty/Never/Throw,From,Interval,Just,Range,Repeat,Start,Timer -
变换型
Buffer,FlatMap,GroupBy,Map,Scan,Window -
过滤型
Debounce,Distinct,ElementAt,Filter,First,IgnoreElements,Last,Sample,Skip,SkipLast,Take,TakeLast -
组合型
And/Then/When,CombineLatest,Join,Merge,StartWith,Switch,Zip -
容错型
Catch,Retry -
工具型
Delay,Do,Materialize/Dematerialize,ObserveOn,Serialize,Subscribe,SubscribeOn,TimeInterval,Timeout,Timestamp,Using -
条件型
All,Amb,Contains,DefaultIfEmpty,SequenceEqual,SkipUntil,SkipWhile,TakeUntil,TakeWhile -
聚合型
Average,Concat,Count,Max,Min,Reduce,Sum -
转换型
To -
连接型
Connect,Publish,RefCount,Replay - 背压型
多数操作符仍然返回一个数据流,这种方式允许我们在程序中链式地对数据流调用操作符——联想一下builder(构建者)模式的链式调用——与builder模式不同的是,Observable的操作符返回了一个新Observable,这个新Observable是原Observable的代理。
应用了操作符后,单单用“数据流”已经无法准确描述Observable的含义。我增加“原始流”、“上游”和“下游”以及“支流(流中流)”来区分不同意义的Observable。“数据流”是Observable的泛称。
- 原始流——全称为“原始数据流”,指代由创建型操作符返回的
Observable。 - 上游和下游——二者必须成对地出现。对
Observable调用非创建型操作符后,“上游”指代原Observable,“下游”指代返回的新Observable。 - 支流或流中流——仅仅在应用
FlatMap和ConcatMap操作符的场景中使用这一称谓。“支流”指代这两个操作符的mapper返回的子Observable,“支流”亦称“流中流”。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
从flatMap的方法签名分析,它接受一个mapper参数,此mapper将上游中的数据变换成一个ObservableSource,新Observable中的数据是ObservableSource类型——颇有子Observable的味道——这不就是“流中流”(“流中流”自带解释功能,理解之后还是叫“支流”比较自然)吗?
需要注意的是:最终Rx将整合子Observable(支流)中的所有数据而不是子Observable本身汇入下游。后面的文章会详细地对FlatMap和ConcatMap进行源码分析。
RxJava2#Observable类(源码基于v2.1.5)
Observable是一个抽象类,实现了ObservableSource(void subscribe())接口。该类有且仅有一个抽象方法subscribeActual,其他非private(private方法也就3个)方法要么是static的,要么是final的。这意味着定义自己的ObservableCustom是件非常简单的事情,Observable类已经完成了99.99%的工作,我们只需要override subscribeActual方法就够了。
Observable所有的创建型操作符都是静态的,比如Just:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 通过RxJavaPlugins的setters可以在运行时改变默认的行为
// 如果程序中没有调用RxJavaPlugins.setOnObservableAssembly(xxx),下面一行代码跟其后一行注释完全等效
return RxJavaPlugins.onAssembly(new ObservableJust(item));
// return new ObservableJust(item);
}
我们可以看到Just操作符本质上构造了一个ObservableJust对象。RxJava2内置了大量的ObservableXXX(XXX往往是操作符的名字比如Just)。
再来看一个非创建型的操作符Map:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap(this, mapper));
}
以及ObservableMap类的核心代码:
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
public void subscribeActual(Observer<? super U> t) {
this.source.subscribe(new ObservableMap.MapObserver(t, this.function));
}
}
重点看ObservableMap构造方法,它接收ObservableSource类型的对象作为第一个参数——回忆一下代理模式——创建了原Observable的代理,也就是新ObservableMap实例。
RxJava2中大量运用了代理模式,细心的你或许已经发现:在subscribeActual方法中,还创建了一个原Observer的代理——MapObserver的实例。










网友评论