美文网首页Swift学习
RXSwift - 合并Observable (concat和m

RXSwift - 合并Observable (concat和m

作者: 内蒙小猿 | 来源:发表于2020-08-20 12:44 被阅读0次

一、串行 (concat)

let queueA = PublishSubject<String>()
let queueB = PublishSubject<String>()
let queueC = PublishSubject<String>()


enum E:Error {
    case demoAError
    case demoBError
}

Concat 串行合并多个事件,处理完 queneA 才会处理 queueB

1、结合方式

​ 1.1、 let sequece = Observable.concat([queueA.asObserver(), queueB.asObserver()])

​ 1.2、let sequece = queueA.concat(queueB)

2、生命周期

2.1、合成后observable,在所有 Sub Observable 都正常结束了,合成后的observable 才会正常结束。

        let sequece = Observable.concat([queueA.asObserver(), queueB.asObserver()])
        _ = sequece.subscribe(onNext: { (Str) in
            print(Str)
        }, onError: nil, onCompleted: {
            print("Completed")
        }, onDisposed: {
            print("Disposed")
        })
        
        queueA.onNext("A1")
        queueA.onCompleted()
        queueB.onNext("B1")
        queueB.onCompleted()

​ 2.1.1、\color{#DEB887} {执行结果: A1 -> B1 -> onCompleted -> Disposed}

2.2、只要当前只在订阅的 Sub Observable 发生 Error, 合成后的 Observable 也会发生 Error, 离开作用域会被 Dispose 。 但是不会执行 onCompleted

        queueA.onNext("A1")
        queueA.onError(E.demoAError)
        queueA.onCompleted()
        queueB.onNext("B1")
        queueB.onCompleted()
        

​ 2.2.1、\color{#DEB887} {执行结果: A1-> demoAError -> Disposed}

2.3、在 "排队中" 的 Sub observable 发生 Error, 并不影响定于中的observable。不会执行 onCompletd, 但是回执 Error 回调

        queueA.onNext("A1")
        queueA.onNext("A2")
        queueB.onNext("B1")
        queueB.onError(E.demoBError)
        queueA.onNext("A3")
        queueA.onCompleted()
        queueB.onCompleted()

​ 2.3.1、 \color{#DEB887} {执行结果:A1-> A -> 2 -> A3 -> onCompleted -> Disposed}

并行(merge)

并行合并多个事件

1、结合方式

1.1 未指定订阅数量

let sequence = Observable.of(queueA.asObserver(),queueB.asObserver()).merge()

1.2 指定订阅服务

let sequence = Observable.of(queueA.asObserver(),queueB.asObserver()).merge(maxConcurrent: 2)

2、生命周期

2.1 合成后的Observable, 在所有的 Sub observalbe 都正常结束了, 合成后的 Observable 才会正常结束

 _ = Observable.of(queueA.asObserver(),queueB.asObserver()).merge(maxConcurrent: 2).subscribe(onNext: { (Str) in
            print(Str)
        }, onError: nil, onCompleted: {
            print("onCompleted")
        }) {
            print("onDisposed")
        }
        
        queueA.onNext("A1")
        queueB.onNext("B1")
        queueA.onNext("A2")
        queueB.onNext("B2")
        queueA.onCompleted()
        queueB.onCompleted()
        

​ 2.1.1、\color{#DEB887} {执行结果: A1 -> B1 -> A2 -> B2 -> onCompleted -> Disposed}

2.2 合成后的observable, 在所有的 Sub Observable 中只要一个发生Error, 就会指定Error, 并在离开作用域时释放, 并不会执行 onCompleted 方法。

                queueA.onNext("A1")
        queueB.onNext("B1")
        queueA.onNext("A2")
        queueA.onError(E.demoAError)
        queueB.onNext("B2")
        queueA.onCompleted()
        queueB.onCompleted()

​ 2.2.1、 \color{#DEB887} {执行结果:A1 -> B1 -> A2 -> demoAError -> Disposed}

3、指定同时订阅数量 maxconcurrent

3.1、指定的 maxconcurrent 为 1, 下一个事件也需要等当前订阅 completed 完成后才能订阅

        _ = Observable.of(queueA.asObserver(),queueB.asObserver(),queueC.asObserver()).merge(maxConcurrent: 1).subscribe(onNext: { (Str) in
            print(Str)
        }, onError: nil, onCompleted: {
            print("onCompleted")
        }) {
            print("onDisposed")
        }
        
        queueA.onNext("A1")
        queueA.onCompleted()
        
        queueB.onNext("B1")
        queueB.onCompleted()
        
        queueC.onNext("C1")
        queueC.onCompleted()

​ 3.1.1、 \color{#DEB887} {执行结果:A1 -> B1 -> C1 -> onCompleted -> Disposed}

3.2、maxconcurrent 为1 和 concat 的是效果是一样的

// merge

        _ = Observable.of(queueA.asObserver(),queueB.asObserver(),queueC.asObserver()).merge(maxConcurrent: 1).subscribe(onNext: { (Str) in
            print(Str)
        }, onError: nil, onCompleted: {
            print("onCompleted")
        }) {
            print("onDisposed")
        }
           
        queueA.onNext("A1")
        queueB.onNext("B1")
        queueA.onCompleted()
        
        queueB.onNext("B2")
        queueB.onCompleted()
        
        queueC.onNext("C1")
        queueC.onCompleted()

// concat

        _ = Observable.concat([queueA.asObserver(),queueB.asObserver(),queueC.asObserver()]).subscribe(onNext: { (str) in
             print(str)
        }, onError: nil, onCompleted: {
            print("onCompleted")
        }, onDisposed: {
            print("onDisposed")
        })
           
        queueA.onNext("A1")
        queueB.onNext("B1")
        queueA.onCompleted()
        
        queueB.onNext("B2")
        queueB.onCompleted()
        
        queueC.onNext("C1")
        queueC.onCompleted()

​ 3.2.1、 \color{#DEB887} {执行结果都是: A1-> B2 -> C1 -> onCompleted -> onDisposed ,B2 都收不到,因为B2 开始时,A1还没结束。}

3.3 指定maxconcurrent 为 2

_ = Observable.of(queueA.asObserver(),queueB.asObserver(),queueC.asObserver()).merge(maxConcurrent: 2).subscribe(onNext: { (Str) in
            print(Str)
        }, onError: nil, onCompleted: {
            print("onCompleted")
        }) {
            print("onDisposed")
        }
        
        queueA.onNext("A1")
        queueA.onNext("A2")
        queueB.onNext("B1")
        queueB.onNext("B2")
        queueA.onCompleted()
        queueB.onCompleted()
        queueC.onNext("C1")
        queueC.onNext("C2")
        queueC.onCompleted()

​ 3.3.1、 \color{#DEB887} {执行结果:A1 -> A2 -> B1 -> B2 -> C1 -> C2 -> onCompleted -> Disposed}

相关文章

网友评论

    本文标题:RXSwift - 合并Observable (concat和m

    本文链接:https://www.haomeiwen.com/subject/whldjktx.html