【Spark八十六】Spark Streaming之DStream vs. InputDStream

编程技术  /  houtizong 发布于 3年前   100

 

1. DStream的类说明文档:

 

/** * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous * sequence of RDDs (of the same type) representing a continuous stream of data (see * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs). * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume, * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by * transforming existing DStreams using operations such as `map`, * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream * periodically generates a RDD, either from live data or by transforming the RDD generated by a * parent DStream. * * This class contains the basic operations available on all DStreams, such as `map`, `filter` and * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and * `join`. These operations are automatically available on any DStream of pairs * (e.g., DStream[(Int, Int)] through implicit conversions. * * DStreams internally is characterized by a few basic properties: *  - A list of other DStreams that the DStream depends on *  - A time interval at which the DStream generates an RDD *  - A function that is used to generate an RDD after each time interval */

 

从上面可以看出来,

1. DStream每隔一段时间仅产生一个RDD

2. DStream如同RDD,也有依赖关系,比如通过转换算子,把一个DStream转换成另外一个DStream

3. DStream需要有一个方法,在指定的时间间隔到达后,用于创建RDD。这个方法同RDD计算它包含的数据行一样,是compute方法

  /** Method that generates a RDD for the given time */  def compute (validTime: Time): Option[RDD[T]]

 

2. DStream的实现类ReceiverInputDStream实现的compute方法

 

 /**   * Generates RDDs with blocks received by the receiver of this stream. */ ///根据在一个时间范围内获取到的数据创建RDD(这些接收到的数据由BlockManager管理)  override def compute(validTime: Time): Option[RDD[T]] = {    val blockRDD = {      ////如果时间不在要计算的范围内,则返回空RDD(具体是BlockRDD)      if (validTime < graph.startTime) {        // If this is called for any time before the start time of the context,        // then this returns an empty RDD. This may happen when recovering from a        // driver failure without any write ahead log to recover pre-failure data.        new BlockRDD[T](ssc.sc, Array.empty)      } else {        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream        // for this batch        //根据validTime获取Block信息,validTime和Block的对应关系放在一个Map中,因此,validTime是Block的索引,        //因此可以把validTime看成是一个块创建的时间点(此后,一个时间间隔的数据都写入这个块中)        //id is an unique identifier for the receiver input stream.        val blockInfos =          ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)        //对blockInfos中的每个元素(ReceivedBlockInfo)调用blockStoreResult中        val blockStoreResults = blockInfos.map { _.blockStoreResult }        val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray        // Check whether all the results are of the same type        val resultTypes = blockStoreResults.map { _.getClass }.distinct        if (resultTypes.size > 1) {          logWarning("Multiple result types in block information, WAL information will be ignored.")        }        // If all the results are of type WriteAheadLogBasedStoreResult, then create        // WriteAheadLogBackedBlockRDD else create simple BlockRDD.        if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {          val logSegments = blockStoreResults.map {            _.asInstanceOf[WriteAheadLogBasedStoreResult].segment          }.toArray          // Since storeInBlockManager = false, the storage level does not matter.          new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,            blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)        } else {          ///构造BlockRDD的参数是blockIds,也就是这个RDD包含的数据存放在BlockManager中,可以由blockId读取指定的块          new BlockRDD[T](ssc.sc, blockIds)        }      }    }    Some(blockRDD)  }

 

3. InputDStream的类说明文档

/** * This is the abstract base class for all input streams. This class provides methods * start() and stop() which is called by Spark Streaming system to start and stop receiving data. * Input streams that can generate RDDs from new data by running a service/thread only on * the driver node (that is, without running a receiver on worker nodes), can be * implemented by directly inheriting this InputDStream. For example, * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for * new files and generates RDDs with the new files. For implementing input streams * that requires running a receiver on the worker nodes, use * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class. * * @param ssc_ Streaming context that will execute this input stream */

 

InputDStream定义了接收和停止接收数据的方法(start和stop),InputDStream的直接实现者不需要定义专门的Receiver,它在Driver上即可完成生成RDD的任务。

比如FileInputDStream即是这样一个InputDStream,它监听本地或者HDFS上的新文件,然后生成RDD。

问题:如此说来,FileInputDStream是一个没有实现分布式的数据接收器?

 

4. ReceiverInputDStream

 

/** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] * that has to start a receiver on worker nodes to receive external data. * Specific implementations of ReceiverInputDStream must * define `the getReceiver()` function that gets the receiver object of type * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent * to the workers to receive data. * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */

 

 ReceiverInputDStream的实现者需要实现getReceiver来回去数据接收器,这些Reciver将运行在所有的Worker节点上。

 

5. 总结

从上面的分析中,DStream是InputDStream的父类,而InputDStream是ReceiverInputDStream的父类。

DStream相对于InputDStream,更像是个RDD,即只是一个包含接收数据的一系列的集合,即DStream是一个数据集。

InputDStream在此基础之上需要实现启动接收和停止接收的逻辑,也就是,InputDStream更像是自来水管的水龙头开关,而DStream是流出来的自来水。

ReceiverInputStream将接收数据的逻辑进行分布式接收

 

 

 

 

 

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客