在前面的篇章中,多次遇见Scheduler(调度者),但都没有展开细说,这一篇就专门来详细分析它。
-
Scheduler其实主要就是封装了一套GCD(其中OperationQueueScheduler封装的是OperationQueue),在一些函数源码里已经使用了,我们也可以主动调用这个Scheduler。主要的有:MainScheduler、SerialDispatchQueueScheduler、ConcurrentDispatchQueueScheduler、OperationQueueScheduler。
我们看看下面的例子:
// Deliver the events on the main queue.
Observable.of(1, 2, 3)
    .observeOn(MainScheduler.instance)
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: disposeBag)

// Deliver the events on a serial queue named "obOnSerial".
let serialScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "obOnSerial")
Observable.of(1, 2, 3)
    .observeOn(serialScheduler)
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: disposeBag)

// Deliver the events on a concurrent queue with background QoS.
let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background)
Observable.of(1, 2, 3)
    .observeOn(concurrentScheduler)
    .subscribe(onNext: { value in
        print(value)
    })
    .disposed(by: disposeBag)
- 我们从调度者的初始化入手,前3个都遵循了SchedulerType协议(MainScheduler是通过继承SerialDispatchQueueScheduler间接遵循的),而SchedulerType又继承自ImmediateSchedulerType:
(1)MainScheduler
-
MainScheduler的本质其实就是SerialDispatchQueueScheduler,可以看到初始化时,默认保存了DispatchQueue.main队列:
// Excerpt: MainScheduler is a final subclass of SerialDispatchQueueScheduler.
public final class MainScheduler : SerialDispatchQueueScheduler {
...
// Stores DispatchQueue.main in _mainQueue and hands that same queue to the
// superclass initializer as its serial queue.
public init() {
self._mainQueue = DispatchQueue.main
super.init(serialQueue: self._mainQueue)
}
// Shared instance exposed as a static constant (this is what
// `MainScheduler.instance` refers to at call sites).
public static let instance = MainScheduler()
...
}
(2)SerialDispatchQueueScheduler
// Excerpt: only the QoS-based convenience initializer is shown.
public class SerialDispatchQueueScheduler : SchedulerType {
...
// Forwards to init(queue:internalSerialQueueName:leeway:), passing the global
// dispatch queue for the QoS class of `qos`.
// - Parameter qos: quality of service used to pick the global queue.
// - Parameter internalSerialQueueName: defaults to "rx.global_dispatch_queue.serial".
// - Parameter leeway: defaults to zero nanoseconds.
public convenience init(qos: DispatchQoS, internalSerialQueueName: String = "rx.global_dispatch_queue.serial", leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue.global(qos: qos.qosClass),
internalSerialQueueName: internalSerialQueueName,
leeway: leeway)
}
...
}
(3)ConcurrentDispatchQueueScheduler
ConcurrentDispatchQueueScheduler和SerialDispatchQueueScheduler的思路其实是一样的:
// Excerpt: only the QoS-based convenience initializer is shown.
public class ConcurrentDispatchQueueScheduler: SchedulerType {
...
// Creates a new DispatchQueue labelled "rxswift.queue.<qos>" with the given QoS
// and the `.concurrent` attribute (no target queue), then forwards it to
// init(queue:leeway:).
// - Parameter qos: quality of service for the created queue.
// - Parameter leeway: defaults to zero nanoseconds.
public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.init(queue: DispatchQueue(
label: "rxswift.queue.\(qos)",
qos: qos,
attributes: [DispatchQueue.Attributes.concurrent],
target: nil),
leeway: leeway
)
}
...
}
(4)OperationQueueScheduler
OperationQueueScheduler封装了NSOperationQueue:
// Excerpt: a scheduler built around an OperationQueue. Note that it conforms to
// ImmediateSchedulerType directly rather than to SchedulerType.
public class OperationQueueScheduler: ImmediateSchedulerType {
...
// Stores the queue and the priority for later use.
// - Parameter operationQueue: the queue this scheduler keeps a reference to.
// - Parameter queuePriority: defaults to `.normal`.
public init(operationQueue: OperationQueue, queuePriority: Operation.QueuePriority = .normal) {
self.operationQueue = operationQueue
self.queuePriority = queuePriority
}
...
}
- 我们走
MainScheduler的流程,然后我们来看看下一步的函数observeOn,这里会判断是否为串行队列:
extension ObservableType {
...
// Returns an observable that pairs this sequence with `scheduler`.
// If `scheduler` can be cast to SerialDispatchQueueScheduler (which includes
// subclasses), the dedicated ObserveOnSerialDispatchQueue is returned;
// every other scheduler type falls back to the generic ObserveOn.
public func observeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
if let scheduler = scheduler as? SerialDispatchQueueScheduler {
return ObserveOnSerialDispatchQueue(source: self.asObservable(), scheduler: scheduler)
}
else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
}
}
- 由于MainScheduler是SerialDispatchQueueScheduler的子类,这里的类型判断自然成立,于是返回了ObserveOnSerialDispatchQueue序列(ObserveOnSerialDispatchQueue继承了Producer)。ObserveOnSerialDispatchQueue保存了scheduler调度者和source源序列:
// Producer subclass that keeps both the serial scheduler and the source sequence.
final private class ObserveOnSerialDispatchQueue<Element>: Producer<Element> {
let scheduler: SerialDispatchQueueScheduler
let source: Observable<Element>
// Stores the scheduler and the source (remaining initializer body elided in this excerpt).
init(source: Observable<Element>, scheduler: SerialDispatchQueueScheduler) {
self.scheduler = scheduler
self.source = source
...
}
// Builds an ObserveOnSerialDispatchQueueSink around `observer`, subscribes that
// sink to the stored source, and returns the sink together with the subscription.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSerialDispatchQueueSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
...
}
// Excerpt: the sink's initializer (closure body elided here).
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
...
// Stores the scheduler, the downstream observer and the cancelable.
init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
self.scheduler = scheduler
self.observer = observer
self.cancel = cancel
super.init()
// cachedScheduleLambda is always assigned during initialization, after super.init()
self.cachedScheduleLambda = { pair in
...
}
}
...
}
根据RxSwift核心逻辑,来到ObserveOnSerialDispatchQueue的run函数,run函数创建了ObserveOnSerialDispatchQueueSink(业务下沉),ObserveOnSerialDispatchQueueSink创建时默认初始化了self.cachedScheduleLambda。然后由保存的self.source源序列进行subscribe。
- 继续根据核心逻辑就会来到
ObserveOnSerialDispatchQueueSink的onCore函数:
// Excerpt: how the sink handles an incoming event.
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
...
// Passes the (sink, event) pair as state, plus the cached lambda as the action,
// to the stored scheduler. The Disposable returned by schedule is discarded.
// The force-unwrap relies on cachedScheduleLambda having been set in init.
override func onCore(_ event: Event<Element>) {
_ = self.scheduler.schedule((self, event), action: self.cachedScheduleLambda!)
}
...
}
- 在这里保存的
self.scheduler调度者开始执行操作。
- 接着一步步走到
scheduleInternal函数:
// Excerpt: the scheduling entry points of the base serial scheduler.
public class SerialDispatchQueueScheduler : SchedulerType {
...
// Public entry point; it only forwards to scheduleInternal, which is the
// overridable hook.
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
// Base implementation: delegates to self.configuration.schedule.
// Not the one that runs for MainScheduler, which overrides this method.
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
...
}
- 在这里要注意,调用的是
MainScheduler的scheduleInternal函数,千万不要被骗了:
// Excerpt: MainScheduler's override of the scheduling hook.
public final class MainScheduler : SerialDispatchQueueScheduler {
...
// Runs `action(state)` either immediately or asynchronously on the main queue.
// - Returns: the action's own Disposable on the immediate path; otherwise a
//   SingleAssignmentDisposable that can cancel the not-yet-run action.
override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
// NOTE(review): increment presumably returns the counter value from before
// the increment (as the variable name suggests) — confirm against its definition.
let previousNumberEnqueued = increment(self.numberEnqueued)
// Immediate path: already on the main queue and nothing was enqueued before
// this call, so the action runs synchronously.
if DispatchQueue.isMain && previousNumberEnqueued == 0 {
let disposable = action(state)
decrement(self.numberEnqueued)
return disposable
}
// Deferred path: dispatch to the main queue; the action is skipped if `cancel`
// was disposed in the meantime, but the counter is decremented either way.
// The Disposable produced by the action itself is discarded here.
let cancel = SingleAssignmentDisposable()
self._mainQueue.async {
if !cancel.isDisposed {
_ = action(state)
}
decrement(self.numberEnqueued)
}
return cancel
}
}
- 但是不管哪个调度者,最后都会在相应的队列调用传过来的
action:
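- MainScheduler:在重写的scheduleInternal中调用action(state)(已在主线程且没有排队任务时直接调用,否则异步派发到主队列后调用)。
- DispatchQueueConfiguration:SerialDispatchQueueScheduler的默认实现通过self.configuration.schedule转交给DispatchQueueConfiguration,由它在所持有的队列中调用action。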
而action便是ObserveOnSerialDispatchQueueSink初始化保存的self.cachedScheduleLambda闭包:
// Excerpt: the sink's initializer with the full body of cachedScheduleLambda.
final private class ObserveOnSerialDispatchQueueSink<Observer: ObserverType>: ObserverBase<Observer.Element> {
...
// Stores the scheduler, the downstream observer and the cancelable, then builds
// the closure that the scheduler will later invoke for each event.
init(scheduler: SerialDispatchQueueScheduler, observer: Observer, cancel: Cancelable) {
self.scheduler = scheduler
self.observer = observer
self.cancel = cancel
super.init()
// `pair` carries the sink and the event (see the pair.sink / pair.event accesses below).
self.cachedScheduleLambda = { pair in
// Drop the event when the captured `cancel` has already been disposed.
guard !cancel.isDisposed else { return Disposables.create() }
// Forward the event to the observer stored on the sink.
pair.sink.observer.on(pair.event)
// A stop event also disposes the sink.
// NOTE(review): isStopEvent presumably covers error/completed — confirm.
if pair.event.isStopEvent {
pair.sink.dispose()
}
return Disposables.create()
}
}
...
}
- 最终调用了
pair.sink.observer.on(pair.event),这语法让人看得有点懵,慢慢一一对应:
pair原来是元组,pair.sink.observer就是保存的self.observer,所以
pair.sink.observer.on=>
self.observer.on=>
AnonymousObserver.on。
- 最后的最后根据RxSwift核心逻辑,便会调用外面的响应闭包:
.subscribe(onNext: { (num) in
print(num)
})
其实调度者封装的GCD就是这么简单,非常好地帮我们控制在相应的队列执行任务。










网友评论