[Scala 4] Scala Syntax Summarized from Reading the Spark Source Code (Part 2)


1. Some

 

The code below uses Some in the expression if (self.partitioner == Some(partitioner)). What does Some(partitioner) mean here? First, note that partitioner is a parameter passed into the combineByKey method.

The documentation of Some:

 

/** Class `Some[A]` represents existing values of type
 *  `A`.
 *
 *  @author  Martin Odersky
 *  @version 1.0, 16/07/2003
 */
final case class Some[+A](x: A) extends Option[A] {
  def isEmpty = false
  def get = x
}

 

So what does Some(partitioner) express? It is an object of type Some[Partitioner] that wraps the partitioner object. On the other side of the comparison, self.partitioner is the partitioner field of the underlying RDD, and its type is Option[Partitioner].

 

Some(partitioner) == Some(partitioner) is true: Some is a case class, so the two wrappers compare equal when they wrap the same partitioner.

Option(partitioner) == Some(partitioner) is also true as long as partitioner is not null, because Option(x) returns Some(x) for a non-null x (and None for null).
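A quick, self-contained check of these equalities (a minimal sketch, not Spark code; a plain stand-in class is used in place of Spark's Partitioner):

object SomeEqualityDemo {
  // Stand-in for Spark's Partitioner, just to have something to wrap.
  class Partitioner(val numPartitions: Int)

  def main(args: Array[String]): Unit = {
    val partitioner = new Partitioner(4)

    // Some is a case class, so equality is structural; both sides wrap
    // the same partitioner reference, hence true.
    println(Some(partitioner) == Some(partitioner))    // true

    // Option(x) returns Some(x) for non-null x, so this is also true.
    println(Option(partitioner) == Some(partitioner))  // true

    // For null, Option returns None rather than Some(null).
    println(Option(null) == Some(null))                // false
  }
}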

 

 

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) { // map-side combining cannot be used with array keys
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) { // the default HashPartitioner cannot partition array keys
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

 

2. The self variable

The code above uses a variable named self. It is not a keyword, as one might first assume, but a constructor parameter (and hence a field) of PairRDDFunctions: PairRDDFunctions is the implicit wrapper class around RDD[(K, V)], and self holds the wrapped RDD[(K, V)] instance itself. Naming this field self appears to be a common Scala convention for such wrapper classes; a minimal sketch of the pattern follows.
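A minimal sketch (not Spark code) of a wrapper class whose constructor parameter is named self, made available through an implicit conversion, in the same spirit as PairRDDFunctions wrapping RDD[(K, V)]:

import scala.language.implicitConversions

// Wrapper class: `self` is the wrapped object, used by the added methods.
class RichSeqFunctions[A](self: Seq[A]) {
  def second: A = self(1)  // an extra operation added on top of Seq
}

object RichSeqDemo {
  // Implicit conversion that makes the extra method available on any Seq.
  implicit def seqToRich[A](s: Seq[A]): RichSeqFunctions[A] =
    new RichSeqFunctions(s)

  def main(args: Array[String]): Unit = {
    println(Seq(10, 20, 30).second)  // prints 20
  }
}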

 

3. Inheritance

 

1. ShuffledRDD extends RDD, as shown below. A subclass can take constructor parameters of its own and also pass arguments to its parent's constructor: here prev.context and Nil are passed to RDD, where prev is one of ShuffledRDD's own constructor parameters.

2. What does <: mean in RDD[_ <: Product2[K, V]]? It is an upper type bound: the (anonymous) type parameter must be a subtype of Product2[K, V], so prev can be any RDD whose element type is Product2[K, V] or a subtype of it. A small sketch of both points follows the Spark code below.

 

/**
 * :: DeveloperApi ::
 * The resulting RDD from a shuffle (e.g. repartitioning of data).
 * @param prev the parent RDD.
 * @param part the partitioner used to partition the RDD
 * @tparam K the key class.
 * @tparam V the value class.
 * @tparam C the combiner class.
 */
// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
@DeveloperApi
class ShuffledRDD[K, V, C](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {
  .....
}
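A minimal sketch (not Spark code) of both points: forwarding a subclass's constructor parameter to the parent constructor, and an upper type bound on a type parameter:

class Animal(val name: String)
class Dog(name: String) extends Animal(name)

// `context` and `deps` are the parent's own constructor parameters.
class Parent(val context: String, val deps: List[String])

// Child's constructor parameter `prev` is used to build the arguments
// passed to Parent's constructor, just as ShuffledRDD passes
// prev.context and Nil to RDD. `A <: Animal` is an upper type bound:
// A must be Animal or a subtype of it.
class Child[A <: Animal](val prev: A)
  extends Parent(prev.name + "-context", Nil)

object BoundDemo {
  def main(args: Array[String]): Unit = {
    val c = new Child(new Dog("rex"))
    println(c.context)  // rex-context
    println(c.deps)     // List()
  }
}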

  

 

 

4. Scala singletons (object)

object SparkHadoopUtil {

  private val hadoop = {
    val yarnMode = java.lang.Boolean.valueOf(
        System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
    if (yarnMode) {
      try {
        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
          .newInstance()
          .asInstanceOf[SparkHadoopUtil]
      } catch {
        case e: Exception => throw new SparkException("Unable to load YARN support", e)
      }
    } else {
      new SparkHadoopUtil
    }
  }

  def get: SparkHadoopUtil = {
    hadoop
  }
}
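A Scala object is a singleton whose body is evaluated once, on first access, which is what SparkHadoopUtil relies on above. A minimal sketch of the same initialize-once pattern (not Spark code):

object Config {
  // Evaluated a single time, when Config is first referenced.
  private val settings: Map[String, String] = {
    println("initializing settings")
    Map("mode" -> sys.env.getOrElse("APP_MODE", "local"))
  }

  def get(key: String): Option[String] = settings.get(key)
}

object SingletonDemo {
  def main(args: Array[String]): Unit = {
    println(Config.get("mode"))  // "initializing settings" printed once
    println(Config.get("mode"))  // not printed again
  }
}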

 

5. Calling methods without ()

When calling a method that takes no parameters, the () can be omitted:

    val outputFormatInstance = hadoopConf.getOutputFormat
    val keyClass = hadoopConf.getOutputKeyClass    // method call without ()
    val valueClass = hadoopConf.getOutputValueClass
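A minimal sketch of the same idea (the Job class here is made up for illustration):

class Job {
  def name: String = "word-count"  // declared without a parameter list
}

object NoParensDemo {
  def main(args: Array[String]): Unit = {
    val job = new Job
    println(job.name)                      // Scala method called without ()
    println(Thread.currentThread.getName)  // Java methods with an empty ()
                                           // can also be called without parentheses
  }
}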

 

6. The until expression

 

The expression 0 until rdd.partitions.size returns a collection, a Seq[Int] (specifically a Range). Likewise, for (i <- 0 until 10) performs ten iterations over the values 0 through 9: 0 until 10 first builds the collection, and for (i <- collection) then iterates over it.

  /**
   * Run a job on all partitions in an RDD and return the results in an array.
   */
  def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
    runJob(rdd, func, 0 until rdd.partitions.size, false)
  }
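A minimal sketch of until and the Range it produces:

object UntilDemo {
  def main(args: Array[String]): Unit = {
    // `until` builds a Range (a Seq[Int]) that excludes the upper bound;
    // `to` would include it.
    val range: Seq[Int] = 0 until 5
    println(range.mkString(","))  // 0,1,2,3,4

    // A for-comprehension then iterates over that collection.
    for (i <- 0 until 3) {
      println(s"iteration $i")
    }
  }
}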

 

7. ClassTag

There is a lot to unpack in the method definition below. First, saveAsHadoopFile has two parameter lists, which is why it is written with ()(): the first list takes the String path, and the second takes an implicit parameter fm of type ClassTag[F].

fm has type ClassTag[F]. What does fm.runtimeClass.asInstanceOf[Class[F]] mean? The compiler fills in the implicit ClassTag[F] at the call site, capturing the runtime class of F despite type erasure; fm.runtimeClass returns that class as a Class[_], and asInstanceOf[Class[F]] casts it to Class[F] so it can be passed to the overload of saveAsHadoopFile that takes the output format class explicitly.

 

 

  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }
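A minimal sketch (not Spark code) of recovering a type parameter's runtime class through an implicit ClassTag:

import scala.reflect.ClassTag

object ClassTagDemo {
  // The implicit ClassTag carries the runtime class of F, which would
  // otherwise be erased on the JVM.
  def runtimeClassOf[F](implicit fm: ClassTag[F]): Class[F] =
    fm.runtimeClass.asInstanceOf[Class[F]]

  def main(args: Array[String]): Unit = {
    println(runtimeClassOf[String])          // class java.lang.String
    println(runtimeClassOf[java.util.Date])  // class java.util.Date
  }
}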

 

8. What does (index, res) => results(index) = res mean?

It is an anonymous function of two arguments that assigns res to results(index). It is passed to runJob as the resultHandler callback, so each partition's result is written into the results array at its index.

 

  /**
   * Run a function on a given set of partitions in an RDD and return the results as an array. The
   * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
   * than shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean
      ): Array[U] = {
    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
    results
  }

The runJob overload called above is defined as follows:

 

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      allowLocal: Boolean,
      resultHandler: (Int, U) => Unit) {
    if (dagScheduler == null) {
      throw new SparkException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
      resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }
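A minimal sketch (not Spark code) of the same callback pattern: a handler of type (Int, U) => Unit that fills a pre-allocated array, mirroring (index, res) => results(index) = res:

object ResultHandlerDemo {
  def runTasks[U](partitions: Seq[Int],
                  task: Int => U,
                  resultHandler: (Int, U) => Unit): Unit = {
    // Call the handler with (position in `partitions`, task result).
    for ((partition, index) <- partitions.zipWithIndex) {
      resultHandler(index, task(partition))
    }
  }

  def main(args: Array[String]): Unit = {
    val partitions = 0 until 4
    val results = new Array[Int](partitions.size)
    runTasks[Int](partitions, p => p * 10, (index, res) => results(index) = res)
    println(results.mkString(","))  // 0,10,20,30
  }
}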

 

9. The Any type

In Scala, Any is the root of the type hierarchy: every type is a subtype of Any, so it plays roughly the role that Object plays in Java (strictly, AnyRef corresponds to java.lang.Object, while Any also covers value types such as Int).

The following code is taken from MapOutputTracker.scala:

 

  /**
   * Send a message to the trackerActor and get its result within a default timeout, or
   * throw a SparkException if this fails.
   */
  protected def askTracker(message: Any): Any = {
    try {
      val future = trackerActor.ask(message)(timeout)
      Await.result(future, timeout)
    } catch {
      case e: Exception =>
        logError("Error communicating with MapOutputTracker", e)
        throw new SparkException("Error communicating with MapOutputTracker", e)
    }
  }
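A minimal sketch of working with Any values, where pattern matching recovers the concrete type afterwards (the sample messages here are made up):

object AnyDemo {
  // Any accepts a value of any type, much like Object in Java.
  def describe(message: Any): String = message match {
    case s: String => s"string: $s"
    case n: Int    => s"int: $n"
    case other     => s"something else: $other"
  }

  def main(args: Array[String]): Unit = {
    println(describe("GetMapOutputStatuses"))  // string: GetMapOutputStatuses
    println(describe(42))                      // int: 42
    println(describe(Seq(1, 2, 3)))            // something else: List(1, 2, 3)
  }
}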

 

 

 

 

 

 

 

 
