【Spark八十八】Spark Streaming累加器操作(updateStateByKey)

编程技术  /  houtizong 发布于 3年前   317

在实时计算的实际应用中,有时除了需要关心一个时间间隔内的数据,有时还可能会对整个实时计算的所有时间间隔内产生的相关数据进行统计。

比如: 对Nginx的access.log实时监控请求404时,有时除了需要统计某个时间间隔内出现的次数,有时还需要统计一整天出现了多少次404,也就是说404监控横跨多个时间间隔。

 

Spark Streaming的解决方案是累加器,工作原理是,定义一个类似全局的可更新的变量,每个时间窗口内得到的统计值都累加到上个时间窗口得到的值,这样这个累加值就是横跨多个时间间隔

 

package spark.examples.streamingimport org.apache.spark.SparkConfimport org.apache.spark.HashPartitionerimport org.apache.spark.streaming._/** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second starting with initial value of word count. * Usage: StatefulNetworkWordCount <hostname> <port> * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive * data. * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example * org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999` */object StatefulNetworkWordCount {  def main(args: Array[String]) {    ///函数常量定义,返回类型是Some(Int),表示的含义是最新状态    ///函数的功能是将当前时间间隔内产生的Key的value集合,加到上一个状态中,得到最新状态    val updateFunc = (values: Seq[Int], state: Option[Int]) => {      val currentCount = values.sum      val previousCount = state.getOrElse(0)      Some(currentCount + previousCount)    }    ///入参是三元组遍历器,三个元组分别表示Key、当前时间间隔内产生的对应于Key的Value集合、上一个时间点的状态    ///newUpdateFunc的返回值要求是iterator[(String,Int)]类型的    val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {      ///对每个Key调用updateFunc函数(入参是当前时间间隔内产生的对应于Key的Value集合、上一个时间点的状态)得到最新状态      ///然后将最新状态映射为Key和最新状态      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))    }    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount").setMaster("local[3]")    // Create the context with a 5 second batch size    val ssc = new StreamingContext(sparkConf, Seconds(5))    ssc.checkpoint(".")    // Initial RDD input to updateStateByKey    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))    // Create a ReceiverInputDStream on target ip:port and count the    // words in input stream of \n delimited test (eg. generated by 'nc')    val lines = ssc.socketTextStream("192.168.26.140", 9999)    val words = lines.flatMap(_.split(" "))    val wordDstream = words.map(x => (x, 1))        // Update the cumulative count using updateStateByKey    // This will give a Dstream made of state (which is the cumulative count of the words)    //注意updateStateByKey的四个参数,第一个参数是状态更新函数    val stateDstream = wordDstream.updateStateByKey[Int](newUpdateFunc,      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)    stateDstream.print()    ssc.start()    ssc.awaitTermination()  }}

 

上面的核心操作时DStream的updateStateByKey函数操作,它接受四个参数,

 

2. DStream.updateStateByKey剖析

方法说明:

 /**   * Return a new "state" DStream where the state for each key is updated by applying   * the given function on the previous state of the key and the new values of each key.   * org.apache.spark.Partitioner is used to control the partitioning of each RDD.   * @param updateFunc State update function. Note, that this function may generate a different   *                   tuple with a different key than the input key. Therefore keys may be removed   *                   or added in this way. It is up to the developer to decide whether to   *                   remember the  partitioner despite the key being changed.   * @param partitioner Partitioner for controlling the partitioning of each RDD in the new   *                    DStream   * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.   * @param initialRDD initial state value of each key.   * @tparam S State type   */  def updateStateByKey[S: ClassTag](      updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],      partitioner: Partitioner,       rememberPartitioner: Boolean,      initialRDD: RDD[(K, S)]    ): DStream[(K, S)] = {     new StateDStream(self, ssc.sc.clean(updateFunc), partitioner,       rememberPartitioner, Some(initialRDD))  }
  • initialRDD是(K,S)类型的RDD,它表示一组Key的初始状态,每个(K,S)表示一个Key以及它对应的State状态。 K表示updateStateByKey的Key的类型,比如String,而S表示Key对应的状态(State)类型,在上例中,是Int
  • rememberPartitioner: 表示是否在接下来的Spark Streaming执行过程中产生的RDD使用相同的分区算法
  • partitioner: 分区算法,上例中使用的Hash分区算法,分区数为ssc.sparkContext.defaultParallelism
  • updateFunc是函数常量,类型为(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],表示状态更新函数

(Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]如何解读?

入参: 三元组迭代器,三元组中K表示Key,Seq[V]表示一个时间间隔中产生的Key对应的Value集合(Seq类型,需要对这个集合定义累加函数逻辑进行累加),Option[S]表示上个时间间隔的累加值(表示这个Key上个时间点的状态)

出参:二元组迭代器,二元组中K表示Key,S表示当前时间点执行结束后,得到的累加值(即最新状态)

 

 

 

 

 

 

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客