美文网首页
RxJava初探

RxJava初探

作者: kevinsEegets | 来源:发表于2019-12-09 16:19 被阅读0次

我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处

RxJava特性:

轻松的切换线程,流式的API以及有强大的操作符,这使得我们做异步操作时变的简单,不用像以前一样写各种的Handler来回调主线程.只需要一个操作符一行代码就可以搞定.

我所理解的RxJava的解释

之前也看过RxJava类的文章,但是一直都是一脸迷茫,观察者被观察者一大堆都搞晕了,后来偶然间看到一位大神的解释,瞬间明白了,他将RxJava的观察者和被观察者比喻为水管的上下游,我更愿意把这个概念理解为电话这头和那头.

有两部有线电话, 通过一根网线连接, A给B打电话, 我们可以把A理解为上游, B自然就是下游了, 他们之间通过电话线连接

开始学习

我们从最简单的RxJava使用入手,先上代码

        val observable = Observable.create(object :ObservableOnSubscribe<Int> {
            override fun subscribe(it: ObservableEmitter<Int>?) {
                it?.onNext(1)
                it?.onNext(2)
                it?.onComplete()
            }
        })
        val observer = object: Observer<Int> {
            override fun onComplete() {
                debugMsg("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                debugMsg("onSubscribe")
            }

            override fun onNext(value: Int?) {
                debugMsg("==",value)
            }

            override fun onError(e: Throwable?) {
                debugMsg("onError")
            }
        }
        observable.subscribe(observer)

根据电话的例子,我们看看上述代码

上游:

  • observable 可以看成是我们的上游
  • ObservableEmitter<Int> Emitter指发射器的意思,该类的解释就是被观察者发射器,也就是上游的发射器
  • it?.onNext(2) 上游发射器通过方法onNext()方法发射数据

下游

  • observer 当然就是下游了
  • onSubscribe() 下游开始接收数据就会触发
  • onNext() 下游正式接收上游数据
  • onError() 下游接收到上游发送的异常消息
  • onComplete(() 下游消息接收完成的标志

上下游连接

subscribe 上下游通过subscribe连接, 此处需要注意 上下游连接只能连接一次(通俗一点就是 subscribe 只使用一次)

我们逐行解释一下:

1: 我们创建一个上游(Observable.create), 然后通过 it.onNext(1) it.onNext(2) it.onComplete(2) 发送了三条消息 "1", "2"以及 "完成" 给下游

2: 同样我们必须定义一个接受者也就是下游 (object: Observer<Int>) ,用来接收上游发送的消息,因为我们上游发送的消息为Int类型, 所以我们此处接受下游的类型为Int

3: 通过 observable.subscribe(observer) 连接上下游

日志输出

D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete

根据日志,我们可以看出,当我们的上下游建立连接时,首先会执行 onSubscribe() ,接着通过onNext()接收到我们上游发送的数据 "1", "2", 数据全部接收完成后会执行 onComplete()

此处需要注意几个地方:

  • 上游不能发送多个onError,发送多个会导致程序崩溃
  • 上游发送多个onComplete,下游只接受一个
  • 上游发送的onComplete和onError必须是互斥
  • 下游接收到onComplete时再不会接受上游事件

我们用代码解释一下注意的几个地方

1: 我们在上游添加几个onComplete()

    override fun subscribe(it: ObservableEmitter<Int>?) {
                it?.onNext(1)
                it?.onNext(2)
                it?.onNext(3)
                it?.onComplete()
                it?.onComplete()
                it?.onComplete()
            }

        })
        val observer = object: Observer<Int> {
            override fun onComplete() {
                debugMsg("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                debugMsg("onSubscribe")
            }

            override fun onNext(value: Int?) {
                debugMsg("==",value)
            }

            override fun onError(e: Throwable?) {
                debugMsg("onError")
            }
        }
        observable.subscribe(observer)

日志输出

D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete

可以看到我们的onComplete只输出了一次

2: 上游只要发送了onComplete, 下游不会再接收后续的消息

 override fun subscribe(it: ObservableEmitter<Int>?) {
                it?.onNext(1)
                it?.onNext(2)
                it?.onNext(3)
                it?.onComplete()
                it?.onNext(4)
            }
        })
        val observer = object: Observer<Int> {
            override fun onComplete() {
                debugMsg("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                debugMsg("onSubscribe")
            }

            override fun onNext(value: Int?) {
                debugMsg("==",value)
            }

            override fun onError(e: Throwable?) {
                debugMsg("onError")
            }
        }
        observable.subscribe(observer)

日志输出

D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete

3: 上游发送的onComplete和onError必须是互斥 以及发送多个onError报错

val observable = Observable.create(object :ObservableOnSubscribe<Int> {
            override fun subscribe(it: ObservableEmitter<Int>?) {
                it?.onNext(1)
                it?.onNext(2)
                it?.onNext(3)
                it?.onComplete()
                it?.onError(Throwable())
            }
        })
        val observer = object: Observer<Int> {
            override fun onComplete() {
                debugMsg("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                debugMsg("onSubscribe")
            }

            override fun onNext(value: Int?) {
                debugMsg("==",value)
            }

            override fun onError(e: Throwable?) {
                debugMsg("onError")
            }
        }
        observable.subscribe(observer)

日志输出

D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onSubscribe
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 1
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 2
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->== 3
D/com.eegegts: .MainActivity$loadRxjava1$observer$1--->onComplete
W/System.err:     at com.eegets.rxjava.MainActivity$loadRxjava1$observable$1.subscribe(MainActivity.kt:76)
W/System.err:     at com.eegets.rxjava.MainActivity.loadRxjava1(MainActivity.kt:97)
W/System.err:     at com.eegets.rxjava.MainActivity$onCreate$1.onClick(MainActivity.kt:32)

如上log日志输出,程序崩溃

在最开始我们说过流式 API, 我们可以改改如上代码,让它看起来更符合RxJava的定义方式

 Observable.create(ObservableOnSubscribe<Int> {
            it?.onNext(1)
            it?.onNext(2)
            it?.onNext(3)
            it?.onComplete()
        }).subscribe(object: Observer<Int> {
            override fun onComplete() {
                debugMsg("onComplete")
            }

            override fun onSubscribe(d: Disposable?) {
                debugMsg("onSubscribe")
            }

            override fun onNext(value: Int?) {
                debugMsg("==",value)
            }

            override fun onError(e: Throwable?) {
                debugMsg("onError")
            }
        })

本节咱们看了如何用RxJava发送简单的消息,后续咱们还有看很多干货, 下一章 Consumer 走起

相关文章

  • Retrofit2.0+Rxjava初探

    Retrofit2.0+Rxjava初探 1.WebService 2.初始化配置 3.结合Rxjava

  • RxJava2源码初探-整体设计

    RxJava2源码初探-整体设计 首先简单介绍Rxjava2 的四个基本的概念 Observable (可观察者,...

  • RxJava初探

    RxJava RxJava – Reactive Extensions for the JVM – a libra...

  • RxJava 初探

    前言 去年无意间知道了RxJava这个东东,但一直没时间去看看。最近,终于有了不少时间,经过两周的学习,对RxJa...

  • RxJava 初探

    入门教程推荐 **RxJava、Retrofit **http://www.jianshu.com/p/19cac...

  • RxJava初探

    转载自:http://codethink.me/2015/05/09/intro-of-rxjava/ 1.前言 ...

  • RxJava 初探

    0.前言 本文主要记录RxJava的初步用法,以对RxJava有一个直观的感受。官网在这里,RxJava项目地址在...

  • RxJAVA初探

    参考博主 扔物线 的给 Android 开发者的 RxJava 详解,可点击学习。 原文链接:RxJAVA git...

  • RxJava初探

    我们在学习RxJava之前要了解一下,为什么使用RxJava, 使用RxJava有什么好处 RxJava特性: 轻...

  • RxJava2.0源码初探

    RxJava2.0源码初探 RxJava2.0的源码相对于1.0发生了很大的变化, 命名方式也发生了很大变化, 下...

网友评论

      本文标题:RxJava初探

      本文链接:https://www.haomeiwen.com/subject/qvtigctx.html