深入分析Voldemort的PerformParallelRequests

编程技术  /  houtizong 发布于 3年前   139

 

 

PerformParallelRequests是PipelineRoutedStore中用来执行get操作的action。该action并行向各个节点发送get请求。底层的通信采用非阻塞的nio方式,但是PerformParallelRequests对外界是一种阻塞模式,CountDownLatch被用来实现这种阻塞。

 

PerformParallelRequests在构造方法中必须指定下面一些参数:

 

BasicPipelineData pipelineData: 在整个pipeline执行过程的上下文环境数据集合

Event completeEvent: 该action成功执行完之后需要执行的Event

ByteArray key: 该get操作的key

FailureDetector failureDetector: 失败节点的监听组件

int preferred: 推荐成功读的节点数

int required: 必须成功读的节点数

byte[] transforms: get操作对应的value

Map<Integer, NonblockingStore> nonblockingStores: 需要通信的节点列表

Event insufficientSuccessesEvent: 成功的读节点数目没有达到必须要求时需要触发的事件

Event insufficientZonesEvent: 成功读取的zone数目没有达到必须要求时需要触发的事件

 

一般情况下,从一个类所拥有的数据基本上就能知道该类的主要执行逻辑。详细的execute(Pipeline)逻辑是这样的:

 

+-------------------------------------------------------------------------------------+

|         调用PipelineData.getNodes得到需要访问的节点列表

+-------------------------------------------------------------------------------------+

                                                |

                                                v

+-------------------------------------------------------------------------------------+

|          构造一个CountDownLatch,

|          需要计数的数目是需要访问的节点的数目

+-------------------------------------------------------------------------------------+

                                                |

                                                v

+-------------------------------------------------------------------------------------+             

|         对于每一个节点,构造NonblockingStoreCallback,                                        

|         通过nonblockingStores得到访问该节点的NonblockingStore,          

|         调用submitGetRequest提交Get请求              

+-------------------------------------------------------------------------------------+

                                               |

                                               v

+-------------------------------------------------------------------------------------+

|         latch.await(timeoutMs, TimeUnit.MILLISECONDS);

+-------------------------------------------------------------------------------------+

                                               |

                                               v

+-------------------------------------------------------------------------------------+

|         分析response列表里的response

|         在NonblockStoreCallback的callback方法里,每个节点的Response

|         被放置在了一个Response的List,这里就处理这个List的Response对象

|

|         如果某个Response是一个Exception,则调用handleResponseError方法,

|         handleResponseError的逻辑是检查异常类型,如果异常是UnreachableStoreException

|         就调用PipelineData的addFailedNode和recordFailure方法记录该节点以及该异常

|         同时,调用FailureDetector的recordException记录该异常,供该节点的状态检测用

|

|         如果某个Response是一个正常的结果(不是一个Exception),

|         就将该Response添加到PipelineData的response列表中,

|         同时,调用FailureDetector的recordSuccess记录这次成功的调用,供该节点的状态检测用

+------------------------------------------------------------------------------------+

                                               |

                                               v

+-------------------------------------------------------------------------------------+

|         检查PipelineData里的response列表大小是否小于required。

|

|         如果小于required,为Pipeline添加insufficientSuccessesEvent事件。

|

|         如果得到的response数目大于或小于required,为Pipeline添加completeEvent事件。

+------------------------------------------------------------------------------------+

 

 

能学到什么经验?

 

Voldemort的代码看起来很漂亮清晰,PerformParallelRequest也不是例外。从PerformParalleRequest的实现我们可以总结:

 

1. 虽然底层是异步的IO,但是,借助CountDownLatch这个工具类可以实现对外同步的效果。异步调用的Callback逻辑里调用countDown,外部逻辑调用带有超时机制的await方法。

 

2. 仔细处理一些极端情况。下面是NonblockingStoreCallback里callback的逻辑:

 

 

// Note errors that come in after the pipeline has finished.                    // These will *not* get a chance to be called in the loop of                    // responses below.                    if(pipeline.isFinished() && response.getValue() instanceof Exception) {                        if(response.getValue() instanceof InvalidMetadataException) {                            logger.warn("Received invalid metadata problem after a successful "                                        + pipeline.getOperation().getSimpleName()                                        + " call on node " + node.getId() + ", store '"                                        + pipelineData.getStoreName() + "'");                        } else {                            handleResponseError(response, pipeline, failureDetector);                        }                    }
 

如果我在写这个逻辑的时候,我可能不会注意到在callback里处理pipeline关掉的情况。

 

 

 

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客