【Scala三】分析Spark源代码总结的Scala语法一

编程技术  /  houtizong 发布于 3年前   129

Scala语法

1. classOf运算符

Scala中的classOf[T]是一个class对象,等价于Java的T.class,比如classOf[TextInputFormat]等价于TextInputFormat.class

 

 2. 方法默认值

defaultMinPartitions就是一个默认值,类似C++的方法默认值

 

 

  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],      minPartitions).map(pair => pair._2.toString).setName(path)  }

 

3. 函数常量

如下map方法的参数pair => pair._2.toString表示一个函数常量,pair是调用map方法的集合的每个元素,

 

 

  def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],      minPartitions).map(pair => pair._2.toString).setName(path)  }

 

4. Tuple类型

上例中,调用map方法的对象,即hadoopFile的返回值是Tuple类型,它是一个元素类型可以不同的元组,比如val t: Tuple = (A,2,Three),访问Three,则使用t._3进行操作

 

5. 范型

如下方法所示例子,RDD[(K,V)]是方法返回值类型,它的类型是RDD类,而这个RDD类包含的范型使用[]声明,而(K,V)是元组Tuple类,因此,RDD[(K,V)]表达的类型是RDD<K,V>

 

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {    reduceByKey(defaultPartitioner(self), func)  }

 

 

6. 函数类型

下面这个例子,func是个函数类型的对象引用,func也是对象,也有类型,在Scala中,func的类型使用参数输入和输出来指定的。

(V,V)=>V,表示func接受两个参数,类型为V,而输出也是类型为V,因此,(a:Int,b:Int)=>a*b,(a:Int,b:Int)=>a+b都属于这个类型的对象

  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {    reduceByKey(defaultPartitioner(self), func)  }

 

 7. 隐式函数以实现类型转换

 

7.1什么是隐式函数?

 

SparkContext中定义了如下的隐式函数,它的含义是将K,V类型的RDD(注:入参rdd是RDD[(K,V)])转换为PairRDDFunctions类型,它接受K,V类型的RDD作为构造入参

 

 

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {    new PairRDDFunctions(rdd)  }
 

 

7.2 隐式函数何时被调用?

 

 

 val rdd = sc.textFile("file:///home/hadoop/spark1.2.0/word.txt") val rdd1 =  rdd.flatMap(_.split(" ")) val rdd2 = rdd1.map((_, 1)) //reduceByKey is not a member of org.apache.spark.rdd.RDD[(String, Int)],则需要import org.apache.spark.SparkContext._ val rdd3 = rdd2.reduceByKey(_ + _)
 

 

在上面的SparkWordCount例子中,reduceByKey方法并不是RDD的方法,而rdd2是RDD类型的对象,所以Scala必须提供一种机制,将这个方法的调用进行转换,这里就是隐式函数发挥作用的情况,

那么哪个隐式函数会用来做这个转换?在上面的注释中清楚的写到,为了使用reduceByKey方法,必须引入import org.apache.spark.SparkContext._,这里的意思就是通过这个import语句,将rdd2进行类型转换的函数引进来,更具体的是,也可以使用如下的import语句进行导入

 

 

import org.apache.spark.SparkContext.rddToPairRDDFunctions
 

 

在SparkContext中,将RDD[(K,V)]转换为其它类型的隐式函数只有rddToPairRDDFunctions,因此,此方法会自动调用,

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {    new PairRDDFunctions(rdd)  }

rddToPairRDDFunctions函数将rdd转为为new PairRDDFunctions对象,那么rdd2.reduceByKey调用最终转换到了PairRDDFunctions的reduceByKey函数的调用。如下方法定义于PairRDDFunctions类中。

reduceByKey(_ + _)匹配到func:(V,V)=>V

 

  /**   * Merge the values for each key using an associative reduce function. This will also perform   * the merging locally on each mapper before sending results to a reducer, similarly to a   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/   * parallelism level.   */  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {    reduceByKey(defaultPartitioner(self), func)  }

 

 8. T*定义变长参数和定义_作为placeholder

在下面的例子中,RDD[ _ ]*同时使用了两个符号_和*,*表示变长参数,也就是说,others是个变长的RDD集合。而_表示在那个位置上的占位符,RDD[ _ ]中的_表示类型占位符,只是类型不定,此处类似于Java的Object,实现万能类型匹配

 

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse    for (r <- bySize if r.partitioner.isDefined) {      return r.partitioner.get    }    if (rdd.context.conf.contains("spark.default.parallelism")) {      new HashPartitioner(rdd.context.defaultParallelism)    } else {      new HashPartitioner(bySize.head.partitions.size)    }  }}

 

9.Seq()操作

在上面的代码中,有Seq(rdd)方法调用,它的含义是将参数rdd集合化,即Seq(rdd)表示仅有一个元素rdd的集合,这个集合有个方法++,Seq(rdd) ++ others是将两个集合中的元素进行合并成一个集合,++也是两个集合的取集合并集的操作

 

10.带if条件判断的for循环

 在8中,有个for循环语句,bySize是个集合,也就是for循环在遍历bySize(存入到r变量中),但是是否每次遍历都进入代码块可以在for循环中使用if语句进行指定。

 

    for (r <- bySize if r.partitioner.isDefined) {      return r.partitioner.get    }

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客