[Spark102] An Analysis of BlockManager, Spark's Storage Module

Programming  /  posted by houtizong 3 years ago   273

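Spark builds its storage module around BlockManager: RDD, shuffle, and broadcast storage all go through it. In terms of implementation, BlockManager is a per-application Master/Executor structure. The BlockManager on the Driver plays the master role, and the BlockManager on each slave (within the scope of an application, that means each Executor) plays the slave role.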

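Consequently, there is one BlockManager on the Driver and one on each of the application's Executors. The Driver's BlockManager has no actual storage capability; it records the state of each Executor's BlockManager. (Reading the source of BlockManagerMaster and BlockManagerMasterActor shows that neither holds a BlockManager object, so where is the state of each Executor's BlockManager stored? The BlockManager class comment confirms that a BlockManager does run on the Driver.) Communication between the master BlockManager and the Executor BlockManagers is also based on Akka, and the message formats are defined in the BlockManagerMessages class.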

 

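The description above is not accurate. In fact the Driver, just like every Executor, has its own BlockManager. In addition, the Driver hosts a BlockManager master, whose implementation class is BlockManagerMaster. So as far as BlockManager is concerned, the Driver is both master and slave.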

 

0. The BlockManager class comment:

 

/**
 * Manager running on every node (driver and executors) which provides interfaces for putting and
 * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
 *
 * Note that #initialize() must be called before the BlockManager is usable.
 */
private[spark] class BlockManager(

 

 

1. Messages exchanged between master and slave

The master BlockManager can send the following messages to an Executor BlockManager:

 

  sealed trait ToBlockManagerSlave

  // Remove a block from the slaves that have it. This can only be used to remove
  // blocks that the master knows about.
  case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific RDD.
  case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific shuffle.
  case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave

  // Remove all blocks belonging to a specific broadcast.
  case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
    extends ToBlockManagerSlave

 

An Executor BlockManager can send the following messages to the master BlockManager:

 

 

  sealed trait ToBlockManagerMaster

  case class RegisterBlockManager(
      blockManagerId: BlockManagerId,
      maxMemSize: Long,
      sender: ActorRef)
    extends ToBlockManagerMaster

  // Ask which Executors' BlockManagers hold a given block
  case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster

  // Ask which Executors' BlockManagers hold each block in a group
  case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster

  // Remove an Executor
  case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

  case object StopBlockManagerMaster extends ToBlockManagerMaster

  case object GetMemoryStatus extends ToBlockManagerMaster

  case object GetStorageStatus extends ToBlockManagerMaster

  case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true)
    extends ToBlockManagerMaster

  case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

  case object ExpireDeadHosts extends ToBlockManagerMaster

  // Update the information recorded for a block
  case class UpdateBlockInfo(
      var blockManagerId: BlockManagerId,
      var blockId: BlockId,
      var storageLevel: StorageLevel,
      var memSize: Long,
      var diskSize: Long,
      var tachyonSize: Long)
    extends ToBlockManagerMaster
    with Externalizable {

    def this() = this(null, null, null, 0, 0, 0)  // For deserialization only

    override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
      blockManagerId.writeExternal(out)
      out.writeUTF(blockId.name)
      storageLevel.writeExternal(out)
      out.writeLong(memSize)
      out.writeLong(diskSize)
      out.writeLong(tachyonSize)
    }

    override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
      blockManagerId = BlockManagerId(in)
      blockId = BlockId(in.readUTF())
      storageLevel = StorageLevel(in)
      memSize = in.readLong()
      diskSize = in.readLong()
      tachyonSize = in.readLong()
    }
  }
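UpdateBlockInfo hand-writes its own serialization through java.io.Externalizable, which is why it needs var fields and a no-argument constructor. The pattern can be tried outside Spark with the sketch below. BlockUpdate and ExternalizableDemo are hypothetical names invented for this illustration, and the fields are reduced to a block name and two sizes; this is not Spark's code.

```scala
import java.io._

// Hypothetical, simplified stand-in for UpdateBlockInfo (not Spark's class).
class BlockUpdate(var blockName: String, var memSize: Long, var diskSize: Long)
    extends Externalizable {

  // Externalizable needs a public no-arg constructor; readExternal fills the fields in afterwards.
  def this() = this(null, 0L, 0L)

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeUTF(blockName)
    out.writeLong(memSize)
    out.writeLong(diskSize)
  }

  // Fields must be read back in exactly the order they were written.
  override def readExternal(in: ObjectInput): Unit = {
    blockName = in.readUTF()
    memSize = in.readLong()
    diskSize = in.readLong()
  }
}

object ExternalizableDemo {
  // Serialize a message to bytes and read it back through Java object streams.
  def roundTrip(msg: BlockUpdate): BlockUpdate = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(msg)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[BlockUpdate] finally in.close()
  }
}
```

Calling ExternalizableDemo.roundTrip(new BlockUpdate("rdd_0_1", 1024L, 0L)) should return a new object carrying the same three values.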

 

 

2. BlockManagerMasterActor

 

/**
 * BlockManagerMasterActor is an actor on the master node to track statuses of
 * all slaves' block managers.
 */

The data structures it holds:

 

 

 

  // Mapping from block manager id to the block manager's information.
  // BlockManagerId -> BlockManagerInfo; each Executor has one BlockManagerId
  private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]

  // Mapping from executor ID to block manager ID.
  // Executor ID -> BlockManagerId
  private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId]

  // Mapping from block id to the set of block managers that have the block.
  // BlockId -> the locations (each represented by a BlockManagerId) that hold the block
  private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 

 

BlockManagerId is a data structure made up of host, port, and executorId, which also shows that a BlockManager is scoped to an Executor.
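To see how these maps answer a GetLocations query, here is a minimal sketch of the bookkeeping with no Akka involved. MasterBookkeeping and its method names are hypothetical, BlockManagerId and BlockId are simplified stand-ins for Spark's types, and the blockManagerInfo map is left out for brevity. The real actor does considerably more per message.

```scala
import scala.collection.mutable

// Hypothetical simplified types; Spark's own live in org.apache.spark.storage.
case class BlockManagerId(executorId: String, host: String, port: Int)
case class BlockId(name: String)

// Hypothetical model of the master's bookkeeping, not Spark's implementation.
class MasterBookkeeping {
  // executor ID -> block manager ID
  private val blockManagerIdByExecutor = mutable.HashMap.empty[String, BlockManagerId]
  // block ID -> set of block managers that hold the block
  private val blockLocations =
    mutable.HashMap.empty[BlockId, mutable.HashSet[BlockManagerId]]

  // Models RegisterBlockManager.
  def register(id: BlockManagerId): Unit = {
    blockManagerIdByExecutor(id.executorId) = id
  }

  // Models UpdateBlockInfo: record that `id` holds `blockId`.
  // Returns false if `id` has not registered.
  def updateBlockInfo(id: BlockManagerId, blockId: BlockId): Boolean = {
    if (!blockManagerIdByExecutor.get(id.executorId).contains(id)) {
      false
    } else {
      blockLocations.getOrElseUpdate(blockId, mutable.HashSet.empty[BlockManagerId]) += id
      true
    }
  }

  // Models GetLocations.
  def getLocations(blockId: BlockId): Seq[BlockManagerId] =
    blockLocations.get(blockId).map(_.toSeq).getOrElse(Seq.empty)

  // Models RemoveExecutor: forget the executor and every location pointing at it.
  def removeExecutor(execId: String): Unit = {
    blockManagerIdByExecutor.remove(execId).foreach { id =>
      blockLocations.values.foreach(locs => locs -= id)
      val emptied = blockLocations.collect { case (b, locs) if locs.isEmpty => b }.toList
      emptied.foreach(b => blockLocations.remove(b))
    }
  }
}
```

Because every location is a BlockManagerId that carries an executorId, removing one Executor only requires walking the location sets once.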

 

 

3. BlockManagerSlaveActor

 

/**
 * An actor to take commands from the master to execute options. For example,
 * this is used to remove blocks from the slave's BlockManager.
 */

Everything BlockManagerSlaveActor holds appears in its constructor, shown below: the BlockManager that belongs to this Executor, plus the Executor's MapOutputTracker, which records the map-side shuffle output.

private[storage]
class BlockManagerSlaveActor(
    blockManager: BlockManager,
    mapOutputTracker: MapOutputTracker)
  extends Actor with ActorLogReceive with Logging
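The slave side is essentially a pattern match over the commands the master can send. The sketch below models that dispatch synchronously and without Akka. ToSlave, LocalBlocks, and handle are hypothetical names, only two of the four commands are modelled, and the in-memory map merely stands in for an Executor's BlockManager. The one detail taken from Spark is that RDD blocks are named rdd_<rddId>_<partition>.

```scala
import scala.collection.mutable

// Hypothetical simplified messages mirroring two ToBlockManagerSlave cases.
sealed trait ToSlave
case class RemoveBlock(blockId: String) extends ToSlave
case class RemoveRdd(rddId: Int) extends ToSlave

// Hypothetical in-memory store standing in for an Executor's BlockManager.
class LocalBlocks {
  private val blocks = mutable.HashMap.empty[String, Array[Byte]]

  def put(blockId: String, data: Array[Byte]): Unit = blocks(blockId) = data

  def contains(blockId: String): Boolean = blocks.contains(blockId)

  // Handle one command from the master; returns how many blocks were removed.
  def handle(msg: ToSlave): Int = msg match {
    case RemoveBlock(id) =>
      if (blocks.remove(id).isDefined) 1 else 0
    case RemoveRdd(rddId) =>
      // Match on the "rdd_<rddId>_" prefix so that rdd_1 does not also hit rdd_10.
      val doomed = blocks.keys.filter(_.startsWith(s"rdd_${rddId}_")).toList
      doomed.foreach(id => blocks.remove(id))
      doomed.size
  }
}
```

Sealing the trait lets the compiler warn when a command is left unhandled, which is one reason message hierarchies like ToBlockManagerSlave are declared sealed.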

 

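Working out BlockManager's communication mechanism turns out to be somewhat harder than analyzing how BlockManager reads and writes data (which relies on DiskStore and MemoryStore, with DiskStore in turn relying on DiskBlockManager). The main reason is that I do not yet have a clear picture in my head: as far as BlockManager is concerned, what lives on the Driver, what lives on each Executor, and how do they communicate? That is left for further analysis.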

 

 

 

 

 
