美文网首页java
Reactor学习:五、中间操作—转换

Reactor学习:五、中间操作—转换

作者: 睦月MTK | 来源:发表于2020-08-04 21:46 被阅读0次

声明:差不多就是Transforming an Existing Sequence的翻译


一、转换当前的序列
  • 一对一转换(比如将当前字符串的值转换为其长度):map
    Flux.just("str1" , "str").map(item -> {return   item.length();}).subscribe(System.out::println);//4 3 
    
    • 仅作类型转换:cast
      Mono.just("str1").cast(Object.class);
      
    • 将元素转换为带序号的封装元素,序号来自index操作所接收到的顺序,从0开始
      Flux.just("str1","str2").index();//3.1.0版本似乎没有这个方法
      
    • 将元素转换为带有记录间隔时间的封装元素,间隔时间是elapsed操作接收到上一个数据到当前数据的时间间隔
      Flux.interval(Duration.ofSeconds(1)).elapsed().subscribe(System.out::println,System.out::println,null,sub -> sub.request(3));
      //-----------------------
      [1007,0]
      [1001,1]
      [999,2]
      
  • 一对多转换(比如将字符串转换为组成它们的字符): flatMap
    注意:flatMap会将其内部多个Publisher合成为一个(按照时间顺序),所以最后展现的还是一个序列
    Flux.just("str1","str2").flatMap(item -> {
                return Flux.fromStream(item.chars().mapToObj(c ->   Character.valueOf((char) c)));
            } , 2).subscribe(System.out::println);
    //----------------(结果去掉换行)
    s t r 1 s t r 2
    
    • 如果想忽略掉flatMap中某些数据可以使用Mono.empty()
    • 如果想使得flatMap的数据按原数据顺序排列(比如转换公式为:str => s t r ,原数据如果为str1 str2 那么转换后的排序顺序必为 s t r 1 s t r 2,即只要上一个元素内部转换流没有走完,下一个转换内容的永远不会被输出,但会被记录),可以试试Flux#flatMapSequential
    • Mono的一对多转换(转换为Flux):Mono#flatMapMany

二、在已有序列中增添新的元素
  • 在序列之前 :Flux#startWith
    Flux.just(1,2,3).startWith(0).subscribe(System.out::println);
    //------------------
    0 1 2 3
    
  • 在序列之后 :Flux#concatWith

三、对Flux进行归并
  • 归并为List:Flux#collectList , Flux#collectSortedList
    等待接收完成,并将所有数据归并成一个Mono<List>进行返回,后者比前者多个排序功能

    Flux.just(3,5,2,1).collectSortedList((left , right) -> {
        if(left > right){
            return 1;
        }else if(left < right){
            return -1;
        }else{
            return 0;
        }
    }).subscribe(System.out::println);
    //---------------
    [1, 2, 3, 5]
    
  • 归并为Map:Flux#collectMap , Flux#collectMultiMap
    等待接收完成,将所有数据归并为一个Mono<Map>进行返回,后者返回的是Mono<Map<key,Collection>>

    Flux.just(3,5,2,1).collectMap(item -> item.toString()).subscribe(System.out::println);
    //------------------------
    //{1=1, 2=2, 3=3, 5=5}
    
  • 由Collector来完成归并:Flux#collect

    Flux.just(3,5,2,1).collect(Collectors.counting()).subscribe(System.out::println);
    //--------------------
    //4
    
  • 计算序列中元素数量:Flux#count

    Flux.just(3,5,2,1).count().subscribe(System.out::println);
    //-----------------------------------
    //4
    
  • 对序列中的每一个元素应用一个回调,该回调的结果会带入下一次回调,直到所有元素被转换为一个最终回调结果:Flux#reduce

    Flux.just(3,5,2,1).reduce("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println);
    //----------------
    //number:3521
    
    • 在Flux#reduce的基础上,增加输出每次转换结果的功能:Flux#scan
      Flux.just(3,5,2,1).scan("number:" , (lastResult,item) -> lastResult + item.toString()).subscribe(System.out::println);
      //-------------------------
      number:
      number:3
      number:35
      number:352
      number:3521
      
  • 归并成布尔值

    • 指定判断式,是否符合所有元素:Flux#all
    Flux.just(3,5,2,1).all(item -> item < 10).subscribe(System.out::println);
    //---------------
    //true
    

    注意,如果有一个不符合,将会立刻向上游发送cancel信号,并向下游发出元素false

    • 指定判断式,是否至少有一个符合:Flux#any

    注意,如果有一个符合,将会将会立刻向上游发送cancel信号,并向下游发出元素true,下面两个也同理,达到要求便立刻中止

    • 检验序列中是否有元素:Flux#hasElements()
      Flux.empty().hasElements().subscribe(System.out::println);
      //---------------
      //false
      
    • 检验序列中是否有特定元素:Flux#hasElement(T value)

四、合并多个生产者
  • 以序列的顺序进行合并:Flux#concatWith(other)
    Flux.just(1,2,3).concatWith(Flux.just(4,5,6)).subscribe(System.out::println);
    //---------------------------------
    //1 2 3 4 5 6
    
  • 以发出的元素顺序进行合并,先发出的元素在前:Flux#mergeWith(other)
    Flux.interval(Duration.ofSeconds(1)).mergeWith(Flux.interval(Duration.ofSeconds(1))).subscribe(System.out::println);
    //---------------
    //0 0 1 1 2 2 3 3 ....
    
    • mergeWith基础上,增加了兼容不同类型生产者的功能和合并元素的功能,如[1,2,3]和[a,b,c,d]会合并成[1a,2b,3c]:Flux#zipWith
      Flux.just(1,2,3).zipWith(Flux.just("a" , "b" , "c")).subscribe(System.out::println);
      //------
      //[1,a] [2,b] [3,c]
      Flux.interval(Duration.ofSeconds(1)).zipWith(Flux.just("a","b","c") , (f,s) -> f.toString().concat(s)).subscribe(System.out::println);
      //--------
      //0a 1b 2c
      
  • 等待另一个序列结束,然后丢出一个Mono<Void>:Mono#and
    Mono.just("e").and(Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok"));
    //-----------
    //0 1 2 ok
    
  • 等待指定所有序列结束,然后丢出一个Mono<Void>:Mono#when
    Mono.when(Flux.just(1,2,3) , Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(null , null ,() -> System.out.println("ok"));
    //-------------
    //0 1 2 ok
    
  • zipWith相似,但是每次合并的元素都取自其他序列发出的最近一个值,而不是一直等待其他序列发出下一个值:Flux#combineLatest
    Flux.combineLatest(objects -> objects[0].toString().concat(objects[1].toString()) ,Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just("a","b","c")).subscribe(System.out::println);
    // 0c 1c 2c
    
  • 有一个生产者序列集合,以谁先发出第一个元素来判断选择哪个序列进行输出
    Flux.first(Flux.interval(Duration.ofSeconds(1)).take(3) , Flux.just(4,5,6)).subscribe(System.out::println);
    //----------
    // 4 5 6
    
  • flatMap相似,但是当转换的序列还没结束,但是主序列的下一个元素已经到来额时候,会直接取消掉转换后的序列,也就是说同时只能存在一个转换的序列被执行:switchMap

五、重复
  • 完成后,重新订阅该序列,重复输出,永不停止:repeat
    • 上述功能,再加上时间间隔:Flux.interval(duration).flatMap(tick → myExistingPublisher)

六、只对完成信号感兴趣,即忽略元素
  • 忽略所有的元素,如果上游发出完成信号,则完成:ignoreElements
  • 忽略所有的元素,只响应错误信号和完成信号,完成返回Mono<Void> : then
    • 变体--响应完成时不再返回Mono<Void>,而是正常执行参数中的Mono,并将其返回值作为返回值:then(Mono<T> other)
  • 完成后,还要再完成一个提供的空任务后才返回: Mono<Void> thenEmpty(Publisher<Void> other)
  • 完成后,返回提供的值:Mono<T> Mono#thenReturn(T)
  • 完成后,执行提供的Flux,其元素会正常输出:thenMany

七、有一个需要延迟完成的Mono
  • 需要等待该Mono中元素所生成的序列完成后再向下游发出该元素:Mono#delayUntil(Function)
    Mono.just("complete!").delayUntil(item -> Flux.interval(Duration.ofSeconds(1)).take(3).doOnNext(System.out::println)).subscribe(System.out::println);
    //-------------------------------------------
    //0 1 2 complete!
    

八、如果需要对流中元素进行递归操作
  • 以分支为先展开: Flux#expand(Function)
    即先对所有元素进行一遍递归,然后再对各个递归后序列中的序列进行递归
    Flux.just(1,2).expand(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println);
    //--------------
    //1 2 2 4 4 8 8
    
  • 以深度为先展开:Flux#expandDeep(Function)
    Flux.just(1,2).expandDeep(item -> item > 6 ? Mono.empty() : Flux.just(item *2)).subscribe(System.out::println);
    //-----------------
    //1 2 4 8 2 4 8
    

九、序列为空时转换
  • 如果序列为空,则输出提供的预定值: defaultIfEmpty
  • 如果序列为空,则订阅提供的预定序列::switchIfEmpty

参考文档:
[1] Reactor api doc
[2] Reactor reference doc

相关文章

网友评论

    本文标题:Reactor学习:五、中间操作—转换

    本文链接:https://www.haomeiwen.com/subject/wsglqktx.html