This is because the DStream object is being referred to from within the closure

Author: 焉知非鱼 | Published 2019-06-05 10:45

Yesterday I reorganized the project code, and then it started throwing errors:

ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
com.gac.xs6.core.impl.EnAdapterImpl$


java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData is being serialized  possibly as a part of closure of an RDD operation. This is because  the DStream object is being referred to from within the closure.  Please rewrite the RDD operation inside this DStream to avoid this.  This has been enforced to avoid bloating of Spark tasks  with unnecessary objects.

19/06/05 10:03:05 org.apache.spark.internal.Logging$class.logError(Logging.scala:91) ERROR Utils: Exception encountered
java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.DStreamCheckpointData is being serialized  possibly as a part of closure of an RDD operation. This is because  the DStream object is being referred to from within the closure.  Please rewrite the RDD operation inside this DStream to avoid this.  This has been enforced to avoid bloating of Spark tasks  with unnecessary objects.
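
This exception is Spark's safeguard against task bloat: when checkpointing is enabled, any RDD operation whose closure references a DStream, or an object that holds one, fails serialization. A hypothetical minimal reproduction (not this project's code; the names are made up):

import org.apache.spark.streaming.dstream.DStream

// `stream` is a constructor field, so using it inside rdd.foreach becomes
// this.stream: the closure then captures the whole instance, DStream included
class BadExample(stream: DStream[String]) extends Serializable {
  def run(): Unit =
    stream.foreachRDD { rdd =>
      rdd.foreach { line =>
        println(s"$line @ ${stream.slideDuration}") // drags the DStream into the task
      }
    }
}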

The cause might be a serialization problem with the MySQL database configuration:

// MySQL settings loaded from the application config via Typesafe Config
@Singleton
class MysqlConfiguration extends Serializable {
  private val config:             Config = ConfigFactory.load()
  lazy    val mysqlConf:          Config = config.getConfig("mysql")
  lazy    val mysqlJdbcUrl:       String = mysqlConf.getString("jdbcUrl")
  lazy    val mysqlJdbcDriver:    String = mysqlConf.getString("jdbcDriver")
  lazy    val dataSourceUser:     String = mysqlConf.getString("dataSource.user")
  lazy    val dataSourcePassword: String = mysqlConf.getString("dataSource.password")
  lazy    val dataSourceSize:     Int    = mysqlConf.getInt("dataSource.size")
}

But I had already made it serializable, like this:

@Singleton
class NationApplication @Inject()(sparkConf:        SparkConfiguration,
                                  kafkaConf:        KafkaConfiguration,
                                  hbaseConf:        HbaseConfiguration,
                                  sparkContext:     NaSparkContext[SparkContext],
                                  mysqlConf:        MysqlConfiguration,
                                  source:           NaDStreamSource[Option[(String, String)]],
                                  adapter:          SourceDecode,
                                  sanitizer:        DataSanitizer,
                                  eventsExact:      EventsExact,
                                  eventsCheck:      EventsCheck,
                                  anomalyExact:     AnomalyExact,
                                  anomalyDetection: AnomalyDetection
                                  ) extends Serializable with Logging {

I added the MySQL configuration to the class and then referenced mysqlConf inside eventStream.foreachRDD:

  def saveAnomaly(eventStream: DStream[(String, EventUpdate)]): Unit = {
    eventStream.foreachRDD((x: RDD[(String, EventUpdate)]) => {
      if (!x.isEmpty()) {
        x.foreachPartition { res: Iterator[(String, EventUpdate)] =>
          // val mysqlConf = new MysqlConfiguration
          // referencing the injected mysqlConf field here captures the enclosing
          // NationApplication (and its DStreams) in the task closure
          val c = new GacJdbcUtil(mysqlConf) // obtain the MySQL database configuration
          ...

Conclusion:

Inside foreachRDD, use new to create a fresh configuration instead of using the one passed in through the class. Referencing the injected mysqlConf field inside the closure pulls the enclosing NationApplication, DStreams and all, into the task closure, which is exactly what the exception complains about; constructing the configuration locally inside the partition avoids capturing it.
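
A minimal sketch of the working version, based on the commented-out line above (GacJdbcUtil and the actual write logic are the project's own code and stay elided):

  def saveAnomaly(eventStream: DStream[(String, EventUpdate)]): Unit = {
    eventStream.foreachRDD((x: RDD[(String, EventUpdate)]) => {
      if (!x.isEmpty()) {
        x.foreachPartition { res: Iterator[(String, EventUpdate)] =>
          // constructed inside the partition, so nothing from the enclosing
          // NationApplication has to be serialized into the task
          val mysqlConf = new MysqlConfiguration
          val c         = new GacJdbcUtil(mysqlConf)
          // ... write res to MySQL via c
        }
      }
    })
  }

An alternative with the same effect is to copy the field into a local value before the closure, val localConf = mysqlConf, so that only the serializable MysqlConfiguration is captured rather than the whole application object, provided its own fields serialize cleanly.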
