【Spark八十三】BlockManager在Spark中的使用场景

编程技术  /  houtizong 发布于 3年前   94

1. Broadcast变量的存储,在HttpBroadcast类中可以知道

2. RDD通过CacheManager存储RDD中的数据,CacheManager也是通过BlockManager进行存储的

3. ShuffleMapTask得到的结果数据,是通过FileShuffleBlockManager进行管理的,而FileShuffleBlockManager最终也是使用BlockManager来对数据块进行管理

4. ResultTask的结果如果非常大,那么使用IndirectTaskResult作为返回结果给Driver,Driver通过IndirectTaskResult反查BlockManager获取数据,通过Netty的数据传输

代码在TaskRunner的run方法中。

 

       // directSend = sending directly back to the driver        val serializedResult = {          if (maxResultSize > 0 && resultSize > maxResultSize) {            logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +              s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +              s"dropping it.")            ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))          } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) { //如果结果大于Akka传输的数据量(默认10M)            val blockId = TaskResultBlockId(taskId)            env.blockManager.putBytes( //设置到BlockManager中,MEMORY_AND_DISK_SER表示              blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)            logInfo(              s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")            ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))          } else {            logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")            serializedDirectResult          }        }

 

 5. Spark Streaming将接收到的数据首先写入到BlockManager中

 

关于StorageLevel的设置:

 

class StorageLevel private(    private var _useDisk: Boolean,    private var _useMemory: Boolean,    private var _useOffHeap: Boolean,    private var _deserialized: Boolean,    private var _replication: Int = 1)

五个参数:

使用磁盘存储

使用内存存储

使用Tachyon存储

deserialzed表示存储的数据是否是原始数据(没有进行序列化)?

副本数

 

  val NONE = new StorageLevel(false, false, false, false)  val DISK_ONLY = new StorageLevel(true, false, false, false)  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)  val MEMORY_ONLY = new StorageLevel(false, true, false, true)  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)  val OFF_HEAP = new StorageLevel(false, false, true, false)

 

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客