美文网首页
Spark-DStream数据转换

Spark-DStream数据转换

作者: 布莱安托 | 来源:发表于2020-07-07 14:47 被阅读0次

DStream的原语与RDD类似,分文转换(Transformation)和输出(Output)两种,此外还有一些特殊的原语,如:updateStateByKey,transform以及各种窗口(window)相关的原语。

无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。支持map、flatMap、filter、reduceByKey等RDD相同的操作。

注:如针对键值对的DStream使用reduceByKey需要添加import StreamingContext._

无状态转化应用到DStream内部各个批次的RDD上,但是只会对单个时间周期的数据进行操作,并不会跨越时间周期。

无状态转化操作也能在多个DStream间整合数据,不过也是在各个时间周期内。键值对DStream可以使用和RDD一样的连接相关操作,例如:cogroup、join、leftOutJoin等。

同样,可以使用union将两个DStream内容合并起来,也可以使用StreamingContext.union()合并多个流。

有状态转化操作

1. updateStateByKey

updateStateByKey用于记录历史记录,有事我们需要在DStream中跨批次维护状态。针对这种情况updateStateByKey为我们提供了一个对状态变量的访问,用于键值对的DStream。给定一个键值对类型的DStream,并传递一个按照键来更新值得函数,以此来构建出一个新的DStream,内部数据的形式为(原有键,状态)的一个键值对。

想要维护并更新状态需要做如下两步:

  1. 定义状态,可以为任意数据类型
  2. 定义状态更新函数,函数需要实现当前批次输入与当前状态的更新操作

使用updateStateByKey需要设置checkpoint,会通过checkpoint保存状态。

实现有状态的Wordcount如下:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object UpdateStateDemo {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local[4]").setAppName("UpdateStateDemo")
    val streamingContext = new StreamingContext(conf, Seconds(5))
      
    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("test")

    val kafkaDStream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    val flatMapDStream = kafkaDStream.flatMap(_.key().split(" "))

    val mapDStream = flatMapDStream.map((_, 1))

    // 将原有状态值与新增数据value序列相加,获得新的状态
    val updateStateDStream = mapDStream.updateStateByKey {
      case (seq, state) => Option(state.getOrElse(0) + seq.sum)
    }

    updateStateDStream.print()

    streamingContext.start()
    streamingContext.awaitTermination()

  }
}

2. 窗口操作

窗口操作可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Streaming的允许状态。基于窗口的操作会在一个比StreamingContext的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。

注:所有基于窗口的操作均需要两个参数:1)窗口大小,2)滑动步长,两者都必须是StreamingContext中设置的采集周期的正整数倍。

窗口大小控制每次计算最近的多少个批次的数据,计算的批次数量为windowDuration/batchInterval个。滑动步长用来控制对新的DStream进行计算的间隔,默认与批次间隔相同。

关于窗口的操作有如下原语:

  1. window(windowLength, slideInterval):对DStream的窗口中的批次进行计算,并返回一个新的DStream

    利用window对窗口中的批次数据进行Wordcount

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object WindowDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("WindowDemo")
        val streamingContext = new StreamingContext(conf, Seconds(3))
    
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "use_a_separate_group_id_for_each_stream",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("test")
    
        val kafkaDStream = KafkaUtils.createDirectStream[String, String](
          streamingContext,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    
        // 窗口大小及滑动步长均为采集周期的整数倍
        val windowDStream = kafkaDStream.window(Seconds(9), Seconds(3))
    
        val flatMapDStream = windowDStream.flatMap(_.key().split(" "))
    
        val mapDStream = flatMapDStream.map((_, 1))
    
        val reduceByKeyDStream = mapDStream.reduceByKey(_ + _)
    
        reduceByKeyDStream.print()
    
        streamingContext.start()
        streamingContext.awaitTermination()
    
      }
    }
    
  1. countByWindow(windowLength, slideInterval):返回一个DStream窗口计算的元素数量

  2. reduceByWindow(func, windowLength, slideInterval):通过自定义函数对DStream窗口进行聚合

  3. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):通过自定义函数对键值对类型的DStream的窗口进行reduceByKey操作。默认任务并行大小根据配置属性spark.default.parallelism做分组,也可以通过制定numTasks来设置并行度。

  4. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):与上面的函数相比,新增了参数invFunc,这个函数是用来将上一个窗口包含的,本次窗口不包含的批次数据结果从上一个窗口的计算结果去除,这样窗口间相同的批次不需要重复计算,只需要将新增数据和除去无效数据后的结果进行计算,即可得到新的结果,提高了计算的效率。注:使用时必须开启checkpoint

  5. countByValueAndWindow(windowLength, slideInterval, [numTasks]):对键值对类型的DStream进行处理,返回(K, Long)类型的DStream,其中键的值为其在窗口中出现的频次。

3. 其他重要操作

  1. transform
  2. join

相关文章

  • Spark-DStream数据转换

    DStream的原语与RDD类似,分文转换(Transformation)和输出(Output)两种,此外还有一些...

  • day4循环和分支

    一、数据类型转换 1.数据类型自动转换 运行 2.强制转换 基本语法:类型名(需要转换的数据) a.将其他数据转换...

  • JAVA基础第四天

    JAVA数据类型---布尔类型; 数据类型转换自动数据类型转换 强制数据类型转换

  • python数据分析10:数据转换

    数据转换:一般包括一列数据转换为多列数据,行列转换,DataFrame转换为字典、列表和元组等 【一列数据转换为多...

  • 循环

    一,数据类型转换 基本格式:数据类型(带转换数据) 1.转换成int类型:int(待转换数据) (1).float...

  • 2018-08-23day04循环和分支学习总结

    一.数据类型转换 1.数据类型的自动转换 2.强制转换 基本语法:类型名(需要转换的数据)a.将其他数据转换成in...

  • SpringMVC 数据转换 Day22 2018-12-12

    SpringMVC 数据转换 springMVC数据绑定流程 使用ConversionService转换数据 使用...

  • SpringMVC多种数据类型转换器 Day30 2018-12

    SpringMVC 数据转换 springMVC数据绑定流程 使用ConversionService转换数据 使用...

  • JavaScript基础03- 数据类型转换

    数据类型转换 所谓的数据类型转换,就是将一种数据类型转换为另一种数据类型,所以,数据类型转换包括,转换为字符串类型...

  • 3-数据类型转换

    数据类型转换 将数据有当前类型变化为其他类型的操作就是数据类型转换。 数据类型转换分类: 数据类型转换一共分为2类...

网友评论

      本文标题:Spark-DStream数据转换

      本文链接:https://www.haomeiwen.com/subject/fdigqktx.html