美文网首页
多流转换算子Split和select

多流转换算子Split和select

作者: yayooo | 来源:发表于2019-08-25 16:10 被阅读0次
split
DataStream → SplitStream
并不是真正的切开拆分成了两条流,只是在splitStream里面分成了两组。所以SplitStream不能直接操作需要另一个算子select拿出来处理。
select
SplitStream→DataStream
从一个SplitStream中获取一个或者多个DataStream。

package com.atguigu.apiTest

import org.apache.flink.streaming.api.scala._

object TestSplitSelect {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = env.readTextFile("C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\sensor")

    val dataStrem2: DataStream[SensorReading] = dataStream.map(data => {
      val dataArray: Array[String] = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })

    val splitStream: SplitStream[SensorReading] = dataStrem2.split(sensorData => {
      if (sensorData.temperature > 30) Seq("high") else Seq("low")
    })

    val high = splitStream.select("high")
    val low = splitStream.select("low")
    val all = splitStream.select("high","low")

    high.print("high")
    low.print("low")
    all.print("all")

    env.execute()


  }

}
//传感器读数样例类
case class SensorReading(id:String, timestamp:Long, temperature: Double)

输出:

low:1> SensorReading(sensor_7,1547718202,6.7)
all:4> SensorReading(sensor_1,1547718199,35.8)
low:3> SensorReading(sensor_1,1547718200,11.1)
high:4> SensorReading(sensor_1,1547718199,35.8)
all:1> SensorReading(sensor_7,1547718202,6.7)
all:4> SensorReading(sensor_6,1547718201,15.4)
low:4> SensorReading(sensor_6,1547718201,15.4)
all:2> SensorReading(sensor_10,1547718206,10.1)
low:2> SensorReading(sensor_10,1547718206,10.1)
all:3> SensorReading(sensor_1,1547718200,11.1)
high:1> SensorReading(sensor_10,1547718205,38.1)
all:1> SensorReading(sensor_10,1547718205,38.1)

相关文章

网友评论

      本文标题:多流转换算子Split和select

      本文链接:https://www.haomeiwen.com/subject/emfxectx.html