深入分析Voldemort的PerformParallelRequests
编程技术  /  houtizong 发布于 3年前   139
PerformParallelRequests是PipelineRoutedStore中用来执行get操作的action。该action并行向各个节点发送get请求。底层的通信采用非阻塞的nio方式,但是PerformParallelRequests对外界是一种阻塞模式,CountDownLatch被用来实现这种阻塞。
PerformParallelRequests在构造方法中必须指定下面一些参数:
BasicPipelineData pipelineData: 在整个pipeline执行过程的上下文环境数据集合
Event completeEvent: 该action成功执行完之后需要执行的Event
ByteArray key: 该get操作的key
FailureDetector failureDetector: 失败节点的监听组件
int preferred: 推荐成功读的节点数
int required: 必须成功读的节点数
byte[] transforms: get操作对应的value
Map<Integer, NonblockingStore> nonblockingStores: 需要通信的节点列表
Event insufficientSuccessesEvent: 成功的读节点数目没有达到必须要求时需要触发的事件
Event insufficientZonesEvent: 成功读取的zone数目没有达到必须要求时需要触发的事件
一般情况下,从一个类所拥有的数据基本上就能知道该类的主要执行逻辑。详细的execute(Pipeline)逻辑是这样的:
+-------------------------------------------------------------------------------------+
| 调用PipelineData.getNodes得到需要访问的节点列表
+-------------------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------------------+
| 构造一个CountDownLatch,
| 需要计数的数目是需要访问的节点的数目
+-------------------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------------------+
| 对于每一个节点,构造NonblockingStoreCallback,
| 通过nonblockingStores得到访问该节点的NonblockingStore,
| 调用submitGetRequest提交Get请求
+-------------------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------------------+
| latch.await(timeoutMs, TimeUnit.MILLISECONDS);
+-------------------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------------------+
| 分析response列表里的response
| 在NonblockStoreCallback的callback方法里,每个节点的Response
| 被放置在了一个Response的List,这里就处理这个List的Response对象
|
| 如果某个Response是一个Exception,则调用handleResponseError方法,
| handleResponseError的逻辑是检查异常类型,如果异常是UnreachableStoreException
| 就调用PipelineData的addFailedNode和recordFailure方法记录该节点以及该异常
| 同时,调用FailureDetector的recordException记录该异常,供该节点的状态检测用
|
| 如果某个Response是一个正常的结果(不是一个Exception),
| 就将该Response添加到PipelineData的response列表中,
| 同时,调用FailureDetector的recordSuccess记录这次成功的调用,供该节点的状态检测用
+------------------------------------------------------------------------------------+
|
v
+-------------------------------------------------------------------------------------+
| 检查PipelineData里的response列表大小是否小于required。
|
| 如果小于required,为Pipeline添加insufficientSuccessesEvent事件。
|
| 如果得到的response数目大于或小于required,为Pipeline添加completeEvent事件。
+------------------------------------------------------------------------------------+
能学到什么经验?
Voldemort的代码看起来很漂亮清晰,PerformParallelRequest也不是例外。从PerformParalleRequest的实现我们可以总结:
1. 虽然底层是异步的IO,但是,借助CountDownLatch这个工具类可以实现对外同步的效果。异步调用的Callback逻辑里调用countDown,外部逻辑调用带有超时机制的await方法。
2. 仔细处理一些极端情况。下面是NonblockingStoreCallback里callback的逻辑:
// Note errors that come in after the pipeline has finished. // These will *not* get a chance to be called in the loop of // responses below. if(pipeline.isFinished() && response.getValue() instanceof Exception) { if(response.getValue() instanceof InvalidMetadataException) { logger.warn("Received invalid metadata problem after a successful " + pipeline.getOperation().getSimpleName() + " call on node " + node.getId() + ", store '" + pipelineData.getStoreName() + "'"); } else { handleResponseError(response, pipeline, failureDetector); } }
如果我在写这个逻辑的时候,我可能不会注意到在callback里处理pipeline关掉的情况。
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接