【Spark八十】Spark RDD API二

编程技术  /  houtizong 发布于 3年前   77

coGroup

package spark.examples.rddapiimport org.apache.spark.{SparkConf, SparkContext}import org.apache.spark.SparkContext._object CoGroupTest_05 {  def main(args: Array[String]) {    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")    val sc = new SparkContext(conf);    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (11, "F"), (81, "Y"), (77, "Z"), (31, "X")), 3)    val z2 = sc.parallelize(List((4, "ABC"), (6, "B2"), (7, "Z2"), (7, "Z3"), (91, "E"), (11, "FF"), (88, "N"), (77, "S"), (36, "M")), 4)    //隐式函数,定义于PairRDDFunctions    //结果由两个(至多四个)RDD的Key组成,(Key,(ValuesOfRDD1Seq, ValuesOfRDD2Seq, ValuesOfRDD3Seq))    //cogroup [W]( other : RDD [(K, W)]): RDD [(K, (Seq [V], Seq [W]))]    //cogroup [W1 , W2 ]( other1 : RDD [(K, W1)], other2 : RDD [(K, W2)]): RDD [(K , (Seq[V], Seq[W1], Seq[W2 ]))]    val r = z1.cogroup(z2)    r.collect.foreach(println)    /*Result:,(4,(CompactBuffer(),CompactBuffer(ABC)))(36,(CompactBuffer(),CompactBuffer(M)))(88,(CompactBuffer(),CompactBuffer(N)))(81,(CompactBuffer(Y),CompactBuffer()))(77,(CompactBuffer(Z),CompactBuffer(S)))(9,(CompactBuffer(E),CompactBuffer()))(6,(CompactBuffer(B1),CompactBuffer(B2)))(11,(CompactBuffer(F),CompactBuffer(FF)))(3,(CompactBuffer(A),CompactBuffer()))(7,(CompactBuffer(Z1),CompactBuffer(Z2, Z3)))(91,(CompactBuffer(),CompactBuffer(E)))(31,(CompactBuffer(X),CompactBuffer()))     */  }}

 

groupBy

package spark.examples.rddapiimport org.apache.spark.{Partitioner, SparkContext, SparkConf}object GroupByTest_06 {  def main(args: Array[String]) {    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")    val sc = new SparkContext(conf);    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)    /**     * Return an RDD of grouped items. Each group consists of a key and a sequence of elements     * mapping to that key. The ordering of elements within each group is not guaranteed, and     * may even differ each time the resulting RDD is evaluated.     *     * Note: This operation may be very expensive. If you are grouping in order to perform an     * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]     * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.     */    //  def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] =  groupBy[K](f, defaultPartitioner(this))    //根据指定的函数进行分组,分组得到的集合的元素类型是(K,V),K是分组函数的返回值,V是组内元素列表    val r = z1.groupBy(x => if (x._1 % 2 == 0) "even" else "odd")    r.collect().foreach(println)    //结果:    /*    (even,CompactBuffer((6,B1)))   (odd,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X)))     */    //Partitioner是HashPartitioner    val r2 = z1.groupBy(_._1 % 2)    r2.collect().foreach(println)    //结果:    /*    (0,CompactBuffer((6,B1)))    (1,CompactBuffer((3,A), (7,Z1), (9,E), (7,F), (9,Y), (77,Z), (31,X)))    */    class MyPartitioner extends Partitioner {      override def numPartitions = 3      def getPartition(key: Any): Int = {        key match {          case null => 0          case key: Int => key % numPartitions          case _ => key.hashCode % numPartitions        }      }      override def equals(other: Any): Boolean = {        other match {          case h: MyPartitioner => true          case _ => false        }      }    }    println("=======================GroupBy with Partitioner====================")    //分组的同时进行分区;分区的key是分组函数的计算结果?    val r3 = z1.groupBy((x:(Int, String)) => x._1, new MyPartitioner())    r3.collect().foreach(println)    /*    //6,3,9一个分区,7,31一个分区,77一个分区    (6,CompactBuffer((6,B1)))    (3,CompactBuffer((3,A)))    (9,CompactBuffer((9,E), (9,Y)))    (7,CompactBuffer((7,Z1), (7,F)))    (31,CompactBuffer((31,X)))    (77,CompactBuffer((77,Z)))    */  }}

 

 collect

 

 

package spark.examples.rddapiimport org.apache.spark.{SparkContext, SparkConf}object CollectTest_07 {  def main(args: Array[String]) {    val conf = new SparkConf().setMaster("local").setAppName("CoGroupTest_05")    val sc = new SparkContext(conf);    val z1 = sc.parallelize(List((3, "A"), (6, "B1"), (7, "Z1"), (9, "E"), (7, "F"), (9, "Y"), (77, "Z"), (31, "X")), 3)    /**     * Return an array that contains all of the elements in this RDD.     */    //这是一个行动算子    z1.collect().foreach(println)    /**     * Return an RDD that contains all matching values by applying `f`.     */    //    def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {    //      filter(f.isDefinedAt).map(f)    //    }//    val f  = {//      case x: (Int, String) => x//    }//    val z2 = z1.collect(f)//    println(z2)  }}

 

RDD有个toArray方法,已经不推荐使用了,推荐使用collect方法

 

 

 

 

 

 

 

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客