美文网首页RxJava
RxJava<第五篇>:5种被观察者的创建

RxJava<第五篇>:5种被观察者的创建

作者: NoBugException | 来源:发表于2019-03-13 16:30 被阅读1次

5种被观察者分别是:Observable,Flowable, Single, Completable, Maybe。
五种被观察者可通过toObservable,toFlowable,toSingle,toCompletable,toMaybe相互转换。

(1)Observable

  • Observable即被观察者,决定什么时候触发事件以及触发怎样的事件。
  • Oberver即观察者,他可以在不同的线程中执行任务,极大的简化了并发操作,因为他创建了一个处于待命状态的观察者,可以在某一时刻响应Observable的通知,而不会造成阻塞。
  • ObservableEmitter数据发射器,发射Observable的onNext,onError,onComplete,onSubscribe方法。
  • subscribe() 订阅Observable的四个方法,只有调用此方法才会开始发射数据。其有4个构造方法:
subscribe(onNext()) 
subscribe(onNext(),onError()) 
subscribe(onNext(),onError(),onComplete())     
subscribe(onNext(),onError(),onComplete(),onSubscribe())

具体实现前几篇已经说明了,本篇就不介绍了。

(2)Flowable

可以看成是Observable的实现,只有Flowable支持压背

  • Observable:
    一般处理不超过1000条数据,几乎不会造成内存溢出
    不会背压
    处理同步流
  • Flowable:
    处理超过10KB的数据元素
    文件读取与分析
    读取数据库
    处理网络I/O流
    创建一个响应式的非阻塞接口

压背的实现会在后续章节中讲解。

(3)Single

只有onSuccess和onError回调,Single只会发射一次数据
具体实现如下:

    Single.create(new SingleOnSubscribe<CountBean>() {
        @Override
        public void subscribe(SingleEmitter<CountBean> e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                if(countBean.getCount() == 1){
                    e.onSuccess(countBean);
                }else{
                    e.onError(new Throwable("nullpoint exception"));
                }
            }
        }
    }).subscribe(new Consumer<CountBean>() {
        @Override
        public void accept(CountBean countBean) throws Exception {
            System.out.println("count:" + countBean.getCount());
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println("exception:" + throwable.getMessage());
        }
    });
    Single.create(new SingleOnSubscribe<CountBean>() {
        @Override
        public void subscribe(SingleEmitter<CountBean> e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                if(countBean.getCount() == 1){
                    e.onSuccess(countBean);
                }else{
                    e.onError(new Throwable("nullpoint exception"));
                }
            }
        }
    }).subscribe(new SingleObserver<CountBean>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("被观察者和观察者开始连接!");
        }

        @Override
        public void onSuccess(CountBean countBean) {
            System.out.println("count:"+countBean.getCount());
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("exception:"+e.getMessage());
        }
    });

(4)Completable

只有onComplete和onError事件, 和Single不同, Completable不发射数据。

具体实现如下:

    Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                if(countBean.getCount() == 1){
                    e.onComplete();
                }else{
                    e.onError(new Throwable("nullpoint exception"));
                }
            }
        }
    }).subscribe(new Action() {
        @Override
        public void run() throws Exception {

        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            
        }
    });
    Completable.create(new CompletableOnSubscribe() {
        @Override
        public void subscribe(CompletableEmitter e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(0);
                if(countBean.getCount() == 1){
                    e.onComplete();
                }else{
                    e.onError(new Throwable("nullpoint exception"));
                }
            }
        }
    }).subscribe(new CompletableObserver() {
        @Override
        public void onSubscribe(Disposable d) {
            
        }

        @Override
        public void onComplete() {

        }

        @Override
        public void onError(Throwable e) {

        }
    });

(5)Maybe

没有onNext方法,同样需要onSuccess发射数据,且只能发射0或1个数据,多发也不再处理。

具体实现如下:

    Maybe.create(new MaybeOnSubscribe<String>() {

        @Override
        public void subscribe(MaybeEmitter<String> e) throws Exception {
            if(!e.isDisposed()){
                CountBean countBean = new CountBean();
                countBean.setCount(1);
                if(countBean.getCount() == 1){
                    e.onSuccess("aaaaa");
                }else if(countBean.getCount() == 0){
                    e.onComplete();
                }else{
                    e.onError(new Throwable("nullpoint exception"));
                }
            }
        }
    }).subscribe(new MaybeObserver<String>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(String s) {
            System.out.println("onSuccess:"+s);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("onError");
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    });

相关文章

  • RxJava<第五篇>:5种被观察者的创建

    5种被观察者分别是:Observable,Flowable, Single, Completable, Maybe...

  • 手写简单Rxjava理解其内部实现(一)

    首先创建原始被观察者及观察者接口 创建抽象的被观察者 创建数据发送者接口 创建数据发送者与被观察者建立联系的接口 ...

  • 一、初识rxjava

    1、创建被观察者Observable 2、创建观察者 3、被观察者订阅观察者 Fun1接口/*** Represe...

  • RXJava2简单使用

    1.添加依赖 2.创建观察者和被观察者2.1创建获取观察者和被观察者的方法 2.2初始化观察者和被观察者,并进行订...

  • RxJava常用操作符

    创建unsafeCreate(create)创建一个Observable(被观察者),当被观察者(Observer...

  • RxJava常用操作符

    创建unfaseCreate(create)创建一个Observable(被观察者),当被观察者(Observer...

  • RxJava2.0学习笔记

    第一步:创建被观察者(observable) 第二步 创建观察者(observer) 被观察者与观察者建立联系ob...

  • Rxjava2的操作符 一

    一 创建操作符 1.1 创建被观察者 表示创建一个被观察者,其中 e.onNext("Hello Observe...

  • RXJava

    传统的观察者模式 RxJava 四个要素 被观察者 观察者 订阅 事件 创建被观察者subscriber就是观察者...

  • RxAndroid

    使用 1.简单使用(类似Rx1) 创建被观察者: 创建观察者: 将观察者绑定到被观察者: 把代码连起来就是链式操作...

网友评论

    本文标题:RxJava<第五篇>:5种被观察者的创建

    本文链接:https://www.haomeiwen.com/subject/mrctmqtx.html