part05_Rxjava操作符

作者: IT魔幻师 | 来源:发表于2018-08-30 20:08 被阅读9次

作者:IT魔幻师
博客:www.huyingzi.top
转载请注明出处:https://www.jianshu.com/p/afeba5aea533


一、创建型操作符

主要用于创建被观察者

  • just
    create的快捷创建操作,create操作符必须手动调用onNext才能触发事件,just会自动触发

    @Test
     public void testjust() {
         //just是create的快捷创建操作
         Observable.just("我是你爸爸","我是你爸爸2").subscribe(new Observer<String>() {
             @Override
             public void onSubscribe(Disposable d) {
             }
    
             @Override
             public void onNext(String s) {
                 //此处会依次收到just参数传递过来的值
             }
    
             @Override
             public void onError(Throwable e) {
    
             }
    
             @Override
             public void onComplete() {
    
             }
         });
     }
    
  • fromArray
    相比于just,fromArray适用于多参数的情况.

      @Test
      public void testFromArray() {
          Observable.fromArray(new String[]{"我是你爸爸",
                  "我是你爸爸2",
                  "我是你爸爸3",
                  "我是你爸爸4"}).subscribe(new Observer<String>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("onNext  "+s);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • range
    创建在一定范围内的事件

    @Test
      public void testRange() {
          //从5开始执行11次事件
          Observable.range(5,11).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
              @Override
              public void onError(Throwable e) {
              }
              @Override
              public void onComplete() {
              }
          });
      }
    
  • empty
    主要适用于调用后不需要返回参数只需要关心结果,如:发起网络请求后在onComplete()中处理结果即可他不会回调onNext函数.

    
      @Test
      public void testempty() {
          Observable.empty().subscribe(new Observer<Object>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
                  System.out.println("执行结束");
              }
          });
      }
    
  • interval
    定时器操作符,需要依赖Android的api不能在纯java环境下使用

    //每隔1单位秒的时间执行一次
    Observable.interval(1, TimeUnit.SECONDS);
    
  • intervalRange
    定时器操作符,需要依赖Android的api不能在纯java环境下使用

     //从0开始每隔1000毫秒发送50个事件  初始延时0
          Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    
  • timer
    跟interval一样.

二、转换操作符

将事件类型转换成我们想要的结果

  • map
     @Test
      public void testMap() {
          //场景:根据图片地址最终转换成bitmap
          Observable.just("icon01.png","icon02.png").map(new Function<String, Bitmap>() {
              @Override
              public Bitmap apply(String url) throws Exception {
                  //在此次模拟执行网络请求等操作
                  // ...  此处省略
                  Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888);
                  return mBitmap;
                  
              }
          }).subscribe(new Observer<Bitmap>() {
              @Override
              public void onSubscribe(Disposable d) {
                  
              }
    
              @Override
              public void onNext(Bitmap bitmap) {
                  //在此次就可以 以此得到请求到的图片 
                  System.out.println("得到结果:"+bitmap);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
    
  • flatMap
    在上一个事件完成后才能开始下一个事件的情况
    @Test
      public void testFlatMap() {
          //比如:token过期了 必须先请求一个token 再进行登录请求
          Observable.just("getToken","login").flatMap(new Function<String, ObservableSource<?>>() {
              @Override
              public ObservableSource<?> apply(String s) throws Exception {
                  System.out.println("执行事件:"+s);
                  return createRespone(s);
              }
          }).subscribe(new Observer<Object>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
                  //依次回调处理结果
                  System.out.println(o);
    
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
    
      }
    
      private ObservableSource<?> createRespone(final String s) {
          //根据请求再创建一个被观察者,观察上一个请求是否成功了
          return Observable.create(new ObservableOnSubscribe<String>() {
              @Override
              public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                  System.out.println("上一个事件已经执行完成开始执行此事件:"+s);
                  //此处是基于getToken完成之后才会执行
                  emitter.onNext(s);
              }
          });
      }
    
  • groupBy
    对传入的事件进行分组,分组的条件可以自己指定
    @Test
      public void testGroupBy() {
          Observable.just(1,2,3,4).groupBy(new Function<Integer, String>() {
              @Override
              public String apply(Integer integer) throws Exception {
                  return integer>2?"A组":"B组";
              }
          }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
              @Override
              public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable)
                      throws Exception {
                  //stringIntegerGroupedObservable 是一个分组后的被观察者
                  stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          String key = stringIntegerGroupedObservable.getKey();
                          System.out.println("key="+key+" "+integer);
                      }
                  });
              }
          });
      }
    
  • buffer
    大批量数据需要处理的时候,对其进行分批次处理
     @Test
      public void testBuffer() {
          //将6条数据每2条分一个组执行
          Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer<List<Integer>>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(List<Integer> integers) {
                  //以此回调每个组的数据
                  System.out.println(integers);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • range
    上一个结果作为下一个参数,所有的结果累加得到最终结果,文件合并或者字符串拼接等场景.
      @Test
      public void testScan() {
          Observable.range(1,5).scan(new BiFunction<Integer, Integer, Integer>() {
              @Override
              public Integer apply(Integer integer, Integer integer2) throws Exception {
                  //第一个参数integer为之前所有结果的和  就是累加的形式
                  //相当于 第一个文件跟第二个文件合并,合并后的结果跟第三个文件合并...最终合并成一个大文件
                  return integer+integer2;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    

三、过滤操作符

  • filter
    对事件进行过滤或者不过滤的处理
    @Test
      public void testFilter() {
          Observable.just(1,2,3,4,5,6).filter(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  //此次决定是否过滤
                  //true 不过滤
                  //false 过滤掉不计入结果中
                  return integer>2;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  //接受过滤后的结果 
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • take
    限制产生事件的数量
        @Test
      public void testTake() {
          //每隔1单位秒的时间执行一次 take限制只执行5次
          Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer<Long>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Long aLong) {
                  System.out.println(aLong+"");
                  
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • distinct
    过滤重复事件
        @Test
      public void testDistinct() {
          Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • elementAt
    过滤指定的事件
          //指定过虑出第5个事件
          Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    

四、条件操作符

  • all
    判断所有事件是否满足一个条件,如果全部满足则为true
     Observable.just(1,2,3,4,5,6).all(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  //所有的事件都大于2吗
                  return integer>2;
              }
          }).subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  //此次返回时间结果
                  System.out.println(aBoolean);
              }
          });
    
  • contains
    判断所有事件中是否包含某项事件
     Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer<Boolean>() {
             @Override
             public void accept(Boolean aBoolean) throws Exception {
                 //此处返回是否包含3的结果
                 System.out.println(aBoolean);
             }
         });
    
  • any
    所有事件中只要有有一个符合条件即为true
    Observable.just(1,2,3,4,5).any(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  return integer==3;
              }
          }).subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  System.out.println(aBoolean);
              }
          });
    
  • isEmpty
    判断一个观察者是否有事件
     Observable.just(1).isEmpty().subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  //有事件返回true  空事件返回false
                  System.out.println(aBoolean);
              }
          });
    
  • defaultIfEmpty
    如果被观察者不发送任何事件,则会发送默认事件
    .defaultIfEmpty(0)
    
  • skipWhile
    跳过满足条件的事件
             //从0开始每隔1000毫秒发送50个事件  初始延时0  
            Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate<Long>() {
              @Override
              public boolean test(Long aLong) throws Exception {
                  //跳过<10的事件
                  return aLong<10;
              }
          }).subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    

五、合并操作符

将被观察者进行合并

  • startWith
    把需要的事件合并成一个事件进行处理,会先处理startWith添加的事件
            //把需要的事件合并成一个事件进行处理,会先处理2,4,6,8的事件
          Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • concat
    合并最多4个事件 以先来后到的顺序进行处理,跟startWith相反。
            //合并两个事件 123 会优先处理
          Observable.concat(
                  Observable.just(1,2,3),
                  Observable.just(4,5,6))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • merge
    merge合并多个被观察者,合并之后按照时间顺序并行执行
            Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS);
    
          Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • mergeDelayError
    延迟抛出异常事件,当合并的其它事件都执行完成之后再抛出异常
    //延迟抛出异常事件
          Flowable observable1 = Flowable.create(new FlowableOnSubscribe<Publisher<?>>() {
              @Override
              public void subscribe(FlowableEmitter<Publisher<?>> emitter) throws Exception {
                  //假设此处发生了异常
                  emitter.onError(new NullPointerException());
              }
          }, BackpressureStrategy.BUFFER);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
    
    
          Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • zip
    将多个被观察者压缩成单个,输出事件最少的被观察者结果

相关文章

  • part05_Rxjava操作符

    作者:IT魔幻师博客:www.huyingzi.top转载请注明出处:https://www.jianshu.co...

  • Rxjava2-二、操作符

    Rxjava记录总结操作符:创建操作符、转换操作符、合并操作符、过滤操作符、其他操作符、条件操作符. 创建操作符 ...

  • Kotlin 笔记(二)集合和函数操作符

    总数操作符 过滤操作符 映射操作符 元素操作符 生产操作符 顺序操作符

  • 走进RxJava源码(三) -- 创建型操作符

    创建型操作符 create操作符 just操作符 fromArray操作符 empty操作符 range 操作符

  • RxJava2操作符

    操作符分类 操作符分类有十三种: 变换操作符 过滤操作符列表 组合操作符 错误处理操作符 辅助操作符 条件操作符 ...

  • js运算符与表达式

    操作符 一元操作符:++,-- 算术操作符:+,-,*,/,% 关系操作符:>,<,>=,<= 相等操作符:==,...

  • 操作符和表达式

    操作符 一元操作符(++,--) 算术操作符(+,-,*,/,%) 关系操作符(>,<,>=,<=,) 相等操作符...

  • Rxjava讲解(2)

    上面文章讲过创建操作符, 转换操作符,过滤操作符, 这篇文字介绍组合操作符,条件操作符,功能操作符。 组合操作符 ...

  • JavaScript高程读书笔记(2)

    五、操作符 算数操作符、位操作符、关系操作符、 相等操作符 一元操作符++和--:注意a+++,- 位操作符按位非...

  • rxjava2 学习笔记

    特点 链式调用 线程切换 操作符 创建操作符 转换操作符 过滤操作符 组合操作符 错误处理操作符 辅助性操作符 条...

网友评论

    本文标题:part05_Rxjava操作符

    本文链接:https://www.haomeiwen.com/subject/dsloiftx.html