0. Some intuition about streaming
Every record in a stream carries a timestamp.
So what kinds of computation can be done on a stream?
Scope | Example | Corresponding Spark API |
---|---|---|
A single record | Immediate alerting | none |
A single batch (batches are more explicit in Spark Streaming than in Flink) | Size of the current traffic | `reduceByKey` |
A time window | PV per hour, but updated every second | `reduceByKeyAndWindow` |
The whole stream | Running total so far, updated every second | `updateStateByKey` |
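To make the table concrete, here is a minimal word-count-style sketch of the last three rows. The socket source on port 9999, the 1-second batch interval, and the checkpoint directory are illustrative choices of mine, not part of the original text:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamComputationKinds {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamComputationKinds")
    val ssc = new StreamingContext(conf, Seconds(1))   // 1-second batches
    ssc.checkpoint("/tmp/streaming-checkpoint")        // updateStateByKey requires a checkpoint dir

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1L))

    // per batch: counts within the current batch only
    pairs.reduceByKey(_ + _).print()

    // per time window: hourly PV, recomputed every second (window = 1 hour, slide = 1 second)
    pairs.reduceByKeyAndWindow((a: Long, b: Long) => a + b, Seconds(3600), Seconds(1)).print()

    // whole stream: running totals since the start, updated every batch
    pairs.updateStateByKey[Long]((values: Seq[Long], state: Option[Long]) =>
      Some(values.sum + state.getOrElse(0L))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```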
1. How Spark Streaming generates the batch jobs
1.1 StreamingContext
`StreamingContext#start` spawns a dedicated thread and starts a `JobScheduler` on that thread.
1.2 JobScheduler
`JobScheduler#start` roughly does the following:

- creates and starts its `eventLoop`
- adds `StreamingListener`s
- starts the `listenerBus`, which forwards events to the registered listeners (see the listener sketch after this list)
- creates an `ExecutorAllocationClient`, the client through which requests to add or kill executors are sent to YARN
- creates an `ExecutorAllocationManager`, because when dynamic resource allocation is enabled, streaming needs a scaling strategy different from the batch one
- starts the `jobGenerator`
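On the `listenerBus` item above: the events it forwards can also be observed from user code by registering a `StreamingListener`. A minimal sketch (the listener name and log message are mine):

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Logs every completed batch; register it with ssc.addStreamingListener(new BatchDelayLogger()).
class BatchDelayLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch ${info.batchTime} finished, total delay = ${info.totalDelay.getOrElse(-1L)} ms")
  }
}
```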
1.3 JobGenerator
`JobGenerator#start` does the following:

- starts its own `eventLoop`
- calls `restart()` (when a checkpoint exists) or `startFirstTime()` to start the `DStreamGraph`; `restart()` may immediately launch jobs to recover the batches from before the restart

It also initializes a `RecurringTimer` that sends messages to itself once per batch interval:
```scala
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
```
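To illustrate what such a recurring timer does (align to the next period boundary, then fire once per period), here is a small self-contained sketch. It is not Spark's `RecurringTimer`, just an approximation built on a scheduled executor:

```scala
import java.util.concurrent.{Executors, TimeUnit}

class SimpleRecurringTimer(periodMs: Long, callback: Long => Unit) {
  private val executor = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = {
    // Align the first tick to the next multiple of the period, then fire every period.
    val now = System.currentTimeMillis()
    val firstTick = (now / periodMs + 1) * periodMs
    executor.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = callback(System.currentTimeMillis() / periodMs * periodMs)
    }, firstTick - now, periodMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = executor.shutdownNow()
}

// Usage: post a "GenerateJobs"-style message every 2 seconds.
// new SimpleRecurringTimer(2000, t => println(s"GenerateJobs($t)")).start()
```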
The posted `GenerateJobs` events are then handled in the JobGenerator's own `eventLoop`:
```scala
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

/** Generate jobs and perform checkpointing for the given `time`. */
private def generateJobs(time: Time) {
  // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
  // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
      PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
```
1.4 DStreamGraph
`DStreamGraph#generateJobs` is called by the `JobGenerator`; it in turn calls `DStream#generateJob` on every output stream registered in the graph:
```scala
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
```
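A consequence worth spelling out: every output operation registers an output `DStream` in the graph, so `generateJobs(time)` returns one `Job` per output operation for each batch. A small sketch (the socket source and the two output operations are illustrative):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TwoOutputOps {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setMaster("local[2]").setAppName("TwoOutputOps"), Seconds(2))

    val counts = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    counts.print()                                  // output operation #1 -> one job per batch
    counts.foreachRDD(rdd => println(rdd.count()))  // output operation #2 -> a second job per batch

    ssc.start()
    ssc.awaitTermination()
  }
}
```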
1.5 DStream
`getOrCompute` obtains the RDD for the given batch time, and the job function submits it to the `sparkContext`; note that `runJob` is wrapped in a closure and only executed when the `JobScheduler` later runs the `Job`:
```scala
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) =>
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    case None => None
  }
}
```
2. Miscellaneous notes
2.1 EventLoop
`EventLoop` is used to receive messages from a caller and process them on the receiving side, for example:
```scala
// JobScheduler.scala
jobGenerator.onBatchCompletion(jobSet.time)

// JobGenerator.scala
def onBatchCompletion(time: Time) {
  eventLoop.post(ClearMetadata(time))
}

// The eventLoop keeps calling onReceive (messages are kept in a BlockingQueue)
eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
  override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
  override protected def onError(e: Throwable): Unit = {
    jobScheduler.reportError("Error in job generator", e)
  }
}

// The actual routing and handling of the messages
private def processEvent(event: JobGeneratorEvent) {
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

private def clearMetadata(time: Time) {
  // ....
}

// Apart from the first call (JobScheduler.scala), all of the code above lives in JobGenerator.scala
```
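As a mental model of the pattern, here is a stripped-down event loop, assuming a single daemon consumer thread and a `LinkedBlockingDeque`, much like the description above; it is a sketch, not Spark's actual `EventLoop` implementation:

```scala
import java.util.concurrent.LinkedBlockingDeque
import java.util.concurrent.atomic.AtomicBoolean

abstract class SimpleEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val stopped = new AtomicBoolean(false)

  private val consumerThread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = queue.take() // blocks until a message arrives
          try onReceive(event) catch { case e: Throwable => onError(e) }
        }
      } catch {
        case _: InterruptedException => // stop() interrupts the blocked take(); just exit
      }
    }
  }

  def start(): Unit = consumerThread.start()
  def stop(): Unit = { stopped.set(true); consumerThread.interrupt() }

  // Callers only enqueue; processing happens on the consumer thread.
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
  protected def onError(e: Throwable): Unit
}
```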
2.2 foreachRDD
Slightly different from the batch API: the function passed to `foreachRDD` itself runs on the driver, not on the executors; only the closures handed to RDD actions inside it (such as `foreachPartition`) run on the executors.
```scala
dstream.foreachRDD { rdd =>
  // this part runs on the driver
  rdd.foreachPartition { partitionOfRecords =>
    // this closure runs on the executors: one connection per partition
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}
```
A further improvement is to reuse connections across partitions and batches through a static pool:

```scala
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
```
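For contrast, this is the anti-pattern the note above warns about: the connection is created in the driver-side part of the closure and then captured by the executor-side code, so Spark has to serialize and ship it, which typically fails with a serialization error (`createNewConnection` is a placeholder, as in the snippets above):

```scala
dstream.foreachRDD { rdd =>
  val connection = createNewConnection() // runs on the driver
  rdd.foreachPartition { partitionOfRecords =>
    // runs on the executors, but `connection` was created on the driver and
    // would have to be serialized and shipped -- usually a NotSerializableException
    partitionOfRecords.foreach(record => connection.send(record))
  }
}
```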