美文网首页
Master原理剖析与源码分析

Master原理剖析与源码分析

作者: 有一束阳光叫温暖 | 来源:发表于2019-02-11 21:43 被阅读0次

一、主备切换机制原理剖析

Spark Master主备切换可以基于两种机制,一种是基于文件系统,一种是基于Zookeeper的。基于文件系统的主备切换机制需要在Active Master挂掉之后,由我们手动去切换到Standby Master上,而基于Zookeeper的主备切换机制,可以实现自动切换Master这里要说的Master主备切换机制,实际上指的就是,在Active Master挂掉之后,切换到Standby Master时,Master会做哪些操作

  1. 使用持久化引擎去读取持久化的storedApps、storesDrivers、storedWorkers、
    FileSystemPersistenceEngine、ZookeeperPersistenceEngine
  2. 判断,如何storedApps、storedDrivers、storedWokers有任何一个是非空的
  3. 将持久化的Application、Driver、Worker的信息重新进行注册,注册到Master内部的内部缓存结构中
  4. 将application和Worker的状态都修改为UNKNOWN,然后向Application所对应的Driver,以及worker发送Standby Master的地址
  5. Master在陆续接收到Driver和worker发送来的响应消息之后,会使用completeRecovery()方法对没有发送响应消息的Driver和Worker进行处理,过滤掉它们的消息
  6. 调用Master自己的schedule()方法,对正在等待资源调度Driver和application进行调度,比如在某个worker上启动Driver或者为Application在Worker上启动excutor
主备切换机制原理剖析

二、注册机制原理剖析与源码分析

  1. worker注册

(1)worker在启动之后,就会主动向Master进行注册
(2)Master将状态为DEAD的Worker过滤掉,对于状态为UNKNOWN的Worker,清理掉旧的Worker信息,替换为新的worker信息
(3)把worker加入内存缓存中(HashMap)
(4)把持久化引擎,将worker信息进行持久化(文件系统,Zookeeper)
(5)调用schedule(方法)

  1. Driver注册

(1) 用spark-sumbit提交spark Application时,首先会注册Driver
(2) 将Driver信息放入内存缓存中
(3) 加入等待调度队列(ArrayBuffer)
(4) 用持久化引擎将Driver信息持久化
(5) 调用schedule(方法)

  1. Application

(1)Driver启动好了,执行我们编写的Application代码,执行SparkContext初始化,底层的SparkDeploySchedulerBackend,会通过AppClinet内部的线程,
ClinetActor,发送RegisterApplication到Master进行Application的注册
(2) 将Application信息放入内存缓存中
(3) 加入等待调度队列(ArrayBuffer)
(4) 用持久化引擎将Application信息持久化
(5) 调用schedule(方法)

注册机制原理剖析

三、状态改变机制源码分配

  1. driver
  2. excutor

四、资源调度机制源码分析

schedule()代码片段

  private def schedule() {

    // 首先判断,master状态不是ALIVE的话直接返回
    // 也就是说standby masters是不会进行application等资源调度
    if (state != RecoveryState.ALIVE) { return }

    // First schedule drivers, they take strict precedence over applications
    // Randomization helps balance drivers
    // Random的原理就是对传入的集合的元素进行随机打乱
    // 取出workers中的所有之前注册上来的worker进行过滤,状态是ALIVE
    // 对状态Alive的worker,调用random.shuffle进行打乱
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    // 首先,调度driver
    // 只有在yarn-cluster模式提交时候,才会调度driver,因为standalone和yarn-client模式都会在本地直接运行
    // 启动driver,而b不会来注册driver,就更不可能让master调度driver了
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      // 主要还有活着的worker没有遍历到,那就继续进行遍历
      // 当前driver没有被启动,launcher为false
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        // 如何当前这个worker内存空闲量大于等于driver需要内存
        // 并且worker的空闲cpu数量,大于等于driverx需要cpu数量
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 启动driver
          launchDriver(worker, driver)
          // 并且driver从waitingDrivers队列中移除
          waitingDrivers -= driver
          launched = true
        }
        // 指针指向下一个worker
        curPos = (curPos + 1) % numWorkersAlive
      }
    }

launchDriver()方法

/**
    * 在worker上,启动dirver
    * @param worker
    * @param driver
    */
  def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // 将driver加入worker内存的缓存结构
    // 将worker内使用的内存和cpu内存数量,都加上driver需要内存和cpu数量
    worker.addDriver(driver)
    // 同时把worker也加入到driver内存缓存结构中
    driver.worker = Some(worker)
    // 调用worker的actor,给它发送LanuchDriver消息,让worker来启动driver
    worker.actor ! LaunchDriver(driver.id, driver.desc)
    // 状态driver的状态设置RUNNING
    driver.state = DriverState.RUNNING
  }

Application的调度算法有两种,一种是spreadOutApps。另一种是非spreadOutApps
(1)spreadOutApps

平均分配到每一个worker
(2)非spreadOutApps
少启动worker,每个worker能启动多少个core,就分配多少个core
源码剖析

    // 将每个application要启动executor都平均分布到各个worker上去
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) {
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        // 创建一个空数组,存储了要分配给每个worker的cpu数量
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        // 获取到底要分配多少cpu,取cpu剩余要分配的cpu数量和worker总共可用cpus数量最小值
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        // cpu还分配完,就继续循环
        while (toAssign > 0) {
          // worker还有可分配cpu
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            // 总要分配cpu-1
            toAssign -= 1
            assigned(pos) += 1
          }
          // 指针指向下一个worker
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        // 给每个worker分配完application要求cpu core之后,遍历worker
        for (pos <- 0 until numUsable) {
          // 只要判断之前worker分配core
          if (assigned(pos) > 0) {
            // 在application内部缓存结构中,添加executor
            // 并创建ExecuotorDesc对象,其中封装了这个executor分配多少个cpu
            // 在spark-submit中,可以指定要多少个exectuor,每个executor多少个cpu,多少内存。
            //那么基于我们机制,实际上最后executor的数量,以及 每个executor的cpu可能与配置不一样
            // 比如要求3个executor,每个3个cpu,实际有9个worker,每个woker有一个cpu,要分配9个core,
            // 根据这个算法会给每个worker分配9个executor,每个executor 1个core

            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            // 启动executor
            launchExecutor(usableWorkers(pos), exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      // 非spreadOutApps调度算法
      // 这个算法和spreadOutApps算法正好相反
      // 每个application 最大可能少使用worker,比如总共有10个worker节点,每个有10个core
      // application要分配20个core,根据这种算法只会分配到2个worker上,

      // 遍历worker,并且状态为ALIVE还有空闲cpu的worker
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        // 遍历application,并且还有需要分配core的application
        for (app <- waitingApps if app.coresLeft > 0) {
          // 如果d当前这个worker可以被application使用
          if (canUse(app, worker)) {
            // 取出worker剩余cpu数量与app要分配的cpu的最小z值
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              // 给app启动一个executor
              val exec = app.addExecutor(worker, coresToUse)
              // 启动executor
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }

相关文章

网友评论

      本文标题:Master原理剖析与源码分析

      本文链接:https://www.haomeiwen.com/subject/gbsceqtx.html