简单的Python 多进程异步处理 | 王晨的博客
Python  /  houtizong 发布于 3年前   428
简单的Python 多进程异步处理
它启动后,监视队列,如果有新的请求进来,就fork 一个子进程去处理。
为了更易理解,删减了一些异常处理、日志等代码。#!/usr/bin/env python#encoding: utf-8import loggingimport osimport timeclass Queue(object): ''' 基类,队列的抽象接口 ''' def pop(self): passclass QueueObserver: ''' 监视队列,若发现请求,建立新的进程处理之 ''' def __init__(self, queue, func, max=1, sleepInterval=0.1): ''' queue - 必选,队列对象,必须继承自Queue 类,并实现pop 方法 func - 必选,要执行的函数引用 max - 可选,最多启动多少个进程,默认为1,单进程 sleepInterval - 可选,默认为0.1秒 ''' self.children = [] self.queue = queue assert queue self.func = func assert func self.max = max self.sleepInterval = sleepInterval def start(self): while True: item = self.queue.pop() if item == None: # Empty queue, sleepInterval and check it again time.sleep(self.sleepInterval) continue # Got a job pid = os.fork() if pid: # The parent self.children.append(pid) self.collect_children() while len(self.children) >= self.max: # Limit the number of forked processes self.collect_children() time.sleep(self.sleepInterval) else: # The child ecode = 0 self.func(item) logging.debug('P-%d has done: %s.' % (os.getpid(), item)) os._exit(ecode) def collect_children(self): ''' 清理已完成的子进程 ''' while self.children: try: pid, status = os.waitpid(0, os.WNOHANG) except os.error: pid = None if pid: self.children.remove(pid) else: break if __name__ == '__main__': import redis class RedisQueue(Queue): ''' 演示用的实现,基于Redis 的队列 ''' def __init__(self, host, port, key): self.r = redis.Redis(host, port) self.key = key def pop(self): return self.r.rpop(self.key) def test(x): logging.info(int(x) * 2) logging.basicConfig(level=logging.DEBUG) q = RedisQueue('localhost', 6300, 'Q') qo = QueueObserver(q,test) qo.start()
请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!
技术博客集 - 网站简介:
前后端技术:
后端基于Hyperf2.1框架开发,前端使用Bootstrap可视化布局系统生成
网站主要作用:
1.编程技术分享及讨论交流,内置聊天系统;
2.测试交流框架问题,比如:Hyperf、Laravel、TP、beego;
3.本站数据是基于大数据采集等爬虫技术为基础助力分享知识,如有侵权请发邮件到站长邮箱,站长会尽快处理;
4.站长邮箱:[email protected];
文章归档
文章标签
友情链接