【Spark 84】Miscellaneous Spark Notes


1. Where does a ShuffleMapTask's shuffle output get registered with the MapOutputTracker?

ShuffleMapTask's runTask method writes the task's data to the shuffle map file. When the task completes successfully, the DAGScheduler is notified, and the registration with the MapOutputTracker happens inside DAGScheduler.handleTaskCompletion:

 

event.reason match {
  case Success =>
    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
      event.reason, event.taskInfo, event.taskMetrics))
    stage.pendingTasks -= task
    task match {
      case rt: ResultTask[_, _] =>
        stage.resultOfJob match {
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              job.finished(rt.outputId) = true
              job.numFinished += 1
              // If the whole job has finished, remove it
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(stage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
              }
              // taskSucceeded runs some user code that might throw an exception. Make sure
              // we are resilient against that.
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // TODO: Perhaps we want to mark the stage as failed?
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
          case None =>
            logInfo("Ignoring result from " + rt + " because its job has finished")
        }
      case smt: ShuffleMapTask =>
        updateAccumulators(event)
        // Obtain the MapStatus object from the completion event
        val status = event.result.asInstanceOf[MapStatus]
        // The executor ID the map output lives on
        val execId = status.location.executorId
        logDebug("ShuffleMapTask finished on " + execId)
        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
          logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
        } else {
          stage.addOutputLoc(smt.partitionId, status)
        }
        if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
          markStageAsFinished(stage)
          logInfo("looking for newly runnable stages")
          logInfo("running: " + runningStages)
          logInfo("waiting: " + waitingStages)
          logInfo("failed: " + failedStages)
          if (stage.shuffleDep.isDefined) {
            // We supply true to increment the epoch number here in case this is a
            // recomputation of the map outputs. In that case, some nodes may have cached
            // locations with holes (from when we detected the error) and will need the
            // epoch incremented to refetch them.
            // TODO: Only increment the epoch number if this is not the first time
            //       we registered these map outputs.
            // This is where the map outputs are registered with the mapOutputTracker
            mapOutputTracker.registerMapOutputs(
              stage.shuffleDep.get.shuffleId,
              stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
              changeEpoch = true)
          }
          clearCacheLocs()
          if (stage.outputLocs.exists(_ == Nil)) {
            // Some tasks had failed; let's resubmit this stage
            // TODO: Lower-level scheduler should also deal with this
            logInfo("Resubmitting " + stage + " (" + stage.name +
              ") because some of its tasks had failed: " +
              stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
            submitStage(stage)
          } else {
            val newlyRunnable = new ArrayBuffer[Stage]
            for (stage <- waitingStages) {
              logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
            }
            for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
              newlyRunnable += stage
            }
            waitingStages --= newlyRunnable
            runningStages ++= newlyRunnable
            for {
              stage <- newlyRunnable.sortBy(_.id)
              jobId <- activeJobForStage(stage)
            } {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
              submitMissingTasks(stage, jobId)
            }
          }
        }
    }
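The bookkeeping that registerMapOutputs performs is easy to model in isolation. Below is a minimal, self-contained sketch (ToyTracker and ToyMapStatus are made-up stand-ins, not Spark's real MapOutputTracker and MapStatus): one status per map partition is stored under the shuffleId, and re-registration can bump an epoch so executors treat cached locations as stale.

import scala.collection.mutable

object MapOutputTrackerSketch {
  // Made-up stand-in for MapStatus: just the executor holding a map partition's output.
  case class ToyMapStatus(executorId: String)

  class ToyTracker {
    private val outputs = mutable.Map[Int, Array[ToyMapStatus]]()
    private var epoch = 0L

    // Mirrors registerMapOutputs(shuffleId, statuses, changeEpoch): stores one
    // status per map partition and optionally bumps the epoch so cached
    // locations on executors are refetched.
    def registerMapOutputs(shuffleId: Int, statuses: Array[ToyMapStatus],
        changeEpoch: Boolean): Unit = {
      outputs(shuffleId) = statuses
      if (changeEpoch) epoch += 1
    }

    // What a reducer ultimately needs: where every map partition's output lives.
    def getMapOutputs(shuffleId: Int): Array[ToyMapStatus] =
      outputs.getOrElse(shuffleId, Array.empty)
  }

  def main(args: Array[String]): Unit = {
    val tracker = new ToyTracker
    tracker.registerMapOutputs(
      shuffleId = 0,
      statuses = Array(ToyMapStatus("exec-1"), ToyMapStatus("exec-2")),
      changeEpoch = true)
    println(tracker.getMapOutputs(0).mkString(", "))
  }
}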

2. When a ShuffleMapTask writes shuffle map data (via SortShuffleWriter.write), it first buffers records in memory and spills to disk when memory runs short:

override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    sorter = new ExternalSorter[K, V, C](
      dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
    sorter.insertAll(records) // May spill to disk
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    sorter = new ExternalSorter[K, V, V](
      None, Some(dep.partitioner), None, dep.serializer)
    sorter.insertAll(records)
  }
  val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
  val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
  val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile) // Write to the disk file
  shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
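Whether and when the ExternalSorter spills is governed by the shuffle memory settings of that Spark generation (spark.shuffle.spill to enable spilling, spark.shuffle.memoryFraction for the in-memory budget). A minimal sketch of a job that exercises the map-side-combine branch above, assuming a local master; shrinking the memory fraction makes spills more likely:

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleSpillDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("shuffle-spill-demo")
      .set("spark.shuffle.spill", "true")           // allow ExternalSorter to spill (the default)
      .set("spark.shuffle.memoryFraction", "0.05")  // tiny in-memory budget to provoke spilling
    val sc = new SparkContext(conf)

    // reduceByKey sets mapSideCombine = true, so SortShuffleWriter.write takes
    // the aggregator branch and ExternalSorter may spill while inserting records.
    val counts = sc.parallelize(1 to 1000000, numSlices = 8)
      .map(i => (i % 1000, 1))
      .reduceByKey(_ + _)
    println(counts.count())

    sc.stop()
  }
}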

3. When a ResultTask reads a ShuffledRDD's data (via HashShuffleReader), it likewise reads into memory first and spills to disk when memory runs short:

override def read(): Iterator[Product2[K, C]] = {
  val ser = Serializer.getSerializer(dep.serializer)
  // Fetch the shuffle data as an iterable Iterator
  val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser)

  val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) {
    // Combine the data read from the map side; combining may spill to disk
    if (dep.mapSideCombine) {
      new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
    } else {
      new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
    }
  } else {
    require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
    // Convert the Product2s to pairs since this is what downstream RDDs currently expect
    iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
  }

  // Sort the output if there is a sort ordering defined.
  // Sorting may also spill to disk
  dep.keyOrdering match {
    case Some(keyOrd: Ordering[K]) =>
      // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled,
      // the ExternalSorter won't spill to disk.
      val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser))
      sorter.insertAll(aggregatedIter)
      context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
      context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled
      sorter.iterator
    case None =>
      aggregatedIter
  }
}
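Which branch of read() a job exercises depends on how the shuffle dependency was built. A small illustration, assuming an existing SparkContext named sc:

// Assuming an existing SparkContext `sc`:
val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("b", 3)))

pairs.reduceByKey(_ + _) // aggregator defined, mapSideCombine = true  -> combineCombinersByKey
pairs.groupByKey()       // aggregator defined, mapSideCombine = false -> combineValuesByKey
pairs.sortByKey()        // keyOrdering defined -> ExternalSorter path, which may spill to disk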
 
4. Task locality handling

a. The DriverActor receives ...
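For general background on task locality: the scheduler tries successively weaker locality levels (PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY), waiting up to spark.locality.wait at each level before relaxing to the next. A minimal configuration sketch:

import org.apache.spark.SparkConf

// How long the scheduler waits at each locality level before relaxing to the
// next one; the per-level keys override the global default.
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")
  .set("spark.locality.wait.node", "3s")
  .set("spark.locality.wait.rack", "3s")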
