【Spark102】Spark存储模块BlockManager剖析
编程技术  /  houtizong 发布于 3年前   309
Spark围绕着BlockManager构建了存储模块,包括RDD,Shuffle,Broadcast的存储都使用了BlockManager。而BlockManager在实现上是一个针对每个应用的Master/Executor结构,即Driver上BlockManager充当了Master角色,而各个Slave上(具体到应用范围,就是Executor)的BlockManager充当了Slave角色。
因此,BlockManager在Driver和应用的各个Executor之间各有一份,Driver上的BlockManager不具备实际存储的能力,它记录了各个Executor的BlockManager的状态(通过查看BlockManagerMaster和BlockManagerMasterActor的源代码,BlockManagerMaster和BlockManagerMasterActor并没有持有一个BlockManager对象,那么每个Executor BlockManager的状态存储在什么地方?通过查看BlockManager的类注释,发现BlockManager确实运行在Driver上)。Master BlockManager和ExecutorBlockManager之间的通信也是基于Akka,消息格式定义于BlockManagerMessages类中。
/** * Manager running on every node (driver and executors) which provides interfaces for putting and * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap). * * Note that #initialize() must be called before the BlockManager is usable. */private[spark] class BlockManager(
Master BlockManager向Executor BlockManager可以发送的消息包括:
sealed trait ToBlockManagerSlave // Remove a block from the slaves that have it. This can only be used to remove // blocks that the master knows about. case class RemoveBlock(blockId: BlockId) extends ToBlockManagerSlave // Remove all blocks belonging to a specific RDD. case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific shuffle. case class RemoveShuffle(shuffleId: Int) extends ToBlockManagerSlave // Remove all blocks belonging to a specific broadcast. case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true) extends ToBlockManagerSlave
Executor BlockManager向Master BlockManager可以发送的消息包括:
sealed trait ToBlockManagerMaster case class RegisterBlockManager( blockManagerId: BlockManagerId, maxMemSize: Long, sender: ActorRef) extends ToBlockManagerMaster //获取某个Block在哪些Executor的BlockManager上 case class GetLocations(blockId: BlockId) extends ToBlockManagerMaster //获取一组Block在哪些Executor的BlockManager上 case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case class GetActorSystemHostPortForExecutor(executorId: String) extends ToBlockManagerMaster //删除Executor case class RemoveExecutor(execId: String) extends ToBlockManagerMaster case object StopBlockManagerMaster extends ToBlockManagerMaster case object GetMemoryStatus extends ToBlockManagerMaster case object GetStorageStatus extends ToBlockManagerMaster case class GetBlockStatus(blockId: BlockId, askSlaves: Boolean = true) extends ToBlockManagerMaster case class GetMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean = true) extends ToBlockManagerMaster case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster case object ExpireDeadHosts extends ToBlockManagerMaster //更新Block信息 case class UpdateBlockInfo( var blockManagerId: BlockManagerId, var blockId: BlockId, var storageLevel: StorageLevel, var memSize: Long, var diskSize: Long, var tachyonSize: Long) extends ToBlockManagerMaster with Externalizable { def this() = this(null, null, null, 0, 0, 0) // For deserialization only override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException { blockManagerId.writeExternal(out) out.writeUTF(blockId.name) storageLevel.writeExternal(out) out.writeLong(memSize) out.writeLong(diskSize) out.writeLong(tachyonSize) } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { blockManagerId = BlockManagerId(in) blockId = BlockId(in.readUTF()) storageLevel = StorageLevel(in) memSize = in.readLong() diskSize = in.readLong() tachyonSize = in.readLong() } }
/** * BlockManagerMasterActor is an actor on the master node to track statuses of * all slaves' block managers. */包含的数据结构:
// Mapping from block manager id to the block manager's information. ///BlockManagerId与BlockManagerInfo之间的对应,每个Executor对应一个BlockManagerId private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo] // Mapping from executor ID to block manager ID. //Executor ID与BlockManagerID之间的对应关系 private val blockManagerIdByExecutor = new mutable.HashMap[String, BlockManagerId] // Mapping from block id to the set of block managers that have the block. //BlockId与包含这个Block的Location(由BlockManagerId表示) private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
BlockManagerId是一个由host,port和executorId表示的数据结构,从这里也可以看出来BlockManager是Executor范围的数据结构
/** * An actor to take commands from the master to execute options. For example, * this is used to remove blocks from the slave's BlockManager. */
BlockManagerSlaveActor包含的数据结构都体现在构造函数中了,如下所示,BlockManagerSlaveActor包含了本Executor对应的BlockManager以及该Executor的MapOutputTracker用于记录Map Shuffle输出
private[storage]class BlockManagerSlaveActor( blockManager: BlockManager, mapOutputTracker: MapOutputTracker) extends Actor with ActorLogReceive with Logging
弄清楚BlockManager的通信机制,发现要比分析BlockManager的读写数据(依赖于DiskStore和MemoryStore实现,而DiskStore又依赖于DiskBlockManager实现)复杂一些,主要是头脑中没有清晰的picuture:关于BlockManager,Driver有什么,Executor上有什么,它们之间如何通信,这个继续分析吧。
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接