pyzmq.ZMQStream中的bug

编程技术  /  houtizong 发布于 3年前   81

 

 

问题的现象

 

之前写了一个SharedSocketStream的组件,该组件的目的是用同一个socket负责处理消息发送,防止过多的socket打开和关闭。SharedSocketStream内部会持有一个zmqstream.ZMQStream实例,所有的消息都由ZMQStream发送。

 

测试发现,当有大量的消息发送时,会出现消息突然发送不出去的现象。对于该问题,排查了数日,昨天算是找到了最根本的原因。总结地说是多线程并发引起数据不一致。

 

问题的原因

 

ZMQStream内部会持有一个ZMQSocket,IOLoop会监听该ZMQSocket的事件(read, write, error),至于哪些事件需要IOLoop监听是调用IOLoop.update_handler方法设置的,该方法接受一个events的参数,该参数是一个整数,这个整数的意义是:

 

25:监听error, read事件

29:监听error, read, write事件。

 

当IOLoop监听到ZMQSocket的某个事件,ZMQStream._handle_events方法会被回调(需要注意的是,这个调用发生在IOLoop的线程中)。如果event是read,那么_handle_recv会被调用,会从socket中收消息。如果event是write,那么_handle_send方法会被调用,会做真正的发送消息的操作。

_handle_events方法的最后会调用下面代码:

 

 

          """rebuild io state based on self.sending() and receiving()"""

          if self.socket is None:

              return

          state = self.io_loop.ERROR

          if self.receiving():

              state |= self.io_loop.READ

          if self.sending():

              state |= self.io_loop.WRITE

          if state != self._state:

              self._state = state

              self.io_loop.update_handler(self.socket, state)

 

 

该方法的逻辑是根据ZMQStream内部的状态更新IOLoop需要监听的事件(调用update_handler)。注意这里的逻辑:如果当前发送队列里没有数据,新的state会是25,也就是说,不会监听该ZMQSocket的写事件。

 

再来看看ZMQStream.send_multipart方法。我们在外部使用ZMQStream就是调用send_multipart方法发送数据。ZMQStream.send_multipart方法并不会使用socket发送数据,而是把数据放到内部队列中。然后调用下面的代码来更新IOLoop需要监听该socket的write事件:

 

 

          """Add io_state to poller."""

          if not self._state & state:

              self._state = self._state | state

              self.io_loop.update_handler(self.socket, self._state)

 

 

该方法先做了一个判断,这个判断的逻辑是:如果ZMQStream没有注册write事件,才会向IOLoop注册write事件。具体来说,如果self._state == 25(25 & 4 = 0,4就是那个state),才会将生成一个29(使用 self._state | state),将29注册到IOLoop上。

 

其实,根本的原因就在于,下面的两行代码会被两个线程同时调到:

 

 

1              self._state = self._state | state

2             self.io_loop.update_handler(self.socket, self._state)

 

 

其中一个线程是IOLoop的线程,我们可以称之为ioloop_thread, 另外一个线程是我们的自己发送消息的线程(该线程会调用ZMQStream.send_multipart,间接调用了这两行代码),我们可以称之为work_thread。下面是一个极端的情况会导致bug的出现:(我们将这两行代码进行了编号)

 

1. ioloop_thread走完了语句1,将self._state设置为25

2. work_thread走完了语句1,将self._state设置为29

3. work_thread走完了语句2,设置监听read, write, error事件。

4. ioloop_thread走完了语句2,设置监听read, error事件。

 

如果按照这样的顺序走完,状态会变成下面这样:

 

a) ZMQStream内部的_state是29,也就是说,ZMQStream自己认为IOLoop会监听socket read, write, error事件。

b) IOLoop被配置只监听socket的read, error事件。

 

由于write事件没有被监听,所以,最可恶的是这个判断if not self._state & state。本来,在调用ZMQStream.send_multipart方法时,是有机会重新配置IOLoop监听write事件的,但是由于这个判断,并且ZMQStream的内部_state是29,所以IOLoop._update_handle不会被调用。从而,IOLoop永远都只监听read, error事件,ZMQStream._handle_send方法也永远不会被调用,导致消息不会被真正发送。

 

解决办法

 

可以绕过这个问题,在调用ZMQStream.send_multipart之前,强制设置ZMQStream._state = 25

 

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!

留言需要登陆哦

技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成

网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];

      订阅博客周刊 去订阅

文章归档

文章标签

友情链接

Auther ·HouTiZong
侯体宗的博客
© 2020 zongscan.com
版权所有ICP证 : 粤ICP备20027696号
PHP交流群 也可以扫右边的二维码
侯体宗的博客