[Spark 99] Spark Streaming: Source Code Analysis of the Data Flow Within a Batch Interval


 

We take SocketInputDStream as the running example.
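A minimal driver program that produces a SocketInputDStream might look like the following sketch (the host localhost, port 9999, and the one-second batch interval are arbitrary choices for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SocketWordCount")
    val ssc = new StreamingContext(conf, Seconds(1)) // batch interval: 1 second
    // socketTextStream creates a SocketInputDStream, whose receiver is a SocketReceiver
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()
  }
}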

Spark Streaming reads data from the socket in SocketReceiver's receive method. Setting aside failure handling (the Receiver has a reconnection mechanism, the restart method; by default, after the Receiver dies it waits two seconds before re-establishing the socket connection), each item read is stored by calling the store method. To trace the data flow, we need to answer the following questions:

1. Where does the data get stored?

2. In what structure is the data stored?

3. When is the data read back?

4. How is the data received within a batch interval turned into an RDD?

 

1. SocketReceiver#receive

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        restart("Error receiving data", t)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
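bytesToObjects is a function passed to the receiver's constructor; for socketTextStream it turns the raw InputStream into an iterator of text lines. A simplified sketch of such a converter (Spark's actual SocketReceiver#bytesToLines is more careful about closing the reader):

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

def bytesToLines(inputStream: InputStream): Iterator[String] = {
  val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null) // stop at EOF (readLine returns null)
}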

 

2. SocketReceiver#receive=>SocketReceiver#store

 

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   */
  def store(dataItem: T) {
    executor.pushSingle(dataItem)
  }

 

Storing the data is one of the executor's responsibilities (the executor here is the receiver's supervisor; see the next step). The store method calls the executor's pushSingle operation, where "single" can be understood as one read, and dataItem is the data object of that read.
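store is also the public API that user-defined receivers call, so the same path applies beyond sockets. A minimal sketch of a custom receiver (TickReceiver is an illustrative name, not from the Spark source):

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative custom receiver: stores one constant record per second.
class TickReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart(): Unit = {
    new Thread("Tick Receiver") {
      override def run(): Unit = {
        while (!isStopped) {
          store("tick") // same path as above: store -> executor.pushSingle
          Thread.sleep(1000)
        }
      }
    }.start()
  }
  def onStop(): Unit = { } // the receiving thread checks isStopped and exits on its own
}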

 

 

3. SocketReceiver#store=>executor.pushSingle(ReceiverSupervisorImpl.pushSingle)

 

  /** Push a single record of received data into block generator. */
  def pushSingle(data: Any) {
    blockGenerator.addData(data)
  }

 

The data is placed into blockGenerator, a field of type BlockGenerator. As the name suggests, it is a block generator: at a fixed interval (200 milliseconds by default, per private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)), Spark Streaming merges the data received in that interval into one block and then writes that block to the BlockManager. Let's continue along this thread.
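Since each block of a batch normally becomes one partition of the resulting RDD, 200 ms may produce many small partitions; the interval can be tuned through the configuration shown above. A sketch (the 500 ms value is just an example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("SocketWordCount")
  .set("spark.streaming.blockInterval", "500") // merge received data into one block every 500 ms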

 

 

4. executor.pushSingle=>BlockGenerator.addData

  /**
   * Push a single data item into the buffer. All received data items
   * will be periodically pushed into BlockManager.
   */
  def addData(data: Any): Unit = synchronized {
    waitToPush() // rate control: blocks until the receiver is allowed to push
    currentBuffer += data // append the data item to currentBuffer
  }

 

Once the data is written into currentBuffer, the trail seems to go cold. In fact, two threads started inside BlockGenerator (blockIntervalTimer and blockPushingThread) keep processing currentBuffer behind the scenes.

 

blockIntervalTimer executes updateCurrentBuffer every 200 milliseconds by default; that function wraps the contents of currentBuffer (an ArrayBuffer) into one small Block:

  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
 

 

blockPushingThread loops over keepPushingBlocks, writing each Block created by blockIntervalTimer into the BlockManager:

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
 

 

The two threads are synchronized through an ArrayBlockingQueue:

  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
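Taken together, the two threads form a standard producer/consumer pattern around that queue. A self-contained sketch of the same mechanics (BufferToBlocks and all names inside it are illustrative; the 200 ms / 100 ms / size-10 values mirror the defaults above):

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object BufferToBlocks {
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[ArrayBuffer[Any]](10)

  def addData(data: Any): Unit = BufferToBlocks.synchronized {
    currentBuffer += data
  }

  // the "blockIntervalTimer" role: every 200 ms, swap the buffer out and enqueue it as one block
  Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(new Runnable {
    def run(): Unit = {
      val newBlock = BufferToBlocks.synchronized {
        val old = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        old
      }
      if (newBlock.nonEmpty) blocksForPushing.put(newBlock) // blocks while the queue is full
    }
  }, 200, 200, TimeUnit.MILLISECONDS)

  // the "blockPushingThread" role: drain the queue, one block at a time
  private val pusher = new Thread("block-pusher") {
    override def run(): Unit = {
      while (true) {
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS))
          .foreach(block => println(s"pushing block of ${block.size} records"))
      }
    }
  }
  pusher.setDaemon(true)
  pusher.start()
}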
 

 

 

5. BlockGenerator#updateCurrentBuffer

updateCurrentBuffer is executed by the blockIntervalTimer thread.

 

  /** Change the buffer to which single records are added to. */
  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any] // is swapping currentBuffer like this thread-safe? Yes: currentBuffer is marked @volatile, and this method, like addData, is synchronized
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockInterval) // construct the StreamBlockId
        val newBlock = new Block(blockId, newBlockBuffer) // create a Block from the buffered items
        listener.onGenerateBlock(blockId) // notify whom? The listener passed into BlockGenerator's constructor; its onGenerateBlock is a no-op
        blocksForPushing.put(newBlock) // put into the blocking queue, to be drained by blockPushingThread
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
 

 

6. BlockGenerator#keepPushingBlocks

keepPushingBlocks is executed by blockPushingThread.

 

 

  /** Keep pushing blocks to the BlockManager. */
  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
    try {
      while(!stopped) {
        // poll takes one element; it waits up to 100 ms and returns null if the queue stays empty
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
      // Push out the blocks that are still left
      logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
      while (!blocksForPushing.isEmpty) {
        logDebug("Getting block ")
        val block = blocksForPushing.take()
        pushBlock(block)
        logInfo("Blocks left to push " + blocksForPushing.size())
      }
      logInfo("Stopped block pushing thread")
    } catch {
      case ie: InterruptedException =>
        logInfo("Block pushing thread was interrupted")
      case e: Exception =>
        reportError("Error in block pushing thread", e)
    }
  }
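Note the Option(...) wrapper: poll removes one element, waiting up to 100 ms; on timeout it returns null, which Option converts into None. In isolation:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

val q = new ArrayBlockingQueue[String](10)
assert(Option(q.poll(100, TimeUnit.MILLISECONDS)).isEmpty) // timed out: null becomes None
q.put("block-1")
assert(Option(q.poll(100, TimeUnit.MILLISECONDS)).contains("block-1")) // Some("block-1")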
 

 

7. BlockGenerator#pushBlock

This method pushes one Block at a time; it does not drain all Blocks from the queue and push them in a single batch.

 

 

  private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }
 

 

8. BlockGeneratorListener#onPushBlock

pushBlock notifies the listener following the Observer pattern. The listener is passed into BlockGenerator's constructor (in fact it is an anonymous inner class, instantiated when ReceiverSupervisorImpl is constructed):

 

 

  /** Divides received data records into data blocks for pushing in BlockManager. */
  private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
    def onAddData(data: Any, metadata: Any): Unit = { }
    def onGenerateBlock(blockId: StreamBlockId): Unit = { }
    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }
    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }, streamId, env.conf)
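The callback interface implemented above is small; its signatures, inferred from that implementation, are sketched below (the trait lives in Spark's streaming internals):

trait BlockGeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit                 // a no-op in the implementation above
  def onGenerateBlock(blockId: StreamBlockId): Unit             // fired by updateCurrentBuffer (step 5)
  def onError(message: String, throwable: Throwable): Unit      // routed to reportError
  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit // fired by pushBlock (step 7)
}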
 

 

9. ReceiverSupervisorImpl#pushArrayBuffer

 

  /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
  def pushArrayBuffer(
      arrayBuffer: ArrayBuffer[_],
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption)
  }
 

 

10. ReceiverSupervisorImpl#pushAndReportBlock

 

 

  /** Store block and report it to driver */
  def pushAndReportBlock(
      receivedBlock: ReceivedBlock,
      metadataOption: Option[Any],
      blockIdOption: Option[StreamBlockId]
    ) {
    val blockId = blockIdOption.getOrElse(nextBlockId)
    val numRecords = receivedBlock match {
      case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
      case _ => -1
    }
    val time = System.currentTimeMillis
    val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
    logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
    Await.result(future, askTimeout)
    logDebug(s"Reported block $blockId")
  }
 

 

pushAndReportBlock does two things: first it stores the block, and second it reports to the tracker that a block has been added.

 

10.1 receivedBlockHandler.storeBlock(BlockManagerBasedBlockHandler#storeBlock)

 

 

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {
    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
      case IteratorBlock(iterator) =>
        blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId)
  }
 

 

Here blockManager is a variable of type BlockManager, defined in the org.apache.spark.storage package. The actual write into the BlockManager goes through putIterator or putBytes, which belong to Spark's storage subsystem and are not covered here. The important point is that this is where the received data is written into the BlockManager.
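Which storageLevel storeBlock uses is fixed when the input DStream is created. For socketTextStream the default is StorageLevel.MEMORY_AND_DISK_SER_2 (serialized, spilling to disk, replicated to two executors), and it can be overridden per stream, e.g. (reusing the ssc from the driver sketch at the top):

import org.apache.spark.storage.StorageLevel

// keep a single in-memory copy instead of the replicated default
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_ONLY)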

 

10.2 ReceiverTracker#AddBlock

通过下面两个语句,将写入到BlockManager的信息汇报给TrackActor,这是一个进程间的同步调用(ask语法)

    val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
    val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout)
    Await.result(future, askTimeout)
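This is the standard Akka ask pattern: ask returns a Future holding the reply, and Await.result blocks until it arrives. A generic sketch (the method and parameter names are illustrative):

import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

def reportSynchronously(tracker: ActorRef, msg: Any): Any = {
  implicit val timeout = Timeout(30.seconds) // plays the role of askTimeout
  val future = tracker.ask(msg)              // Future[Any] that completes with the reply
  Await.result(future, timeout.duration)     // block the caller until the tracker answers
}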

 

The entity behind trackerActor is the ReceiverTracker; the AddBlock message triggers ReceiverTracker.addBlock, which in turn calls ReceivedBlockTracker.addBlock:

 

 

  /** Add new blocks for the given stream */
  private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }
 

 

11. ReceivedBlockTracker.addBlock

 

 

  /** Add received block. This event will get written to the write ahead log (if enabled). */
  def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
    try {
      writeToLog(BlockAdditionEvent(receivedBlockInfo)) // write the event to the WAL (if enabled)
      getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo // getReceivedBlockQueue looks up the stream's ReceivedBlockQueue in a Map[streamId, ReceivedBlockQueue]
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      true
    } catch {
      case e: Exception =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }
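Whether writeToLog persists anything depends on configuration: the receiver-side write-ahead log is disabled by default. A sketch of enabling it (the HDFS path is illustrative; WAL files are written under the checkpoint directory, so it should be fault-tolerant storage):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("SocketWordCount")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs://namenode:8020/checkpoints/socket-app") // hypothetical checkpoint dir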
 

 

 

 

 
