Spark Streaming Overview

Author: 天之見證 | Published 2019-11-06 23:17

0. Some intuition about streaming

Streaming data always carries a timestamp.

So what kinds of computation can be done on a stream?

Scope | Example | Corresponding Spark API
Single record | Real-time alerting | (none)
A single batch (the batch is more explicit in Spark Streaming than in Flink) | Traffic volume of the current batch | reduceByKey
A time window | Per-hour PV, updated every second | reduceByKeyAndWindow
The whole stream | Running total so far, updated every second | updateStateByKey
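
A minimal sketch putting the three APIs from the table side by side (the socket source, port, and checkpoint path are placeholders; updateStateByKey requires a checkpoint directory):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("streaming-overview").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))   // 1-second batches
ssc.checkpoint("/tmp/streaming-ckpt")              // required by updateStateByKey

val counts = ssc.socketTextStream("localhost", 9999)
  .flatMap(_.split(" "))
  .map(word => (word, 1))

// Per batch: counts within the current 1-second batch only.
val perBatch = counts.reduceByKey(_ + _)

// Per window: counts over the last hour, recomputed every second.
// (For long windows, the overload taking an inverse reduce function is far cheaper.)
val perWindow = counts.reduceByKeyAndWindow(_ + _, Seconds(3600), Seconds(1))

// Whole stream: running totals since the job started, emitted every batch.
val runningTotal = counts.updateStateByKey[Int] { (newValues: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + newValues.sum)
}

perBatch.print(); perWindow.print(); runningTotal.print()
ssc.start()
ssc.awaitTermination()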

1. How Spark Streaming generates the per-batch jobs

1.1 StreamingContext

StreamingContext#start spawns a dedicated thread and starts the JobScheduler on it.
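
In condensed form (state checks and bookkeeping omitted), StreamingContext.start runs the scheduler on a fresh thread so that the caller's thread-local state is not captured:

// condensed from StreamingContext.start
ThreadUtils.runInNewThread("streaming-start") {
  sparkContext.setCallSite(startSite.get)
  sparkContext.clearJobGroup()
  scheduler.start()   // JobScheduler.start, see 1.2
}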

1.2 JobScheduler

  1. Creates and starts the eventLoop
  2. Adds the StreamingListeners
  3. Starts the listenerBus (which forwards events to the listeners)
  4. Creates an ExecutorAllocationClient (the client through which requests to launch or kill executors are sent to YARN)
  5. Creates an ExecutorAllocationManager (with dynamic allocation enabled, streaming needs a different allocation policy than batch jobs)
  6. Starts the jobGenerator
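
A condensed sketch of this startup sequence (simplified from JobScheduler.start; the executor-allocation steps 4 and 5 and most error handling are omitted):

def start(): Unit = synchronized {
  if (eventLoop != null) return                   // already started
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()                               // (1)

  // (2) rate controllers of the input streams listen for batch completions
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start()                             // (3)
  receiverTracker = new ReceiverTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()                            // (6)
}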

1.3 JobGenerator

  1. Starts its own eventLoop
  2. Calls restart() (when a checkpoint exists) or startFirstTime() to start the DStreamGraph

restart may immediately kick off jobs to recover the data from before the restart.

It also initializes a RecurringTimer that periodically posts a GenerateJobs message back to itself:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
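
Conceptually, RecurringTimer is just a dedicated thread firing a callback on every batch-interval boundary. A minimal sketch of the idea (not Spark's actual implementation):

class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
  @volatile private var stopped = false
  private val thread = new Thread(name) {
    override def run(): Unit = try {
      // align to the next period boundary, then fire at a fixed rate
      var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
      while (!stopped) {
        val sleepMs = nextTime - System.currentTimeMillis()
        if (sleepMs > 0) Thread.sleep(sleepMs)
        callback(nextTime)       // e.g. eventLoop.post(GenerateJobs(new Time(nextTime)))
        nextTime += periodMs     // fixed-rate schedule, so lateness does not accumulate
      }
    } catch { case _: InterruptedException => () } // interrupted by stop()
  }
  thread.setDaemon(true)
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
}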

The JobGenerator then handles these events in its own eventLoop:

private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Generate jobs and perform checkpointing for the given `time`.  */
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

1.4 DStreamGraph

Invoked by the JobGenerator; it in turn calls DStream#generateJob on every output stream in the graph:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

1.5 DStream

getOrCompute produces the RDD for the given batch time, and the returned Job submits it to the SparkContext when run:

private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
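
Note the emptyFunc above: the base implementation merely forces the RDD to be computed. Output operators override this; condensed from ForEachDStream.generateJob (local-properties handling omitted), the Job instead wraps the function the user passed to foreachRDD:

override def generateJob(time: Time): Option[Job] = {
  parent.getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => foreachFunc(rdd, time)   // the user's foreachRDD body
      Some(new Job(time, jobFunc))
    case None => None
  }
}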

2. Miscellaneous notes

2.1 EventLoop

Used to receive messages from a caller and process them on the receiving side, for example:

// JobScheduler.scala
jobGenerator.onBatchCompletion(jobSet.time)

// JobGenerator.scala
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

// the eventLoop keeps calling onReceive (events are queued in a BlockingQueue)
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}

// where the events are actually routed and handled
private def processEvent(event: JobGeneratorEvent) {
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

private def clearMetadata(time: Time) {
  // ....
}
// all of the code above lives in JobGenerator.scala
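
Under the hood, EventLoop is little more than a daemon thread draining a blocking queue. A minimal sketch of the pattern (Spark's real class, org.apache.spark.util.EventLoop, adds lifecycle and error handling on top):

import java.util.concurrent.LinkedBlockingQueue

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    override def run(): Unit = try {
      while (!stopped) onReceive(queue.take())     // block until an event arrives
    } catch { case _: InterruptedException => () } // interrupted by stop()
  }
  thread.setDaemon(true)

  def post(event: E): Unit = queue.put(event)      // callers enqueue from any thread
  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }

  protected def onReceive(event: E): Unit          // subclasses route events, cf. processEvent
}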

2.2 foreachRDD

Slightly different from the batch API: the code directly inside foreachRDD runs on the driver, not on the executors; only the closures passed to RDD operations (foreach, foreachPartition) run on the executors.
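
This is why the classic naive version fails (the anti-pattern called out in the Spark programming guide; createNewConnection is a placeholder): the connection is created once on the driver and would have to be serialized to the workers:

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach { record =>
    connection.send(record)               // executed at the worker: fails, since the
                                          // connection object is not serializable
  }
}

The two variants below avoid this, first by creating one connection per partition on the executors, then by reusing pooled connections: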

dstream.foreachRDD { rdd =>
  // this part runs on the driver
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
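
The pooled variant matters because the per-partition version still opens a fresh connection for every partition of every batch; with a static, lazily initialized pool, connections are reused across batches on the same executor.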
