【spark八十七】给定Driver Program, 如何判断哪些代码在Driver运行,哪些代码在Worker上执行

编程技术  /  houtizong 发布于 3年前   88

Driver Program是用户编写的提交给Spark集群执行的application,它包含两部分

  • 作为驱动: Driver与Master、Worker协作完成application进程的启动、DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等。
  • 计算逻辑本身,当计算任务在Worker执行时,执行计算逻辑完成application的计算任务

接下来的问题是,给定一个driver programming,哪些作为"驱动代码"在Driver进程中执行,哪些"任务逻辑代码”被包装到任务中,然后分发到计算节点进行计算?

 

1. 基本Spark driver application

package spark.examples.databricks.reference.apps.loganalysisimport org.apache.spark.{SparkContext, SparkConf}import org.apache.spark.SparkContext._object LogAnalyzer {  def main(args: Array[String]) {    val sparkConf = new SparkConf().setAppName("Log Analyzer in Scala").setMaster("local[3]")    val sc = new SparkContext(sparkConf)    //logFile path is provided by the program args    val logFile = if (args != null && args.length == 1) args(0) else "E:\\softwareInstalled\\Apache2.2\\logs\\access.log"    //transform each line of the logFile into ApacheAccessLog object,    //RDD[T],T is of type ApacheAccessLog    //Because it will be used more than more, so cache it.    ////从数据源中读取文本文件内容,每一行转换为ApacheAccessLog,然后进行cache    val accessLogs = sc.textFile(logFile).map(ApacheAccessLog.parseLogLine).cache()    // Calculate statistics based on the content size.    //Retrieve the contentSize column and cache it    val contentSizes = accessLogs.map(log => log.contentSize).cache()    ///reduce是个action,count是个action    println("Content Size Avg: %s, Min: %s, Max: %s".format(      contentSizes.reduce(_ + _) / contentSizes.count,      contentSizes.min,      contentSizes.max))    // Compute Response Code to Count.    //Take first 100 responseCode, no sort here    //take操作是个action    val responseCodeToCount = accessLogs      .map(log => (log.responseCode, 1))      .reduceByKey(_ + _)      .take(100)    println( s"""Response code counts: ${responseCodeToCount.mkString("[", ",", "]")}""")    // Any IPAddress that has accessed the server more than 10 times.    //take操作是个action    val ipAddresses = accessLogs      .map(log => (log.ipAddress, 1))      .reduceByKey(_ + _)      .filter(_._2 > 10) //Get the ipAddress that accessed the server 10+ times      .map(_._1) //Map to the IP Address column      .take(100)    println( s"""IPAddresses > 10 times: ${ipAddresses.mkString("[", ",", "]")}""")    // Top Endpoints.    //top操作是个action    val topEndpoints = accessLogs      .map(log => (log.endpoint, 1))      .reduceByKey(_ + _)      .top(10)(OrderingUtils.SecondValueOrdering)    println( s"""Top Endpoints: ${topEndpoints.mkString("[", ",", "]")}""")    sc.stop()  }}

 

Spark application首先在Driver上开始运行main函数,执行过程中。 计算逻辑的开始是从读取数据源(比如HDFS中的文件)创建RDD开始,RDD分为transform和action两种操作,transform使用的是懒执行,而action操作将会触发Job的提交。

Job的提交以为着DAG划分、计算任务封装、计算任务分发到各个计算节点(Worker)、计算资源的分配等,这是真正的将任务分发的开始。因此,从代码中首先要识别出哪些action。

另外一个问题, 任务执行过程中执行的逻辑代码如何识别?

Application在Driver上执行,遇到RDD的action动作后,开始提交作业,当作业执行完成后,后面的作业陆续提交,也就是说,虽然一个Spark Application可以有多个Job(每个action对应一个Job),这些Job是顺序执行的,Job(X)执行完成才会执行Job(X+1)。

 

遇到Job执行,比如上面的contentSize.reduce(_+_),那么所谓的计算逻辑就是函数 _+_,这是个求和操作。具体的任务逻辑执行时,还是从action回溯RDD,中间经过转换得到最终的这个Task所属的Partition的数据,然后执行reduce(_+_)

 

2. Spark Stream程序

 

package spark.examples.streamingimport java.sql.{PreparedStatement, Connection, DriverManager}import java.util.concurrent.atomic.AtomicIntegerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._//No need to call Class.forName("com.mysql.jdbc.Driver") to register Driver?object SparkStreamingForPartition {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("NetCatWordCount")    conf.setMaster("local[3]")    val ssc = new StreamingContext(conf, Seconds(5))    val dstream = ssc.socketTextStream("192.168.26.140", 9999)    //foreachRDD是DStream的动作函数,会触发Job执行,然后对一个时间间隔内创建的RDD进行处理。如果RDD执行RDD的动作函数,是否继续触发Job执行?    dstream.foreachRDD(rdd => {      //embedded function      def func(records: Iterator[String]) {        var conn: Connection = null        var stmt: PreparedStatement = null        try {          val url = "jdbc:mysql://192.168.26.140:3306/person";          val user = "root";          val password = ""          conn = DriverManager.getConnection(url, user, password)          records.flatMap(_.split(" ")).foreach(word => {            val sql = "insert into TBL_WORDS(word) values (?)";            stmt = conn.prepareStatement(sql);            stmt.setString(1, word)            stmt.executeUpdate();          })        } catch {          case e: Exception => e.printStackTrace()        } finally {          if (stmt != null) {            stmt.close()          }          if (conn != null) {            conn.close()          }        }      }      ///对RDD进行重新分区,以改变处理的并行度      val repartitionedRDD = rdd.repartition(3)      ///对每个分区调用func函数,func函数的参数就是一个分区对应的数据的遍历器(Iterator)      repartitionedRDD.foreachPartition(func)    })    ssc.start()    ssc.awaitTermination()  }}

 

DStream的foreachRDD是个Output Operation,类似于RDD的action,因此,高阶函数foreachRDD的函数参数,将在worker上执行。这里的func是定义为foreachRDD参数函数的内部函数,因此会发送到Worker上执行,如果func定义在最外层,比如作为main函数的直接内部函数,是否可以顺利的从Driver序列化到Worker上呢?我认为是可以的。函数在Scala中只是一个普通的对象,没有状态,序列化反序列化时需要创建MySQL的Connection。

 

3. foreachRDD中的rdd继续执行action算子

 

package spark.examples.streamingimport java.sql.{PreparedStatement, Connection, DriverManager}import java.util.concurrent.atomic.AtomicIntegerimport org.apache.spark.SparkConfimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.streaming._import org.apache.spark.streaming.StreamingContext._object SparkStreamingForPartition2 {  def main(args: Array[String]) {    val conf = new SparkConf().setAppName("NetCatWordCount")    conf.setMaster("local[3]")    val ssc = new StreamingContext(conf, Seconds(5))    val dstream = ssc.socketTextStream("192.168.26.140", 9999)    dstream.foreachRDD(rdd => {      //对于空RDD调用flatMap报错,这里进行判断      if (!rdd.isEmpty()) {        ///RDD的Record个数        val recordCount = rdd.count()        ///RDD中的单词数        val wordCount = rdd.flatMap(_.split(" ")).map(word => 1).reduce(_ + _)        println("recordCount: =" + recordCount + "," + "wordCount:=" + wordCount)      } else {        println("Empty RDD, No Data")      }    })    ssc.start()    ssc.awaitTermination()  }}

 

 需要注意的是,对于没有任何元素的RDD(RDD.isEmpty为true),那么不能执行转换算子如flatMap等,这在Spark Streaming中读取空的RDD是很常见的情况?但是这个空检查是在RDD的reduce函数中执行的

 

  def reduce(f: (T, T) => T): T = {    val cleanF = sc.clean(f)    val reducePartition: Iterator[T] => Option[T] = iter => {      if (iter.hasNext) {        Some(iter.reduceLeft(cleanF))      } else {        None      }    }    var jobResult: Option[T] = None    val mergeResult = (index: Int, taskResult: Option[T]) => {      if (taskResult.isDefined) {        jobResult = jobResult match {          case Some(value) => Some(f(value, taskResult.get))          case None => taskResult        }      }    }    sc.runJob(this, reducePartition, mergeResult)    // Get the final result out of our Option, or throw an exception if the RDD was empty    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))  }

 

如下操作对于空RDD是可以的,返回0

  rdd.flatMap(_.split(" ")).map(_ => 1).count()

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客