pyzmq.ZMQStream中的bug
编程技术  /  houtizong 发布于 3年前   81
问题的现象
之前写了一个SharedSocketStream的组件,该组件的目的是用同一个socket负责处理消息发送,防止过多的socket打开和关闭。SharedSocketStream内部会持有一个zmqstream.ZMQStream实例,所有的消息都由ZMQStream发送。
测试发现,当有大量的消息发送时,会出现消息突然发送不出去的现象。对于该问题,排查了数日,昨天算是找到了最根本的原因。总结地说是多线程并发引起数据不一致。
问题的原因
ZMQStream内部会持有一个ZMQSocket,IOLoop会监听该ZMQSocket的事件(read, write, error),至于哪些事件需要IOLoop监听是调用IOLoop.update_handler方法设置的,该方法接受一个events的参数,该参数是一个整数,这个整数的意义是:
25:监听error, read事件
29:监听error, read, write事件。
当IOLoop监听到ZMQSocket的某个事件,ZMQStream._handle_events方法会被回调(需要注意的是,这个调用发生在IOLoop的线程中)。如果event是read,那么_handle_recv会被调用,会从socket中收消息。如果event是write,那么_handle_send方法会被调用,会做真正的发送消息的操作。
_handle_events方法的最后会调用下面代码:
"""rebuild io state based on self.sending() and receiving()"""
if self.socket is None:
return
state = self.io_loop.ERROR
if self.receiving():
state |= self.io_loop.READ
if self.sending():
state |= self.io_loop.WRITE
if state != self._state:
self._state = state
self.io_loop.update_handler(self.socket, state)
该方法的逻辑是根据ZMQStream内部的状态更新IOLoop需要监听的事件(调用update_handler)。注意这里的逻辑:如果当前发送队列里没有数据,新的state会是25,也就是说,不会监听该ZMQSocket的写事件。
再来看看ZMQStream.send_multipart方法。我们在外部使用ZMQStream就是调用send_multipart方法发送数据。ZMQStream.send_multipart方法并不会使用socket发送数据,而是把数据放到内部队列中。然后调用下面的代码来更新IOLoop需要监听该socket的write事件:
"""Add io_state to poller."""
if not self._state & state:
self._state = self._state | state
self.io_loop.update_handler(self.socket, self._state)
该方法先做了一个判断,这个判断的逻辑是:如果ZMQStream没有注册write事件,才会向IOLoop注册write事件。具体来说,如果self._state == 25(25 & 4 = 0,4就是那个state),才会将生成一个29(使用 self._state | state),将29注册到IOLoop上。
其实,根本的原因就在于,下面的两行代码会被两个线程同时调到:
1 self._state = self._state | state
2 self.io_loop.update_handler(self.socket, self._state)
其中一个线程是IOLoop的线程,我们可以称之为ioloop_thread, 另外一个线程是我们的自己发送消息的线程(该线程会调用ZMQStream.send_multipart,间接调用了这两行代码),我们可以称之为work_thread。下面是一个极端的情况会导致bug的出现:(我们将这两行代码进行了编号)
1. ioloop_thread走完了语句1,将self._state设置为25
2. work_thread走完了语句1,将self._state设置为29
3. work_thread走完了语句2,设置监听read, write, error事件。
4. ioloop_thread走完了语句2,设置监听read, error事件。
如果按照这样的顺序走完,状态会变成下面这样:
a) ZMQStream内部的_state是29,也就是说,ZMQStream自己认为IOLoop会监听socket read, write, error事件。
b) IOLoop被配置只监听socket的read, error事件。
由于write事件没有被监听,所以,最可恶的是这个判断if not self._state & state。本来,在调用ZMQStream.send_multipart方法时,是有机会重新配置IOLoop监听write事件的,但是由于这个判断,并且ZMQStream的内部_state是29,所以IOLoop._update_handle不会被调用。从而,IOLoop永远都只监听read, error事件,ZMQStream._handle_send方法也永远不会被调用,导致消息不会被真正发送。
解决办法
可以绕过这个问题,在调用ZMQStream.send_multipart之前,强制设置ZMQStream._state = 25
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接