美文网首页
广播变量

广播变量

作者: 焉知非鱼 | 来源:发表于2019-07-10 14:35 被阅读0次

从 mysql 读取数据作为广播变量时, 虽然有 checkpoint 但是 kill 掉任务后,重启程序会失败。

  class GacXs6Offline @Inject()( sparkConf            : SparkConfiguration,
                               mysqlConf            : MysqlConfiguration,
                               hbaseConf            : HbaseConfiguration,
                               sparkContext         : EnterpriseSparkContext[SparkContext],
                               source               : NationDStream[(String,NaSourceData)],
                               naDriveTrip          : NaDriveTrip
                             ) extends Serializable {
    val naDriveTripDS = naDriveTrip.exteact(source)
    saveNaDriveTrip("drive_trip", naDriveTrip)
  }
  
  // serializable error, because the use of hbaseConf
  def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    naDriveTrip.foreachRDD(rdd => {
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
      jobConf.set("hbase.zookeeper.quorum", hbaseConf.hbaseUrl)
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      rdd.map(x => {
        (new ImmutableBytesWritable,  (new NaHbaseDao).putNaTripData(x._1, x._2))
      }).saveAsHadoopDataset(jobConf)
    })
  }

    // serializable ok,  because we create a new hbaseConf
    def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    naDriveTrip.foreachRDD(rdd => {
      val conf = HBaseConfiguration.create()
      val jobConf = new JobConf(conf)
      val naHbaseConf =  new HbaseConfiguration
      jobConf.set("hbase.zookeeper.quorum", naHbaseConf.hbaseUrl)
      jobConf.set("zookeeper.znode.parent", "/hbase")
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
      jobConf.setOutputFormat(classOf[TableOutputFormat])

      rdd.map(x => {
        (new ImmutableBytesWritable,  (new NaHbaseDao).putNaTripData(x._1, x._2))
      }).saveAsHadoopDataset(jobConf)
    })
  }

  // serializable ok, because we create a new hbaseConf
  def saveNaDriveTrip(tableName: String, naDriveTrip: DStream[(String, TripState)]): Unit = {
    naDriveTrip.foreachRDD(rdd => {
      rdd.foreachPartition(partitionRdd => {
        val hbaseConf = new HbaseConfiguration
        val hbase = new HbaseUtil(hbaseConf)
        val connection = hbase.getHbaseConn
        val table      = connection.getTable(TableName.valueOf(tableName))
        val list = partitionRdd.map(data => {
          NaHbaseDao.putNaTripData(data._1, data._2)
        }).toList
        if (null != list && list.nonEmpty) {
          NaHbaseDao.saveData(list, table)
        }
      })
    })
  }

相关文章

  • 广播变量

    从 mysql 读取数据作为广播变量时, 虽然有 checkpoint 但是 kill 掉任务后,重启程序会失败。

  • 共享变量:广播变量

    一、使用场景如果我们要在分布式计算里面分发大对象(如:字典,集合,黑白名单等),由Driver端进行分发。如果这个...

  • Spark的广播变量机制

    Spark广播变量 什么是广播变量? 在同一个Execute共享同一份计算逻辑的变量 广播变量使用场景 我现在要在...

  • Spark之广播变量

    什么是广播变量 广播变量:分布式共享只读变量。广播变量用来高效分发较大的对象。向所有工作节点发送一个较大的只读值,...

  • Spark—广播变量

    广播变量 Spark有两种共享变量——累加器、广播变量。广播变量可以让程序高效地向所有工作节点发送一个较大的只读值...

  • Spark 广播join 与 Hive map join

    Sprak 广播变量 广播变量(Broadcast Variables)允许开发人员在每个节点(Worker or...

  • Spark-broadcast

    参见Spark相关--共享变量-广播变量-broadcast

  • spark广播变量

  • Spark广播变量

    原文链接

  • spark广播变量

    广播变量的好处:如果你的算子函数中,使用到了特别大的数据,那么,这个时候,推荐将该数据进行广播。这样的话,就不至于...

网友评论

      本文标题:广播变量

      本文链接:https://www.haomeiwen.com/subject/vklhkctx.html