union
DataStream → DataStream
作用:对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream。
限制:参与合并的每条流的数据类型必须一致,否则无法通过编译。
package com.atguigu.apiTest
import org.apache.flink.streaming.api.scala._
/**
 * Demo job: split a stream of sensor readings into "high" and "low"
 * temperature branches, then merge the two branches back together with
 * `union` and print the result.
 *
 * `SensorReading` is declared elsewhere in the project; this object only
 * relies on it being constructible from (String, Long, Double) and exposing
 * `id` and `temperature`.
 */
object TestSplitSelect {

  /** Input file used when no path is passed on the command line. */
  private val DefaultInputPath: String =
    "C:\\Users\\Administrator\\Desktop\\0311Flink\\flink\\src\\main\\resources\\sensor"

  /**
   * Builds and runs the split/select/union pipeline.
   *
   * @param args optional; `args(0)` overrides the input file path. When it is
   *             absent the job reads from [[DefaultInputPath]].
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Allow the input location to be overridden instead of being tied to one machine.
    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val lines: DataStream[String] = env.readTextFile(inputPath)

    // Each line is comma separated: the three fields are parsed as String, Long, Double.
    val sensorStream: DataStream[SensorReading] = lines.map(line => {
      val fields: Array[String] = line.split(",")
      SensorReading(fields(0).trim, fields(1).trim.toLong, fields(2).trim.toDouble)
    })

    // Tag readings above 30 degrees as "high", everything else as "low".
    val splitStream: SplitStream[SensorReading] = sensorStream.split(reading => {
      if (reading.temperature > 30) Seq("high") else Seq("low")
    })
    val high: DataStream[SensorReading] = splitStream.select("high")
    val low: DataStream[SensorReading] = splitStream.select("low")

    // A stream with a different element type, kept only to illustrate the
    // restriction described below.
    val warning: DataStream[(String, Double)] = high.map(data => (data.id, data.temperature))

    // Merge the two branches with union.
    // `warning.union(low)` is rejected by the compiler because the element
    // types differ ((String, Double) vs SensorReading); union requires every
    // stream to carry the same type.
    val unionDataStream: DataStream[SensorReading] = high.union(low)
    unionDataStream.print()

    env.execute()
  }
}
输出结果:
2> SensorReading(sensor_1,1547718200,11.1)
1> SensorReading(sensor_10,1547718206,10.1)
4> SensorReading(sensor_10,1547718205,38.1)
4> SensorReading(sensor_7,1547718202,6.7)
3> SensorReading(sensor_1,1547718199,35.8)
3> SensorReading(sensor_6,1547718201,15.4)













网友评论