美文网首页
RxSwift - 核心逻辑 序列的创建与订阅

RxSwift - 核心逻辑 序列的创建与订阅

作者: 恍然如梦_b700 | 来源:发表于2021-04-21 01:58 被阅读0次

今天梳理一下RxSwift核心逻辑
想要理解RxSwift核心逻辑首先要熟悉swift语言的基本用法
RxSwift使用函数式编程思想,一些基本使用我这里就不赘述了,大家可以看一下github上的Demo,仿写一下
我们以下面的代码来展开

//创建序列
       let ob = Observable<String>.create { (obserber) -> Disposable in
            obserber.onNext("帅的一匹")
            obserber.onCompleted()
            return Disposables.create()
        }
        //订阅
        let _ = ob.subscribe(onNext: { (text) in
            print("订阅到:\(text)")
        }, onError: { (error) in
            print("error: \(error)")
        }, onCompleted: {
            print("完成")
        }) {
            print("销毁")
        }

首先进入到create方法里

/**
         Creates an observable sequence from a specified subscribe method implementation.
    
         - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
    
         - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
         - returns: The observable sequence with the specified implementation for the `subscribe` method.
         */
    public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>

继续点进定义,发现点不进去,那我们就看一下注释被。从指定的订阅方法实现中创建一个可观察序列。http://reactivex.io/documentation/operators/create.html这里就是通过一层一层的路由找到create文件

image.png

返回一个AnonymousObservable对象


image.png

逃逸闭包不理解可以翻看我之前的闭包的相关文章
这里AnonymousObservable包存了闭包也就是我们create传入的闭包,先做一个标记

AnonymousObservable 的继承关系是什么样的呢
AnonymousObservable<Element> 继承自 Producer<Element> ,Produce继承自 Observable<Element> , Observable 遵循 ObservableType : ObservableConvertibleType协议
我们从ObservableConvertibleType往下捋

public protocol ObservableConvertibleType {
    /// Type of elements in sequence.
    associatedtype E

    /// Converts `self` to `Observable` sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    func asObservable() -> Observable<E>
}

ObservableConvertibleType 首先有个关联类型E,然后有个函数asObservable声明
ObservableType协议:

public protocol ObservableType : ObservableConvertibleType {
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
extension ObservableType {
    
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<E> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}

ObservableType声明了一个函数subscribe
asObservable默认实现,这就有点类似于 as 的用法,也就意味着任何遵循这个协议的对象或者结构体都可以转换为Observable<E>

create做的事情也就是保存了闭包,那么我们再看看对可观察序列subscribe时都做了什么:

public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }

首先创建了一个AnonymousObserver<E>类型的对象observer,这个泛型E也就是示例代码 let ob = Observable<String>.create...中的 String类型,我们看最后retrun, self.asObservable().subscribe(observer)中做了什么,此时self是AnonymousObservable类型的对象,所以找AnonymousObservable中的subscribe函数,但是里面并没有实现,而是在父类Producer中实现了

class Producer<Element> : Observable<Element> {
    override init() {
        super.init()
    }

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
        }
    }

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

为什么这么做呢,其实也可以理解,父类这里处于一个OB视角,拥有统一的订阅能力,线程调度能力,销毁能力。具有公平原则。
调用self.run(observer, cancel: disposer)就会来到AnonymousObservable的run函数

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }

AnonymousObservableSink是一个管子,看看他的结构(非常重要)


image.png
image.png

我们看下面这张图,这个管理既包含了观察者,又包含了销毁者,同时sink.run将self传了进去,self也就是当前的可观察序列,所以这个管子包含了就将观察者和序列关联了起来
sink : 订阅者 + 销毁者 + 序列 + 调度环境,流通序列与订阅者,处理业务逻辑。


image.png

我们再看sink.run


image.png image.png

可以看到,AnyObserver保存了可观察序列的on函数


image.png

从上面代码可以看出,当实例代码调用onNext的时候就会来到管子里的on函数:


image.png
又会调到sink的forwardOn
image.png
又会调用_observer.on, 前面我们说过 _observer不就是之前传进来的let observer = AnonymousObserver<E>这个东西吗?
image.png

通过模式匹配最终调用到我们传进来的闭包onNext,将value传递给外部


image.png

我们再来看看subscribe(event)的时候又是什么流程呢?流程也差不多,我把调用流程标注出来,你可以按照这个流程调试一下

 public func subscribe(_ on: @escaping (Event<E>) -> Void)
        -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
    }
image.png image.png image.png image.png
image.png

所以这一系列流程并不是我们想像中的那么简单,其实如果看的框架比较多了,你自然会发现很多框架的思维都是类似的,所以好的框架是值得我们去好好研究与借鉴的。

相关文章

网友评论

      本文标题:RxSwift - 核心逻辑 序列的创建与订阅

      本文链接:https://www.haomeiwen.com/subject/qrtalltx.html