关键元素
在前一章中,我们介绍了rxcpp库及其编程模型。我们编写了一些程序来理解这个库的工作原理。我们还讨论了rxcpp库的基本元素。在本章中,我们将深入讨论rxcpp库的关键元素,以及一般的响应式编程模型,包括以下内容:
- 可观察对象
- 观察者及其变体(订阅者)
- 主题
- 调度器
- 操作符
实际上,响应式编程的关键方面如下:
- 可观察对象是观察者可以订阅用于通知的流
- subject是可观测对象和观察者的组合
- 调度程序执行与操作符关联的操作,并帮助将数据从可观察对象流到观察者
- 操作符是接受一个可观测值并发出另一个可观测值的函数
可观察对象
在前一章中,我们从零开始创建可观察性,并为这些可观察性编写订阅者。在我们的所有示例中,observables创建了producer类的一个实例。producer类生成一个事件流。换句话说,observable是连接订阅者和生产者的函数。在我们继续之前,让我们来剖析一个可观察到的和与之相关的核心活动:
- 可观察对象是一个函数,它接受一个观察者作为参数并返回一个函数
- 一个可观察对象连接一个观察者和一个生产者
- 生产者是一个可观察值的来源
- 观察者是具有on_next、on_error和on_completed方法的对象
生产者是什么?
生产者是一个可观察值的来源。生成器可以是windows、定时器、websocket、dom树、集合/容器上的迭代器等等。它们可以是任何可以传递给Observer.Next(value)(在rxcpp中, observer.on_next(value))。
热与冷的可观察对象
Observable 热:直播。所有的观察者,无论进来的早还是晚,看到的是同样内容的同样进度,订阅的时候得到的都是最新时刻发送的值。
Observable 冷:点播。 新的订阅者每次从头开始。
冷可观察对象
如果底层的生产者是在订阅期间创建并激活的,那么 Observable 就是“冷的”。这意味着,如果 Observables 是函数,而生产者是通过调用该函数创建并激活的。
- 创建生产者
- 激活生产者
- 开始监听生产者
- 单播
比如:一开始有个订阅者,两秒后又有个订阅者,这两个序列按照自己的节奏走的,不同步。每个流进行都会从interval的0开始。
在内部创建了生产者的任何可观测对象都称为冷可观察对象。
//---------- ColdObservable.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
//----------- Get a Coordination
auto eventloop = rxcpp::observe_on_event_loop();
//----- Create a Cold Observable
auto values = rxcpp::observable<>::interval(
std::chrono::seconds(2)).take(2);
//----- Subscribe Twice
values.
subscribe_on(eventloop).
subscribe(
[](int v) {printf("[1] onNext: %d\n", v); },
[]() {printf("[1] onCompleted\n"); });
values.
subscribe_on(eventloop).
subscribe(
[](int v) {printf("[2] onNext: %d\n", v); },
[]() {printf("[2] onCompleted\n"); });
//---- make a blocking subscription to see the results
values.as_blocking().subscribe();
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
}
interval创建一个冷的可观察对象,因为事件流的生产者是由interval函数实例化的。当订阅或观察者附加到可观察对象时,冷可观察对象将发出数据。即使订阅有延迟,结果也将是一致的。这意味着我们将得到所有由可观察对象发出的数据。
对于每次运行,控制台中内容的顺序可能会改变,因为我们正在调度观察者方法在同一线程中的执行。延迟订阅不会造成任何数据丢失。
热可观察对象
如果底层的生产者是在 订阅外部创建或激活的,那么 Observable 就是“热的”。
- 共享生产者的引用
- 开始监听生产者
- 多播(通常情况下)
在外部创建生产者可观察对象称为热可观察对象
我们可以通过调用可观察对象的发布方法将冷可观察对象转换为热可观察对象。将冷可观察对象转换为热可观察对象的结果是,以后的订阅将丢失数据。不管是否有订阅,热可观察对象都会发出数据。
//---------- HotObservable.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
auto eventloop = rxcpp::observe_on_event_loop();
//----- Create a Cold Observable
//----- Convert Cold Observable to Hot Observable
//----- using .Publish();
auto values = rxcpp::observable<>::interval(
std::chrono::seconds(2)).take(2).publish();
//----- Subscribe Twice
values.
subscribe_on(eventloop).
subscribe(
[](int v) {printf("[1] onNext: %d\n", v); },
[]() {printf("[1] onCompleted\n"); });
values.
subscribe_on(eventloop).
subscribe(
[](int v) {printf("[2] onNext: %d\n", v); },
[]() {printf("[2] onCompleted\n"); });
//------ Start Emitting Values
values.connect();
//---- make a blocking subscription to see the results
values.as_blocking().subscribe();
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
}
rxcpp支持的publish_synchronized机制。从编程接口的角度来看,这只是一个小小的变化。
热可观察对象和重放机制
不管是否有订阅服务器,热可观察对象都会发出数据。这有时会成为一个问题。在响应式编程中有一种机制可以缓存数据,以便稍后的订阅者可以通过一个可观察的对象获得可用数据的通知。我们可以使用.replay()方法来创建这样一个可观察对象。这在编写热可观察对象时非常有用:
//---------- ReplayAll.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
auto values = rxcpp::observable<>::interval(
std::chrono::milliseconds(50),
rxcpp::observe_on_new_thread()).
take(5).replay();
// Subscribe from the beginning
values.subscribe(
[](long v) {printf("[1] OnNext: %ld\n", v); },
[]() {printf("[1] OnCompleted\n"); });
// Start emitting
values.connect();
// Wait before subscribing
rxcpp::observable<>::timer(
std::chrono::milliseconds(125)).subscribe([&](long) {
values.as_blocking().subscribe(
[](long v) {printf("[2] OnNext: %ld\n", v); },
[]() {printf("[2] OnCompleted\n"); });
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
}
在编写响应式程序时,您确实需要理解热可观察对象和冷可观察对象之间的语义差异。
观察者及其变体(订阅者)
观察者订阅一个可观察对象,并等待事件被通知。订阅者是观察者和订阅者的组合。订阅者具有取消订阅的功能。对于普通的观察者,您只能订阅:
//---- Subscriber.cpp
#include "rxcpp/rx.hpp"
int main()
{
//----- create a subscription object
auto subscription = rxcpp::composite_subscription();
//----- Create a Subscription
auto subscriber = rxcpp::make_subscriber<int>(
subscription,
[&](int v) {
printf("OnNext: --%d\n", v);
if (v == 3)
subscription.unsubscribe();
},
[]() { printf("OnCompleted\n"); });
rxcpp::observable<>::create<int>(
[](rxcpp::subscriber<int> s) {
for (int i = 0; i < 5; ++i) {
if (!s.is_subscribed())
break;
s.on_next(i);
}
s.on_completed();
}).subscribe(subscriber);
return 0;
}
对于编写具有并发性和动态性的非平凡程序,订阅和取消订阅的能力非常方便。
Subject
Subject可以看成是一个桥梁或者代理,它同时充当了Observer和Observable的角色。因为它是一个Observer,它可以订阅一个或多个Observable;又因为它是一个Observable,它可以转发它收到(Observe)的数据,也可以发射新的数据。
由于一个Subject订阅一个Observable,它可以触发这个Observable开始发射数据(如果那个Observable是"冷"的--就是说,它等待有订阅才开始发射数据)。因此有这样的效果,Subject可以把原来那个"冷"的Observable变成"热"的。在rxcpp中实现了四种不同的主题。它们如下:
- Subject
- BehaviorSubject
- ReplaySubject
- SynchronizeSubject
subject
让我们编写一个简单的程序,它将以观察者的身份订阅数据,并作为一对订阅者的可观察对象:
//---- Subscriber.cpp
#include "rxcpp/rx.hpp"
int main()
{
//----- create a subscription object
auto subscription = rxcpp::composite_subscription();
//----- Create a Subscription
auto subscriber = rxcpp::make_subscriber<int>(
subscription,
[&](int v) {
printf("OnNext: --%d\n", v);
if (v == 3)
subscription.unsubscribe();
},
[]() { printf("OnCompleted\n"); });
rxcpp::observable<>::create<int>(
[](rxcpp::subscriber<int> s) {
for (int i = 0; i < 5; ++i) {
if (!s.is_subscribed())
break;
s.on_next(i);
}
s.on_completed();
}).subscribe(subscriber);
return 0;
}
BehaviorSubject
当观察者订阅BehaviorSubject时,它开始发射原始Observable最近发射的数据(如果此时还没有收到任何数据,它会发射一个默认值),然后继续发射其它任何来自原始Observable的数据。
image.png
然而,如果原始的Observable因为发生了一个错误而终止,BehaviorSubject将不会发射任何数据,只是简单的向前传递这个错误通知。
image.png
behaviorsubject是subject的变体,它存储最后发出的(当前的)值作为实现的一部分。任何新订阅服务器都将立即获得当前值。否则,它的行为就像一个正常的subject。behaviorsubject也被称为属性或单元格。在使用一系列数据更新特定单元或内存的场景中(例如在事务中),它非常有用。有两种方法可以拿到 BehaviorSubject “当前”的值:访问其 .value 属性或者直接订阅。如果你选择了订阅,那么 BehaviorSubject 将直接给订阅者发送当前存储的值,无论这个值有多么“久远”,详见程序:
//-------- BehaviorSubject.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
rxcpp::subjects::behavior<int> behsubject(0);
auto observable = behsubject.get_observable();
observable.subscribe([](int v) {
printf("1------%d\n", v);
});
observable.subscribe([](int v) {
printf("2------%d\n", v);
});
auto subscriber = behsubject.get_subscriber();
subscriber.on_next(1);
subscriber.on_next(2);
int n = behsubject.get_value();
observable.subscribe([](int v) {
printf("3------%d\n", v);
});
printf("Last Value ....%d\n", n);
}
replaysubject
replaysubject是subject的一个变体,它存储已经发出的数据。我们可以指定参数来指示subject必须保留多少值。这在处理热点观测数据时非常方便。各种回放重载的原型如下:
replay(Coordination cn, [optional] composite_subscription cs)
replay(std::size_t count, Coordination cn, [optional]composite_subscription cs)
replay(duration period, Coordination cn, [optional] composite_subscription cs)
replay(std::size_t count, duration period, Coordination cn, [optional] composite_subscription cs)
相比 BehaviorSubject 而言,ReplaySubject 是可以给新订阅者发送“旧”数据的。另外,ReplaySubject 还有一个额外的特性就是它可以记录一部分的 observable execution,从而存储一些旧的数据用来“重播”给新来的订阅者。
image.png
如果你把ReplaySubject当作一个观察者使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。
当创建 ReplaySubject 时,你可以指定存储的数据量以及数据的过期时间。也就是说,你可以实现:给新来的订阅者“重播”订阅前一秒内的最后5个已广播的值,如以下的程序:
//------------- ReplaySubject.cpp
#include <rxcpp/rx.hpp>
#include <memory>
int main(int argc, char *argv[]) {
//----------- instantiate a ReplaySubject
rxcpp::subjects::replay<int, rxcpp::observe_on_one_worker>
replay_subject(5, rxcpp::observe_on_new_thread());
//---------- get the observable interface
auto observable = replay_subject.get_observable();
//---------- Subscribe!
observable.subscribe([](int v) {
printf("1------%d\n", v);
});
//--------- get the subscriber interface
auto subscriber = replay_subject.get_subscriber();
//---------- Emit data
for (int i = 0; i < 10; i++)
{
subscriber.on_next(i);
}
//----------- Wait for 100 milliseconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(100)).
subscribe([&](long) {});
//-------- Add a new subscriber
//-------- A normal subject will drop data
//-------- Replay subject will not
observable.subscribe([](int v) {
printf("2------%d\n", v);
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
}
rxcpp::observable<>::timer用于等待工作线程执行。
我们已经讨论了subject的三个变体。主要用例是通过使用observable接口来利用来自不同源的事件和数据,并允许一组订阅者使用利用的数据。subject可以同时作为一个可观察对象和一个观察者来处理数据流。behaviorsubject用于监视一段时间内属性或变量的更改。replaysubject将帮助您避免由于订阅中的延迟而丢失数据。synchronizesubject在其实现中内置了同步逻辑。
调度器
rxcpp库提供了一种声明式线程机制,这要归功于其打包的健壮调度子系统。从一个可观察到的对象来看,数据可以通过沿着变更传播图的不同路径流动。通过向流处理管道提供提示,我们可以在不同的线程、相同的线程或后台线程中调度执行。这有助于更好地捕捉程序员的意图。
rxcpp中的声明式调度模型是可能的,因为操作符实现中的流是不可变的。流操作符接受一个可观察值作为参数,并返回一个新的可观察值作为结果。不修改输入参数。这有助于无序执行。rxcpp的调度子系统包含以下构造:
- Scheduler
- Worker
- Coordination
- Coordinator
- Schedulable
- TimeLine
rxcpp的版本2借鉴了rxjava系统的调度体系结构。它依赖于rxjava使用的调度程序和worker习惯用法。下面是一些关于调度程序的重要事实:
- Scheduler有一个时间线。
- Scheduler可以在时间线中创建许多worker。
- Worker在时间线中拥有一个schedulable队列。
- schedulable拥有一个函数(称为action)并具有一个生命周期。
- Coordination为coordinator提供工厂功能,并有一个Scheduler。
- 每个coordinator都有一个Worker,并且是以下用途的工厂:
- 可协调的schedulable
- 可协调的可观测对象和订阅者
我们一直在我们的程序中使用rx调度程序,而不关心它们是如何工作的。让我们来编写一个玩具程序,它将帮助我们理解调度是如何工作的:
//------------- SchedulerOne.cpp
#include "rxcpp/rx.hpp"
int main()
{
//---------- Get a Coordination
auto coordination = rxcpp::serialize_new_thread();
//------- Create a Worker instance
auto worker = coordination.create_coordinator().get_worker();
auto sub_action = rxcpp::schedulers::make_action([]
(const rxcpp::schedulers::schedulable&)
{ printf("Action Executed in Thread # : %d\n",
std::this_thread::get_id()); });
auto scheduled = rxcpp::schedulers::make_schedulable(worker, sub_action);
scheduled.schedule();
printf("main Thread # : %d\n",
std::this_thread::get_id());
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
return 0;
}
在rxcpp中,所有以多个流作为输入或处理与时间有关任务的操作符都以协调函数作为参数。使用特定调度程序的一些协调功能如下:
identity_immediate()
identity_current_thread()
identity_same_worker(worker w)
serialize_event_loop()
serialize_new_thread()
serialize_same_worker(worker w)
observe_on_event_loop()
observe_on_new_thread()
在前面的程序中,我们手动调度了一个操作(实际上,它只是一个lambda)。让我们继续讨论调度程序的声明性方面。我们将编写一个程序,使用协调功能来安排任务:
//----------- SchedulerTwo.cpp
#include "rxcpp/rx.hpp"
int main()
{
auto coordination = rxcpp::identity_current_thread();
auto worker = coordination.create_coordinator().get_worker();
auto start = coordination.now() + std::chrono::milliseconds(1);
auto period = std::chrono::milliseconds(1);
auto values = rxcpp::observable<>::interval(start, period).
take(5).
replay(2, coordination);
worker.schedule([&](const rxcpp::schedulers::schedulable&) {
values.subscribe(
[](long v) {
printf("#1 -- %d : %ld\n",
std::this_thread::get_id(), v);
},
[]() {
printf("#1 --- OnCompleted\n"); });
});
worker.schedule([&](const rxcpp::schedulers::schedulable&) {
values.subscribe(
[](long v) {
printf("#2 -- %d : %ld\n",
std::this_thread::get_id(), v);
},
[]() {
printf("#2 --- OnCompleted\n"); });
});
// Start emitting
worker.schedule([&](const rxcpp::schedulers::schedulable&) {
values.connect();
});
// Add blocking subscription to see results
values.as_blocking().subscribe();
return 0;
}
我们使用重播机制创建了一个热观察对象来处理一些观察者的延迟订阅。我们还创建了一个worker来执行调度用于订阅,并将观察者与可观察对象连接起来。前面的程序演示了调度程序如何在rxcpp中工作。
observeon与subscribeon
observeon和subscribeon操作符的行为方式不同,这一直是响应式编程新手感到困惑的原因之一。observeon操作符更改下面操作符和观察者的线程。对于subscribeon,它还会影响位于其上下的操作符和方法。
observeon
指定一个观察者在哪个调度器上观察这个Observable
image.png
很多ReactiveX实现都使用调度器 "
Scheduler"来管理多线程环境中Observable的转场。你可以使用ObserveOn操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted, onError方法)。
image.png
注意:当遇到一个异常时ObserveOn会立即向前传递这个onError终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展示的。
SubscribeOn操作符的作用类似,但它是用于指定Observable本身在特定的调度器上执行,它同样会在那个调度器上给观察者发通知。
下面的程序演示了订阅和observeon操作符的行为方式对行为的细微变化。让我们编写一个程序,使用observeon操作符:
//-------- ObservableOnScheduler.cpp
#include "rxcpp/rx.hpp"
int main() {
//------- Print the main thread id
printf("Main Thread Id is %d\n",
std::this_thread::get_id());
//-------- We are using observe_on here
//-------- The Map will use the main thread
//-------- Subscribed lambda will use a new thread
rxcpp::observable<>::range(0, 15).
map([](int i) {
printf("Map %d : %d\n", std::this_thread::get_id(), i);
return i;
}).
take(5).observe_on(rxcpp::synchronize_new_thread()).
subscribe([&](int i) {
printf("Subs %d : %d\n", std::this_thread::get_id(), i);
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
return 0;
}
前面程序的输出清楚地显示map在主线程中工作,而subscribe方法在辅助线程中得到调度。这清楚地表明observeon只对它下面的操作符和订阅者有效。
subscribeon
指定Observable自身在哪个调度器上执行
image.png
很多ReactiveX实现都使用调度器 "Scheduler"来管理多线程环境中Observable的转场。你可以使用SubscribeOn操作符指定Observable在一个特定的调度器上运转。
//-------- SubscribeOnScheduler.cpp
#include "rxcpp/rx.hpp"
int main() {
//------- Print the main thread id
printf("Main Thread Id is %d\n",
std::this_thread::get_id());
//-------- We are using subscribe_on here
//-------- The Map and subscribed lambda will
//--------- use the secondary thread
rxcpp::observable<>::range(0, 15).
map([](int i) {
printf("Map %d : %d\n", std::this_thread::get_id(), i);
return i;
}).
take(5).subscribe_on(rxcpp::synchronize_new_thread()).
subscribe([&](int i) {
printf("Subs %d : %d\n", std::this_thread::get_id(), i);
});
//----------- Wait for Two Seconds
rxcpp::observable<>::timer(
std::chrono::milliseconds(2000)).
subscribe([&](long) {});
return 0;
}
前面程序的输出显示map和订阅方法都在辅助线程中工作。这清楚地表明subscribeon更改了项的线程行为。
runloop调度器
rxcpp库没有内置的主线程调度程序。最接近的方法是利用run_loop类来模拟主线程中的调度。在下面的程序中,可观察对象在后台线程中执行,订阅方法在主线程中运行。我们使用subscribe_on和observe_on来实现这个目标:
//------------- RunLoop.cpp
#include "rxcpp/rx.hpp"
int main()
{
//------------ Print the Main Thread Id
printf("Main Thread Id is %d\n",
std::this_thread::get_id());
//------- Instantiate a run_loop object
//------- which will loop in the main thread
rxcpp::schedulers::run_loop rlp;
//------ Create a coordination for run loop
auto main_thread = rxcpp::observe_on_run_loop(rlp);
auto worker_thread = rxcpp::synchronize_new_thread();
rxcpp::composite_subscription scr;
rxcpp::observable<>::range(0, 15).
map([](int i) {
//----- This will get executed in worker
printf("Map %d : %d\n", std::this_thread::get_id(), i);
return i;
}).
take(5).subscribe_on(worker_thread).
observe_on(main_thread).
subscribe(scr, [&](int i) {
//--- This will get executed in main thread
printf("Sub %d : %d\n", std::this_thread::get_id(), i);
});
//------------ Execute the Run Loop
while (scr.is_subscribed() || !rlp.empty()) {
while (!rlp.empty() && rlp.peek().when < rlp.now()) {
rlp.dispatch();
}
}
return 0;
}
我们可以看到map在工作线程中调度,订阅方法在主线程中执行。启用此功能是因为subscribe_on和observe_on方法的明智位置,我们在前一节中讨论了这两个方法。
操作符
操作符是一个作用于一个可观测对象的函数,它产生一个新的可观测对象。在此过程中,原始的可观测值没有发生突变,而是一个纯函数。在我们编写的示例程序中,我们已经介绍了许多操作符。在第9章,使用qt / c++的反应性gui编程中,我们将学习如何创建自定义操作符来处理可观察对象。操作符不改变可观察对象的事实是rx编程模型中声明式调度工作的原因之一。rx操作符可以分为以下几类:
创建操作符
转换操作符
过滤操作符
结合操作符
错误处理操作符
通用操作符
布尔操作符
数学运算符
有一些可用的操作符不属于这些类别。我们将在一个表中列出前面类别中的一些关键操作符,以供快速参考,见:rx operators
总结
在本章中,我们了解了rx编程模型的各个部分是如何组合在一起的。我们从可观测数据开始,很快就进入了冷热可观测数据的话题。然后,我们介绍了订阅机制及其使用。然后,我们继续讨论主题的重要主题,并了解了主题调度程序实现的一些变体。最后,我们对rxcpp系统中可用的各种操作符进行了分类。在下一章中,我们将学习如何使用这些知识来使用qt框架以一种反应性的方式编写gui程序。










网友评论