[Spark 98] Source Code Analysis of Resource Scheduling in Standalone Cluster Mode

Programming Technology / posted by houtizong 3 years ago

Before analyzing the source code, let us first get a basic picture of how resource scheduling works in Standalone Cluster Mode:

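First, running an Application requires one Driver process and a set of Executor processes. In Standalone Cluster Mode, both the Driver and the Executors are created under the Master's supervision: the Master sends launch messages to Workers. Because the Driver process and every Executor process need memory and CPU cores, the Master is responsible for scheduling these resources.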
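To make the Driver-placement step concrete, here is a minimal Scala sketch of the round-robin, first-fit search that the Master's `schedule()` method (in the source code below) performs for each waiting Driver. `WorkerRes`, `DriverReq` and `DriverPlacement.place` are hypothetical simplifications, not Spark classes. The sketch models only free memory (MB) and cores, and it takes the Worker order as given instead of calling `Random.shuffle`, so the result is deterministic.

```scala
// Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo/DriverDescription.
final case class WorkerRes(id: String, memoryFree: Int, coresFree: Int)
final case class DriverReq(id: String, mem: Int, cores: Int)

object DriverPlacement {
  /** Round-robin, first-fit placement of waiting drivers (a sketch, not Spark's code).
    * Each driver's search starts right after the worker examined last and visits every
    * worker at most once. Returns driverId -> workerId for the drivers that were placed;
    * drivers that fit nowhere are left out (in the Master they would stay waiting).
    */
  def place(workers: Seq[WorkerRes], drivers: Seq[DriverReq]): Map[String, String] = {
    val free = workers.toArray // per-worker free resources, updated as drivers are placed
    val numWorkers = free.length
    var curPos = 0
    var placed = Map.empty[String, String]
    for (d <- drivers) {
      var launched = false
      var visited = 0
      while (visited < numWorkers && !launched) {
        val w = free(curPos)
        visited += 1
        if (w.memoryFree >= d.mem && w.coresFree >= d.cores) {
          // Reserve the driver's memory and cores so later drivers see less capacity
          free(curPos) = w.copy(memoryFree = w.memoryFree - d.mem, coresFree = w.coresFree - d.cores)
          placed = placed.updated(d.id, w.id)
          launched = true
        }
        curPos = (curPos + 1) % numWorkers // advance to the next worker, wrapping around
      }
    }
    placed
  }
}
```

With Workers A (1024 MB, 1 core), B (4096 MB, 4 cores) and C (2048 MB, 2 cores), a 2048 MB Driver skips A and lands on B. The next Driver's search starts at C and fits there. An 8192 MB Driver fits nowhere and stays unplaced, much as an unplaced entry remains in `waitingDrivers`.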

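In addition, when allocating CPU cores to an Application's Executor processes, the Master must decide whether to spread the cores across as many Workers as possible or to concentrate them on as few Workers as possible. Which approach fits better depends on the nature of the job: whether it is data-intensive or compute-intensive (computing Pi, for example).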
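The two strategies can be compared on a single application with a small Scala sketch. `CoreAssignment`, `spreadOut` and `pack` are hypothetical names. `coresFree(i)` stands for the free cores of the i-th usable Worker, and memory and the `canUse` check are ignored. In the Master, the `spreadOutApps` flag that chooses between the two branches is read from the `spark.deploy.spreadOut` setting (default `true`). Spark's configuration docs note that spreading out is usually better for data locality in HDFS, while consolidating is more efficient for compute-intensive workloads.

```scala
object CoreAssignment {
  /** Spread-out: hand out one core at a time, cycling over the workers, until the
    * request (capped at the total free cores) is met. Mirrors the `assigned` /
    * `toAssign` loop in the spread-out branch of `schedule()`.
    */
  def spreadOut(coresFree: Array[Int], coresWanted: Int): Array[Int] = {
    val assigned = new Array[Int](coresFree.length)
    var toAssign = math.min(coresWanted, coresFree.sum) // never more than what is free
    var pos = 0
    while (toAssign > 0) {
      if (coresFree(pos) - assigned(pos) > 0) { // this worker still has an unassigned core
        toAssign -= 1
        assigned(pos) += 1
      }
      pos = (pos + 1) % coresFree.length
    }
    assigned
  }

  /** Pack: take as many cores as possible from each worker in order, so the request
    * is satisfied by as few workers as possible.
    */
  def pack(coresFree: Array[Int], coresWanted: Int): Array[Int] = {
    var left = coresWanted
    coresFree.map { free =>
      val use = math.min(free, left) // all of this worker's cores, or whatever is still needed
      left -= use
      use
    }
  }
}
```

For three usable Workers with 4, 4 and 2 free cores and an app asking for 6 cores, `spreadOut` yields 2 + 2 + 2 while `pack` yields 4 + 2 + 0. If the app asks for 20 cores, both return 4 + 4 + 2, because the total is capped at the free cores available.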

 

Source code:

 

```scala
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule() {
  if (state != RecoveryState.ALIVE) { return }

  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Start by allocating Driver resources to applications that have been submitted
  // but whose Driver process has not been launched yet
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // waitingDrivers holds a DriverInfo for each submitted application whose Driver
  // process has not been launched yet
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // For this driver, walk through the alive Workers looking for one with enough free CPU and memory
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Take the alive Worker at position curPos
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If this Worker meets the Driver's resource requirements, launch the Driver process
      // and mark it as launched (launched = true)
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      // Move on to the next alive Worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }

  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.

  // Spread the Executors across as many Workers as possible
  if (spreadOutApps) {
    // Try to spread out each app among all the nodes, until it has all its cores
    for (app <- waitingApps if app.coresLeft > 0) {
      val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
      val numUsable = usableWorkers.length
      // assigned(i) records how many cores have been assigned on the i-th usable Worker
      val assigned = new Array[Int](numUsable) // Number of cores to give on each node
      // toAssign is the number of cores to hand out to this app: the smaller of the cores
      // the app still needs and the total free cores across all usable Workers
      var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
      var pos = 0
      while (toAssign > 0) {
        // Cycle through the usable Workers, assigning one core at a time and recording it in assigned
        if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
          toAssign -= 1
          assigned(pos) += 1
        }
        pos = (pos + 1) % numUsable
      }
      // Now that we've decided how many cores to give on each node, let's actually give them
      // (using the per-Worker core counts recorded in assigned)
      for (pos <- 0 until numUsable) {
        if (assigned(pos) > 0) {
          val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
          // Launch an Executor process with the given number of cores
          // (in practice, a CoarseGrainedExecutorBackend process)
          launchExecutor(usableWorkers(pos), exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  } else {
    // Pack each app into as few nodes as possible until we've assigned all its cores
    // Concentrate Executors on as few Workers as possible: the outer loop is over Workers,
    // the inner loop is over Applications
    for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
      for (app <- waitingApps if app.coresLeft > 0) {
        if (canUse(app, worker)) { // this Worker can host an Executor for this app
          // Cores to use: the smaller of this Worker's free cores and the app's remaining request
          val coresToUse = math.min(worker.coresFree, app.coresLeft)
          if (coresToUse > 0) {
            // Either all of the Worker's free cores are used, or the app's request is fully met
            val exec = app.addExecutor(worker, coresToUse)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    }
  }
}
```
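The pack branch (the `else` above) loops over Workers on the outside and over waiting applications on the inside. As a result, the first application in the FIFO queue takes whatever it can from each Worker before later applications are considered. Below is a minimal sketch of that loop shape for several applications. `AppReq` and `PackScheduler.schedule` are hypothetical names, and memory and the `canUse` check are ignored entirely.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for an ApplicationInfo: an id and the cores it still wants.
final case class AppReq(id: String, coresLeft: Int)

object PackScheduler {
  /** Outer loop over workers, inner loop over apps in FIFO order (a sketch of the
    * non-spread-out branch). Returns (appId, workerIndex, cores) for each executor
    * that would be launched, in launch order.
    */
  def schedule(workerCores: Seq[Int], apps: Seq[AppReq]): Seq[(String, Int, Int)] = {
    val free = workerCores.toArray       // remaining free cores per worker
    val left = apps.map(_.coresLeft).toArray // remaining cores wanted per app
    val launched = ArrayBuffer.empty[(String, Int, Int)]
    for (w <- free.indices if free(w) > 0) {
      for (a <- apps.indices if left(a) > 0) {
        // Either the worker's remaining cores or the app's remaining request, whichever is smaller
        val coresToUse = math.min(free(w), left(a))
        if (coresToUse > 0) {
          free(w) -= coresToUse
          left(a) -= coresToUse
          launched += ((apps(a).id, w, coresToUse))
        }
      }
    }
    launched.toSeq
  }
}
```

With two 4-core Workers, an app wanting 6 cores gets 4 cores on the first Worker and 2 on the second. A second app wanting 3 cores gets only the 2 cores left on the second Worker, so it remains in the queue with one core still outstanding.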

 

 

Author: HouTiZong
HouTiZong's Blog
© 2020 zongscan.com