上文RxSwift-deallocating,deallocated源码解析,我们提到deallocating序列和takeUntil序列经常结合起来使用,本文将分析takeUntil,探索一下takeUntil内部是如何接收deallocating发送的响应非常有必要。
takeUntil源码解析
public func takeUntil<Source: ObservableType>(_ other: Source)
-> Observable<Element> {
return TakeUntil(source: self.asObservable(), other: other.asObservable())
}
返回的是一个TakeUntil序列,TakeUntil对象依然继承自我们熟悉的Producer。
final private class TakeUntil<Element, Other>: Producer<Element> {
fileprivate let _source: Observable<Element>
fileprivate let _other: Observable<Other>
init(source: Observable<Element>, other: Observable<Other>) {
self._source = source
self._other = other
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = TakeUntilSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
TakeUntil序列保存了原始序列_source(上文的例子中,_source保存的是vc.publicOB序列)以及序列_other(上文的例子中,_other保存的就是vc.rx.deallocating序列)
外界的subscribe的是TakeUntil序列。TakeUntil序列被订阅时,会执行TakeUntil.run,TakeUntil.run调用TakeUntilSink.run
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
TakeUntilSink保存TakeUntil到_parent属性.
func run() -> Disposable {
let otherObserver = TakeUntilSinkOther(parent: self)
let otherSubscription = self._parent._other.subscribe(otherObserver)
otherObserver._subscription.setDisposable(otherSubscription)
let sourceSubscription = self._parent._source.subscribe(self)
return Disposables.create(sourceSubscription, otherObserver._subscription)
}
TakeUntilSink.run中,
1.创建观察者TakeUntilSinkOther, TakeUntilSinkOther的_parent保存TakeUntilSink
2.self._parent._other.subscribe(otherObserver)订阅序列,这个序列就是上文的例子的vc.rx.deallocating序列。
RxSwift-deallocating,deallocated源码解析中,我们提到当对象在dealloc时,会向DeallocatingProxy中存储ReplaySubject序列(就是vc.rx.deallocating序列)发送响应,会来到这里的观察者TakeUntilSinkOther.on(.next())
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next:
self._parent.forwardOn(.completed)
self._parent.dispose()
case .error(let e):
self._parent.forwardOn(.error(e))
self._parent.dispose()
case .completed:
self._subscription.dispose()
}
}
TakeUntilSinkOther.on(.next())----->Sink.forwardOn(.completed)----->TakeUntilSink.observer.(.completed).
最终会给TakeUntil序列的observer发送.completed信号,序列完成并销毁。
总结
1.takeUntil是一个中间层,在takeUntil被订阅的流程中,中间层takeUntil会订阅rx.deallocating()序列.
2.当对象被销毁,对rx.deallocating()发送响应时,会调用到观察者的TakeUntilSinkOther.on(.next()),最终会给TakeUntil序列的observer发送.completed信号,序列完成并销毁。
TakeUntil.png










网友评论