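In Standalone mode, spark-submit ultimately calls org.apache.spark.deploy.ClientApp.start, which registers a client RPC endpoint. Once the endpoint is registered, its onStart method is invoked by default and submits the Driver to the Master: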
    override def onStart(): Unit = {
      driverArgs.cmd match {
        case "launch" =>
          // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
          //       truncate filesystem paths similar to what YARN does. For now, we just require
          //       people call `addJar` assuming the jar is in the same directory.
          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

          val classPathConf = "spark.driver.extraClassPath"
          val classPathEntries = getProperty(classPathConf, conf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }

          val libraryPathConf = "spark.driver.extraLibraryPath"
          val libraryPathEntries = getProperty(libraryPathConf, conf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }

          val extraJavaOptsConf = "spark.driver.extraJavaOptions"
          val extraJavaOpts = getProperty(extraJavaOptsConf, conf)
            .map(Utils.splitCommandString).getOrElse(Seq.empty)

          val sparkJavaOpts = Utils.sparkJavaOpts(conf)
          val javaOpts = sparkJavaOpts ++ extraJavaOpts
          val command = new Command(mainClass,
            Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
            sys.env, classPathEntries, libraryPathEntries, javaOpts)
          val driverDescription = new DriverDescription(
            driverArgs.jarUrl,
            driverArgs.memory,
            driverArgs.cores,
            driverArgs.supervise,
            command)
          asyncSendToMasterAndForwardReply[SubmitDriverResponse](
            RequestSubmitDriver(driverDescription))

        case "kill" =>
          val driverId = driverArgs.driverId
          asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
      }
    }
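To make the "launch" branch easier to follow, here is a minimal, self-contained sketch of how the driver command is assembled from configuration. It is not Spark's code: `DriverCommandSketch`, `SimpleCommand`, `splitPathEntries` and `buildCommand` are hypothetical names, the configuration is modelled as a plain `Map`, and the Java options are split on whitespace only, whereas Spark's `Utils.splitCommandString` also handles quoting. The sketch also leaves out `sparkJavaOpts`, the library path and the environment.

```scala
import java.io.File

object DriverCommandSketch {
  // Hypothetical, simplified stand-in for Spark's Command class.
  final case class SimpleCommand(
      mainClass: String,
      arguments: Seq[String],
      classPathEntries: Seq[String],
      javaOpts: Seq[String])

  // Split an optional path-like setting on the platform path separator,
  // mirroring the `toSeq.flatMap(_.split(...))` pattern in onStart.
  def splitPathEntries(value: Option[String]): Seq[String] =
    value.toSeq.flatMap(_.split(File.pathSeparator).toSeq)

  def buildCommand(
      conf: Map[String, String],
      userMainClass: String,
      userArgs: Seq[String]): SimpleCommand = {
    val classPath = splitPathEntries(conf.get("spark.driver.extraClassPath"))

    // Assumption: naive whitespace splitting, no quote handling.
    val javaOpts = conf.get("spark.driver.extraJavaOptions")
      .map(_.trim.split("\\s+").toSeq.filter(_.nonEmpty))
      .getOrElse(Seq.empty)

    // The process that actually runs is the DriverWrapper; the user's main
    // class is passed to it as an argument after the two placeholders.
    SimpleCommand(
      "org.apache.spark.deploy.worker.DriverWrapper",
      Seq("{{WORKER_URL}}", "{{USER_JAR}}", userMainClass) ++ userArgs,
      classPath,
      javaOpts)
  }
}
```

The point to notice is that the user's main class is not launched directly. It travels as an argument to `DriverWrapper`, next to the `{{WORKER_URL}}` and `{{USER_JAR}}` placeholders, which are filled in on the Worker side when the driver is launched.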
Both branches of onStart end by calling the asyncSendToMasterAndForwardReply method:
    /**
     * Send the message to master and forward the reply to self asynchronously.
     */
    private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
      for (masterEndpoint <- masterEndpoints) {
        masterEndpoint.ask[T](message).onComplete {
          case Success(v) => self.send(v)
          case Failure(e) =>
            logWarning(s"Error sending messages to master $masterEndpoint", e)
        }(forwardMessageExecutionContext)
      }
    }
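The ask-then-forward pattern used here can be tried out without Spark. The sketch below is an illustration under assumptions: a master is modelled as a plain function returning a `Future` (a hypothetical stand-in for `RpcEndpointRef.ask`), and the endpoint's own mailbox is a `ConcurrentLinkedQueue`. Unlike the original, which returns `Unit`, this version returns a `Future[Unit]` so that a caller can wait until every master has been handled.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object AskAndForwardSketch {
  // Hypothetical stand-in for a master endpoint: takes a message, replies asynchronously.
  type Master = String => Future[String]

  // Ask every known master; forward each successful reply to our own inbox
  // and only log failures, so one unreachable master does not fail the rest.
  def askAllAndForward(
      masters: Seq[Master],
      message: String,
      inbox: ConcurrentLinkedQueue[String])(implicit ec: ExecutionContext): Future[Unit] = {
    val handled = masters.map { ask =>
      ask(message)
        .map { reply =>
          inbox.add(reply) // plays the role of self.send(v)
          ()
        }
        .recover { case NonFatal(e) =>
          println(s"Error sending message to master: ${e.getMessage}")
        }
    }
    Future.sequence(handled).map(_ => ())
  }
}
```

Because the message goes to every configured master, the client does not need to know in advance which one is currently active; replies from any master are simply delivered back to the endpoint as ordinary messages.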
The RequestSubmitDriver message is sent to the Master, which handles it in its receiveAndReply method:
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RequestSubmitDriver(description) =>
        if (state != RecoveryState.ALIVE) {
          val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
            "Can only accept driver submissions in ALIVE state."
          context.reply(SubmitDriverResponse(self, false, None, msg))
        } else {
          logInfo("Driver submitted " + description.command.mainClass)
          val driver = createDriver(description)
          persistenceEngine.addDriver(driver)
          waitingDrivers += driver
          drivers.add(driver)
          schedule()

          // TODO: It might be good to instead have the submission client poll the master to determine
          //       the current status of the driver. For now it's simply "fire and forget".

          context.reply(SubmitDriverResponse(self, true, Some(driver.id),
            s"Driver successfully submitted as ${driver.id}"))
        }
    }
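The main steps are:
- Check whether the current Master is in the ALIVE state; if it is not, reply with a failure and stop.
- Persist the driver information through the persistence engine (in ZooKeeper when ZooKeeper-based recovery is configured).
- Add the driver to the waitingDrivers queue (and to the drivers set).
- Call schedule() to start scheduling the driver, then reply to the client with the new driver ID.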
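The steps above can be sketched as a small in-memory model. Everything in it is hypothetical and greatly simplified (`MiniMaster`, `SubmitResponse`, the `driver-0000` style IDs, a buffer standing in for the persistence engine, a counter standing in for `schedule()`); it only mirrors the order of operations in the Master's handler.

```scala
import scala.collection.mutable

object MasterSubmitSketch {
  sealed trait RecoveryState
  case object Standby extends RecoveryState
  case object Alive extends RecoveryState

  final case class SubmitResponse(success: Boolean, driverId: Option[String], message: String)

  // Hypothetical, in-memory stand-in for the Master's driver bookkeeping.
  class MiniMaster(val state: RecoveryState) {
    val persisted = mutable.ArrayBuffer.empty[String]      // stands in for persistenceEngine
    val waitingDrivers = mutable.ArrayBuffer.empty[String]
    var scheduleCalls = 0                                   // counts schedule() invocations
    private var nextDriverNumber = 0

    private def schedule(): Unit = scheduleCalls += 1

    def submitDriver(mainClass: String): SubmitResponse =
      if (state != Alive) {
        // Step 1: a master that is not ALIVE rejects the submission outright.
        SubmitResponse(false, None,
          s"Current state is $state; can only accept driver submissions in ALIVE state.")
      } else {
        println(s"Driver submitted $mainClass")
        val driverId = f"driver-$nextDriverNumber%04d"
        nextDriverNumber += 1
        persisted += driverId      // Step 2: persist before anything else
        waitingDrivers += driverId // Step 3: queue for scheduling
        schedule()                 // Step 4: trigger scheduling
        SubmitResponse(true, Some(driverId), s"Driver successfully submitted as $driverId")
      }
  }
}
```

Persisting before queueing matters for recovery: if the Master fails right after accepting the submission, a newly elected Master can still find the driver in the persisted state.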
This completes the main flow of submitting a job through spark-submit. The flowchart below summarizes the steps involved:
