Spark in Action (4): Master Internals and Source Code Analysis


Failover Mechanism: How It Works

Two Masters can be configured; Spark's native standalone mode supports active/standby Master failover.

Spark Master failover can be based on one of two mechanisms: the file system, or ZooKeeper. With file-system-based failover, you must manually switch over to the standby Master after the active Master dies; with ZooKeeper-based failover, the Master switch happens automatically.
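
For reference, ZooKeeper-based recovery is switched on through the spark.deploy.recoveryMode family of properties. A minimal spark-env.sh sketch (the ZooKeeper hosts and the znode directory below are placeholders, not values from this article):

    # ZooKeeper-based Master recovery (host names are placeholders)
    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark"

    # File-system-based recovery instead uses:
    #   -Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/some/dir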

Master failover boils down to this: after the active Master dies, what does the standby Master do when it takes over?

  1. The standby Master uses the persistence engine (FileSystemPersistenceEngine or ZooKeeperPersistenceEngine) to read back the persisted storedApps, storedDrivers, and storedWorkers.
  2. It checks whether any of storedApps, storedDrivers, and storedWorkers is non-empty.
  3. If so, it re-registers the persisted Application, Driver, and Worker information into the Master's internal in-memory caches.
  4. It sets the state of all Applications and Workers to UNKNOWN, then sends the standby Master's address to the Driver of each Application and to each Worker (steps 1-4 make up beginRecovery(), sketched after this list).
  5. If a Driver or Worker is still running normally, then on receiving the new Master's address it responds with an acknowledgement message.
  6. As the responses from Drivers and Workers trickle in, the Master uses the completeRecovery() method to deal with the Drivers and Workers that did not respond, filtering their information out.
  7. Finally, the Master calls its own schedule() method to schedule the Drivers and Applications still waiting for resources, e.g. launching a Driver on some Worker, or launching the Executors an Application needs on Workers.
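
Steps 1 through 5 correspond to the Master's beginRecovery() entry point. Below is a condensed sketch of that method as it appears in Spark 1.x, with logging and some details trimmed; it is not a verbatim copy:

  def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
      storedWorkers: Seq[WorkerInfo]) {
    for (app <- storedApps) {
      try {
        // Re-register the app, mark it UNKNOWN, and tell its driver the new Master's address.
        registerApplication(app)
        app.state = ApplicationState.UNKNOWN
        app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
      } catch {
        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
      }
    }

    for (driver <- storedDrivers) {
      // Just read the drivers back in; drivers whose workers are gone are
      // re-launched later, when the missing worker is detected.
      drivers += driver
    }

    for (worker <- storedWorkers) {
      try {
        // Re-register the worker, mark it UNKNOWN, and send it the new Master's address.
        registerWorker(worker)
        worker.state = WorkerState.UNKNOWN
        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
      } catch {
        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
      }
    }
  }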

Failover Mechanism: Source Code Analysis

org/apache/spark/deploy/master/Master.scala, the completeRecovery method.

  /**
    * Complete the Master failover, i.e. finish the Master's recovery.
    */
  def completeRecovery() {
    // Ensure "only-once" recovery semantics using a short synchronization period.
    synchronized {
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY
    }

    // Filter out the Applications and Workers whose state is still UNKNOWN,
    // then iterate over them, calling removeWorker and finishApplication
    // respectively to clean up Applications and Workers that may have failed,
    // or may even be dead already.
    // The cleanup boils down to three things: 1) remove from the in-memory caches;
    // 2) remove from the in-memory caches of related components;
    // 3) remove from persistent storage.
    // Kill off any workers and apps that didn't respond to us.
    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)

    // Reschedule drivers which were not claimed by any workers
    drivers.filter(_.worker.isEmpty).foreach { d =>
      logWarning(s"Driver ${d.id} was not found after master recovery")
      if (d.desc.supervise) {
        logWarning(s"Re-launching ${d.id}")
        relaunchDriver(d)
      } else {
        removeDriver(d.id, DriverState.ERROR, None)
        logWarning(s"Did not re-launch ${d.id} because it was not supervised")
      }
    }

    state = RecoveryState.ALIVE
    schedule()
    logInfo("Recovery complete - resuming operations!")
  }

Registration Mechanism: How It Works

Worker registration

  1. After starting up, a Worker proactively registers with the Master.
  2. The Master filters the registration: Workers whose state is DEAD are rejected, and for Workers in UNKNOWN state, the stale Worker information is cleaned up and replaced with the new registration.
  3. The Worker is added to the in-memory cache (a HashMap).
  4. The Worker information is persisted with the persistence engine (file system or ZooKeeper).
  5. schedule() is called (a condensed sketch of the handler follows this list).
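
A condensed sketch of the Master's RegisterWorker handler in Spark 1.x (field handling and logging trimmed; not a verbatim copy):

    case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      if (state == RecoveryState.STANDBY) {
        // A standby Master ignores registrations and sends no response.
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        // registerWorker drops stale DEAD/UNKNOWN entries for the same address
        // before adding the new WorkerInfo to the in-memory caches.
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address")
        }
      }
    }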

Driver registration

  1. When a Spark Application is submitted with spark-submit (in standalone cluster deploy mode), the Driver is registered with the Master first.
  2. The Driver information is put into the in-memory cache (a HashMap).
  3. The Driver is added to the queue of Drivers waiting to be scheduled (an ArrayBuffer).
  4. The Driver information is persisted with the persistence engine.
  5. schedule() is called to perform scheduling (a condensed sketch of the handler follows this list).
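
In Spark 1.x the registration arrives at the Master as a RequestSubmitDriver message, sent by the client in cluster deploy mode. A condensed sketch of the handler (response text abbreviated; not a verbatim copy):

    case RequestSubmitDriver(description) => {
      if (state != RecoveryState.ALIVE) {
        // Only an ALIVE Master accepts driver submissions.
        sender ! SubmitDriverResponse(false, None,
          "Can only accept driver submissions in ALIVE state.")
      } else {
        val driver = createDriver(description)
        // Persist first, then cache in memory and enqueue for scheduling.
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        drivers.add(driver)
        schedule()
        sender ! SubmitDriverResponse(true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}")
      }
    }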

Application registration

  1. Once the Driver is up and runs the Application code we wrote, SparkContext initialization kicks in; under the hood, SparkDeploySchedulerBackend, through the ClientActor thread inside its AppClient, sends a RegisterApplication message to the Master to register the Application.
  2. The Application information is put into the in-memory cache (a HashMap).
  3. The Application is added to the queue of Applications waiting to be scheduled (an ArrayBuffer).
  4. The Application information is persisted with the persistence engine.
  5. schedule() is called to perform resource scheduling.

Application Registration: Source Code Analysis

org/apache/spark/deploy/master/Master.scala, the handler for the RegisterApplication case class.

    /**
      * Handle an Application registration request.
      */
    case RegisterApplication(description) => {
      // If this Master's state is STANDBY, i.e. it is the standby Master rather than
      // the active Master, then do nothing with the registration request.
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        // Create an ApplicationInfo from the ApplicationDescription.
        val app = createApplication(description, sender)
        // Register the Application: add the ApplicationInfo to the cache and
        // add the Application to the waitingApps scheduling queue.
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist the Application with the persistence engine.
        persistenceEngine.addApplication(app)
        // The AppClient created by SparkDeploySchedulerBackend sends the registration
        // request to the Master actor through its ClientActor thread, carrying its own
        // actor reference along (available here as sender). The Master therefore knows
        // who sent this RegisterApplication message, and replies in the opposite
        // direction with a RegisteredApplication message to that ClientActor.
        sender ! RegisteredApplication(app.id, masterUrl)
        schedule()
      }
    }
    
def registerApplication(app: ApplicationInfo): Unit = {
    val appAddress = app.driver.path.address
    if (addressToApp.contains(appAddress)) {
      logInfo("Attempted to re-register application at same address: " + appAddress)
      return
    }

    applicationMetricsSystem.registerSource(app.appSource)
    // Add the app's information to the in-memory caches.
    apps += app
    idToApp(app.id) = app
    actorToApp(app.driver) = app
    addressToApp(appAddress) = app
    // Add the app to the waitingApps scheduling queue.
    waitingApps += app
  }

Master State-Change Handling: Source Code Analysis

Driver state changes: package org.apache.spark.deploy.master, the DriverStateChanged case.

case DriverStateChanged(driverId, state, exception) => {
      state match {
          // If the Driver's state is ERROR, FINISHED, KILLED, or FAILED,
          // remove the Driver.
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }
    
def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
    // Use Scala's find() higher-order function to locate the driver with this driverId.
    drivers.find(d => d.id == driverId) match {
        // If it was found, we get a Some (an Option).
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // Remove the driver from the in-memory cache.
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // Add the driver to completedDrivers.
        completedDrivers += driver
        // Remove the driver's persisted information via the persistence engine.
        persistenceEngine.removeDriver(driver)
        // Set the driver's final state and exception.
        driver.state = finalState
        driver.exception = exception
        // Remove the driver from the worker it was running on.
        driver.worker.foreach(w => w.removeDriver(driver))
        // As always, call schedule() afterwards.
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

Executor state changes: package org.apache.spark.deploy.master, the ExecutorStateChanged case.

case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      // Find the app this executor belongs to, then look the executor up in the app's internal executors cache.
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
          // If the executor was found,
        case Some(exec) => {
          // record the executor's current state.
          val appInfo = idToApp(appId)
          exec.state = state
          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
          // Send an ExecutorUpdated message to the driver (a fire-and-forget Akka tell).
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
          // If the executor has finished,
          if (ExecutorState.isFinished(state)) {
            // Remove this executor from the worker and app
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // remove the executor from the app's cache
            appInfo.removeExecutor(exec)
            // and from the cache of the worker that was running it.
            exec.worker.removeExecutor(exec)

            // Check whether the executor exited normally (exit status 0).
            val normalExit = exitStatus == Some(0)
            // Only retry certain number of times so we don't go into an infinite loop.
            if (!normalExit) {
              // Check whether the application's retry count is still below the maximum (MAX_NUM_RETRY, 10);
              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
                // if so, simply reschedule.
                schedule()
              } else {
                // Otherwise, remove the application: if its executors fail
                // every time they are scheduled, the application itself is
                // considered to have failed.
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
          }
        }
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }
    }

Master Resource Scheduling Algorithm: How It Works, with Source Code Analysis

org/apache/spark/deploy/master/Master.scala, the schedule method.

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule() {
    // First check: if the Master's state is not ALIVE, return immediately.
    // In other words, a standby Master never schedules applications or any other resources.
    if (state != RecoveryState.ALIVE) { return }

    // First schedule drivers, they take strict precedence over applications
    // Randomization helps balance drivers
    // Random.shuffle randomly reorders the elements of the collection passed to it.
    // Take all previously registered workers, keep only those whose state is ALIVE,
    // and shuffle the ALIVE workers randomly.
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    // Get the number of shuffled alive workers.
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    // Drivers are scheduled first.
    // Why does the Master schedule drivers at all? Think about when a driver is ever
    // registered with the Master: only when the application is submitted to standalone
    // mode with --deploy-mode cluster. In client deploy mode (and likewise yarn-client),
    // the driver is launched locally on the submitting machine, so it is never
    // registered with the Master, let alone scheduled by it.

    // The driver scheduling mechanism:
    // iterate over the waitingDrivers ArrayBuffer.
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      // While numWorkersVisited is less than numWorkersAlive, i.e. there are still
      // alive workers we have not visited, and this driver has not yet been
      // launched (launched is false), keep looping.
      while (numWorkersVisited < numWorkersAlive && !launched) {
        // Take one alive worker.
        val worker = shuffledAliveWorkers(curPos)
        // Count it as visited.
        numWorkersVisited += 1
        // If this worker's free memory is at least what the driver needs,
        // and its free CPU cores are at least what the driver needs,
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // launch the driver on it,
          launchDriver(worker, driver)
          // remove the driver from the waitingDrivers queue so it is not scheduled again, and set launched to true.
          waitingDrivers -= driver
          launched = true
        }
        // Move the pointer to the next worker.
        curPos = (curPos + 1) % numWorkersAlive
        // The driver keeps cycling through the alive workers; once launched is true, it has been started on some worker.
      }
    }

    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    // Application scheduling (the heart of the scheduler).
    // There are two application scheduling algorithms: spreadOutApps and non-spreadOutApps.
    // val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores

      // Iterate over the ApplicationInfos in waitingApps, keeping only applications that still have cores left to schedule.
      for (app <- waitingApps if app.coresLeft > 0) {
        // From workers, keep only those that are ALIVE, then keep only those the
        // application can use, and sort them by free CPU cores in descending order.
        // canUse: worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        // Create an array that records how many cores to assign to each usable worker.
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        // Decide how many cores to assign in total: the minimum of the app's remaining cores and the total free cores across the usable workers.
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

        // This algorithm spreads each application's executors evenly across the workers.
        // For example, with 20 cores to assign across 10 workers, the loop passes over
        // the workers twice, giving each worker 1 core per pass, so each worker ends up with 2 cores.

        // Loop while there are still cores left to assign.
        var pos = 0
        while (toAssign > 0) {
          // If this worker's free cores exceed what has already been assigned to it,
          // i.e. the worker still has cores available,
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            // decrement the total left to assign (one core has just been placed on this worker)
            toAssign -= 1
            // and increment this worker's assigned core count.
            assigned(pos) += 1
          }
          // Move the pointer to the next worker.
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them

        // Once each worker's share of the application's cores has been decided,
        // iterate over the workers.
        for (pos <- 0 until numUsable) {
          // For every worker that was assigned at least one core,
          if (assigned(pos) > 0) {
            // first record an executor in the application's internal cache, creating an
            // ExecutorDesc object that wraps the number of cores given to this executor.
            // This reflects the executor-launching internals as of Spark 1.3.0: spark-submit
            // lets you specify how many executors you want, with how many cores and how much
            // memory each, but under this mechanism the actual number of executors, and the
            // cores per executor, may differ from the configuration, because allocation is
            // driven by the total core count. For example, if you ask for 3 executors with
            // 3 cores each (9 cores total) and there are 9 workers with 1 free core each,
            // this algorithm gives each worker 1 core and launches 1 executor on it, so you
            // end up with 9 executors of 1 core each.
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            // Then launch the executor on that worker
            launchExecutor(usableWorkers(pos), exec)
            // and set the application's state to RUNNING.
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      // The non-spreadOutApps scheduling algorithm.

      // This algorithm is the exact opposite of spreadOutApps:
      // each application is packed onto as few workers as possible.
      // For example, with 10 workers of 10 cores each and an app that needs 20 cores
      // in total, the app is placed on just two workers, each fully occupied with 10
      // cores, and any other apps can only go to the remaining workers.
      // So if spark-submit asked for 10 executors with 2 cores each (20 cores total),
      // under this algorithm only 2 executors are launched, with 10 cores each.

      // Assign each Application to as few workers as possible.
      // Iterate over the workers that are ALIVE and still have free cores,
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        // and over the applications that still have cores left to assign.
        for (app <- waitingApps if app.coresLeft > 0) {
          // If this worker can be used by the application,
          if (canUse(app, worker)) {
            // take the minimum of the worker's free cores and the cores the app still needs.
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              // Add an executor to the app,
              val exec = app.addExecutor(worker, coresToUse)
              // launch it on the worker,
              launchExecutor(worker, exec)
              // and set the application's state to RUNNING.
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }
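
To see the spread-out core assignment in isolation, here is a minimal, self-contained simulation of the inner loop above (the worker core counts are made-up illustration data, not Spark code):

  object SpreadOutDemo {
    def main(args: Array[String]): Unit = {
      // Free cores on each usable worker (made-up numbers), sorted descending,
      // just as schedule() sorts usableWorkers.
      val coresFree = Array(10, 10, 10, 10)
      val assigned = new Array[Int](coresFree.length)
      // The app still wants 20 cores; cap it by the total free cores.
      var toAssign = math.min(20, coresFree.sum)
      var pos = 0
      while (toAssign > 0) {
        // Only place a core on this worker if it still has spare capacity.
        if (coresFree(pos) - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % coresFree.length
      }
      println(assigned.mkString(" ")) // prints: 5 5 5 5
    }
  }

Running it prints 5 5 5 5: the 20 requested cores are spread evenly over the four workers, which in turn determines how many executors each worker launches.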

The launchDriver method:

def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // Record the driver in the worker's internal cache, adding the memory and
    // CPU the driver needs to the worker's used-memory and used-CPU counters.
    worker.addDriver(driver)
    // Also record the worker in the driver's internal state.
    driver.worker = Some(worker)
    // Then send a LaunchDriver message to the worker's actor so the Worker starts the Driver.
    worker.actor ! LaunchDriver(driver.id, driver.desc)
    // Set the driver's state to RUNNING.
    driver.state = DriverState.RUNNING
  }

The launchExecutor method:

def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    // Add the executor to the worker's internal cache.
    worker.addExecutor(exec)
    // Send a LaunchExecutor message to the worker's actor.
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    // Send an ExecutorAdded message to the driver of the application this executor belongs to.
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

This article was first published on Steem. Thanks for reading, and please credit the source when reposting.

https://steemit.com/@padluo


WeChat official account 「padluo」, sharing the self-cultivation of a data scientist. Since we have met, let's grow together.



Readers' Telegram group

https://t.me/sspadluo


Zhishi Xingqiu (Knowledge Planet) reader group
