美文网首页
RxSwift高阶函数merge解读

RxSwift高阶函数merge解读

作者: silasjs | 来源:发表于2019-08-13 10:58 被阅读0次

RxSwift高阶函数merge解读

通过使用 merge 操作符你可以将多个 Observables 合并成一个,当某一个 Observable 发出一个元素时,他就将这个元素发出。

如果,某一个 Observable 发出一个 onError 事件,那么被合并的 Observable 也会将它发出,并且立即终止序列。


merge.jpg

demo

这只是个最简单的 merge。

let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()

Observable.of(subject1, subject2)
    .merge()
    .subscribe(onNext: { print("订阅到:\($0)") })
    .disposed(by: disposeBag)

subject1.onNext("🅰️")
subject1.onNext("🅱️")
subject2.onNext("①")
subject2.onNext("②")
subject1.onNext("🆎")
subject2.onNext("③")

输出:
订阅到:🅰️
订阅到:🅱️
订阅到:①
订阅到:②
订阅到:🆎
订阅到:③

解析

首先看下merge函数:

extension ObservableType where Element : ObservableConvertibleType {
    public func merge() -> Observable<Element.Element> {
        return Merge(source: self.asObservable())
    }
}

返回了一个Merge序列对象:

final class Merge<SourceSequence: ObservableConvertibleType> : Producer<SourceSequence.Element> {
    private let _source: Observable<SourceSequence>

    init(source: Observable<SourceSequence>) {
        self._source = source
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
        let sink = MergeBasicSink<SourceSequence, Observer>(observer: observer, cancel: cancel)
        let subscription = sink.run(self._source)
        return (sink: sink, subscription: subscription)
    }
}

Merge序列被订阅后,会走MergeBasicSinkMergeSink的子类)的run函数。

fileprivate final class MergeBasicSink<Source: ObservableConvertibleType, Observer: ObserverType> : MergeSink<Source, Source, Observer> where Observer.Element == Source.Element {
    override func performMap(_ element: Source) throws -> Source {
        return element
    }
}

MergeBasicSink中只实现了一个performMap,还要看它的父类。

private class MergeSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>
    : Sink<Observer>
    , ObserverType where Observer.Element == SourceSequence.Element {

    override init(observer: Observer, cancel: Cancelable) {
        super.init(observer: observer, cancel: cancel)
    }

    @inline(__always)
    final private func nextElementArrived(element: SourceElement) -> SourceSequence? {
        self._lock.lock(); defer { self._lock.unlock() } 
        if !self.subscribeNext {
            return nil
        }

        do {
            let value = try self.performMap(element)
            self._activeCount += 1
            return value
        }
        catch let e {
            self.forwardOn(.error(e))
            self.dispose()
            return nil
        }
    }
    
    func on(_ event: Event<SourceElement>) {
        switch event {
        case .next(let element):
            if let value = self.nextElementArrived(element: element) {
                self.subscribeInner(value.asObservable())
            }
        case .error(let error):
            self._lock.lock(); defer { self._lock.unlock() }
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self._lock.lock(); defer { self._lock.unlock() }
            self._stopped = true
            self._sourceSubscription.dispose()
            self.checkCompleted()
        }
    }

    func subscribeInner(_ source: Observable<Observer.Element>) {
        let iterDisposable = SingleAssignmentDisposable()
        if let disposeKey = self._group.insert(iterDisposable) {
            let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
            let subscription = source.subscribe(iter)
            iterDisposable.setDisposable(subscription)
        }
    }
    
    func run(_ source: Observable<SourceElement>) -> Disposable {
        _ = self._group.insert(self._sourceSubscription)

        let subscription = source.subscribe(self)
        self._sourceSubscription.setDisposable(subscription)
        
        return self._group
    }
}

MergeSinkrun中,source.subscribe(self):源序列开始订阅,MergeSink作为观察者。这里的源序列就是外界的of序列,那么of序列的响应必然会来到MergeSinkon函数中。

MergeSinkon函数中,在.next类型下,调用nextElementArrived,这个函数在这里只做了个计数self._activeCount += 1,为了管理序列的生命周期。因为子类对performMap的实现就是直接返回了element

现在是 of 序列的响应,这里的element就是 of 序列中的两个元素( PublishSubject 序列)。

接着又调用了subscribeInner把 PublishSubject 序列传过去。这里才是 merge 用来合并的重点。这里用了MergeSinkIter类,用 PublishSubject 作为源序列去订阅,把 MergeSinkIter 作为它的观察者。这样,等 PublishSubject 有响应的时候就会通知到 MergeSinkIter 的 on 函数中。

fileprivate final class MergeSinkIter<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : ObserverType where Observer.Element == SourceSequence.Element {
    typealias Parent = MergeSink<SourceElement, SourceSequence, Observer>

    init(parent: Parent, disposeKey: DisposeKey) {
        self._parent = parent
        self._disposeKey = disposeKey
    }
    
    func on(_ event: Event<Element>) {
        self._parent._lock.lock(); defer { self._parent._lock.unlock() } 
        switch event {
        case .next(let value):
            self._parent.forwardOn(.next(value))
        case .error(let error):
            self._parent.forwardOn(.error(error))
            self._parent.dispose()
        case .completed:
            self._parent._group.remove(for: self._disposeKey)
            self._parent._activeCount -= 1
            self._parent.checkCompleted()
        }
    }
}

MergeSinkIter 的 on 里面直接让它的 _parent(MergeSink)调用forwardOn,去让 MergeSink 的观察者响应外界的订阅。这样的话,of 序列中的任意元素( PublishSubject 序列)发出的信号都可以被 merge 序列的订阅者接收到。

总结:
merge 序列作为 of 序列的观察者,监听着 of 序列中的元素(PublishSubject)。
把 MergeSinkIter 作为所有序列元素(PublishSubject)的观察者,监听 PublishSubject 的变动,交给 merge 处理。
merge 直接通过自己的观察者响应外界的订阅。

相关文章

网友评论

      本文标题:RxSwift高阶函数merge解读

      本文链接:https://www.haomeiwen.com/subject/julvjctx.html