zoukankan      html  css  js  c++  java
  • 简单的Python 多进程异步处理 | 王晨的博客

    简单的Python 多进程异步处理 | 王晨的博客

    简单的Python 多进程异步处理

    它启动后,监视队列,如果有新的请求进来,就fork 一个子进程去处理。
    为了更易理解,删减了一些异常处理、日志等代码。

    #!/usr/bin/env python
    #encoding: utf-8
    import logging
    import os
    import time
    
    class Queue(object):
        '''
        基类,队列的抽象接口
        '''
        def pop(self):
            pass
    
    class QueueObserver:
        '''
        监视队列,若发现请求,建立新的进程处理之
        '''
        def __init__(self, queue, func, max=1, sleepInterval=0.1):
            '''
            queue         - 必选,队列对象,必须继承自Queue 类,并实现pop 方法
            func          - 必选,要执行的函数引用
            max           - 可选,最多启动多少个进程,默认为1,单进程
            sleepInterval - 可选,默认为0.1秒
            '''
            self.children = []
    
            self.queue = queue
            assert queue
            self.func = func
            assert func
            self.max = max
            self.sleepInterval = sleepInterval
    
        def start(self):
            while True:
                item = self.queue.pop()
                if item == None:
                    # Empty queue, sleepInterval and check it again
                    time.sleep(self.sleepInterval)
                    continue
                # Got a job
                pid = os.fork()
                if pid:
                    # The parent
                    self.children.append(pid)
                    self.collect_children()
                    while len(self.children) >= self.max:
                        # Limit the number of forked processes
                        self.collect_children()
                        time.sleep(self.sleepInterval)
                else:
                    # The child
                    ecode = 0
                    self.func(item)
                    logging.debug('P-%d has done: %s.' % (os.getpid(), item))
                    os._exit(ecode)
    
        def collect_children(self):
            '''
            清理已完成的子进程
            '''
            while self.children:
                try:
                    pid, status = os.waitpid(0, os.WNOHANG)
                except os.error:
                    pid = None
                if pid:
                    self.children.remove(pid)
                else:
                    break 
    
    if __name__ == '__main__':
        import redis
        class RedisQueue(Queue):
            '''
            演示用的实现,基于Redis 的队列
            '''
            def __init__(self, host, port, key):
                self.r = redis.Redis(host, port)
                self.key = key
    
            def pop(self):
                return self.r.rpop(self.key)
    
        def test(x):
            logging.info(int(x) * 2)
    
        logging.basicConfig(level=logging.DEBUG)
        q = RedisQueue('localhost', 6300, 'Q')
        qo = QueueObserver(q,test)
        qo.start()
  • 相关阅读:
    lists and Dictionaries
    捕获鼠标点击 位置移动
    Preventing and Event from Propagation Through a set of Nested Elements
    瀑布流
    Using Function Closures with Timers
    $.getJSON 的用法
    Overlay 遮罩层
    git常见问题
    spring 全局异常处理
    spring 事务
  • 原文地址:https://www.cnblogs.com/lexus/p/2864920.html
Copyright © 2011-2022 走看看