zoukankan      html  css  js  c++  java
  • multiprocessing 多进程实现 生产者与消费者模型JoinableQueue

    from  multiprocessing  import JoinableQueue
    import time
    import random
    import asyncio
    import   logging
    from  multiprocessing import cpu_count
    from multiprocessing import Process
    # Each log record is emitted on two lines: a header carrying the timestamp,
    # level, function name and line number, then the message on its own line.
    # The line break must be an escaped "\n"; a raw newline inside a
    # single-quoted string literal is a syntax error.
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s  - %(levelname)s -->%(funcName)s  at line %(lineno)d: \n %(message)s',
    )
    log = logging.getLogger()
    # Bounded queue shared by the producer and every consumer: at most 5 items
    # may be waiting at once, so put() blocks while the queue is full.
    q_init = JoinableQueue(maxsize=5)
    
    async def jobs(item):
        """Simulate one unit of work on *item* with a random outcome.

        Waits a random whole number of seconds between 1 and 4, then picks
        the result at random.

        Returns:
            A ``(status, item)`` tuple where ``status`` is ``"success"`` or
            ``"failed"``.
        """
        # Await the delay rather than calling time.sleep(): a blocking sleep
        # inside a coroutine stalls the whole event loop.
        await asyncio.sleep(random.randint(1, 4))
        status = random.randint(0, 1)
        return ("success" if status == 0 else "failed", item)
    
    async def do_work(item):
        """Log that work on *item* is starting, then run it via ``jobs``.

        Returns:
            Whatever ``jobs(item)`` returns.
        """
        started_at = time.asctime()
        logging.info("do something %s,time start %s", item, started_at)
        return await jobs(item)
    
    def async_runner(checker):
        """Run ``do_work(checker)`` to completion on a private event loop.

        A fresh loop is created for each call and is always closed afterwards,
        so repeated calls from a long-running consumer do not leak loops.

        Args:
            checker: The queue item to process.

        Returns:
            The value returned by ``do_work(checker)``.

        Raises:
            Any exception raised by ``do_work`` propagates to the caller.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            return loop.run_until_complete(do_work(checker))
        finally:
            # Detach before closing so no closed loop is left installed as
            # the current event loop.
            asyncio.set_event_loop(None)
            loop.close()
    
    def worker_consumer(q_init, idle_timeout=1):
        """Consume items from *q_init* until the queue stays empty.

        Each item is processed with ``async_runner`` and then acknowledged
        with ``task_done()`` so that a producer blocked in ``q_init.join()``
        can proceed.

        Args:
            q_init: A joinable queue providing ``get(timeout=...)`` and
                ``task_done()``.
            idle_timeout: Seconds to wait for a new item before concluding
                that the queue is drained and returning.
        """
        # Local import: only this function needs the Empty exception.
        from queue import Empty

        while True:
            # A timed get() replaces the empty()-then-get() pair: with several
            # consumers, another process can take the last item between the
            # two calls and leave this one blocked forever.
            try:
                checker = q_init.get(timeout=idle_timeout)
            except Empty:
                break
            try:
                st = async_runner(checker)
                if st[0] in ("success", "failed"):
                    logging.info("%s task finished status is %s", st[1], st[0])
            finally:
                # Always acknowledge the item, even if processing raised;
                # otherwise the producer's join() would never return.
                q_init.task_done()
    
    
    
    def producer(q_init, item_count=10):
        """Put the integers ``0 .. item_count - 1`` on *q_init* and wait.

        Args:
            q_init: A joinable queue providing ``put()`` and ``join()``.
            item_count: How many items to produce; defaults to 10.
        """
        for item in range(item_count):
            q_init.put(item)
        # Block until consumers have called task_done() for every item.
        q_init.join()
    
    
    if __name__ == '__main__':
        # One consumer process per CPU core. The count gets its own name so
        # the imported cpu_count() function is not shadowed.
        worker_total = cpu_count()
        producers = [Process(target=producer, args=(q_init,))]
        consumers = [
            Process(target=worker_consumer, args=(q_init,))
            for _ in range(worker_total)
        ]
        # Start the producer before the consumers, then wait for all of them
        # in the same order.
        processes = producers + consumers
        for proc in processes:
            proc.start()
        for proc in processes:
            proc.join()
    

      结果:

    2019-12-21 19:50:00,754  - INFO --><module>  at line 65: 
     cpucount is 8
    2019-12-21 19:50:01,036  - INFO -->do_work  at line 23: 
     do something 1,time start Sat Dec 21 19:50:01 2019
    2019-12-21 19:50:01,038  - INFO -->do_work  at line 23: 
     do something 2,time start Sat Dec 21 19:50:01 2019
    2019-12-21 19:50:01,040  - INFO -->do_work  at line 23: 
     do something 3,time start Sat Dec 21 19:50:01 2019
    2019-12-21 19:50:01,041  - INFO -->do_work  at line 23: 
     do something 4,time start Sat Dec 21 19:50:01 2019
    2019-12-21 19:50:01,055  - INFO -->do_work  at line 23: 
     do something 5,time start Sat Dec 21 19:50:01 2019
    2019-12-21 19:50:02,042  - INFO -->worker_consumer  at line 43: 
     4 task finished status is success
    2019-12-21 19:50:02,043  - INFO -->do_work  at line 23: 
     do something 6,time start Sat Dec 21 19:50:02 2019
    2019-12-21 19:50:02,056  - INFO -->worker_consumer  at line 43: 
     5 task finished status is success
    2019-12-21 19:50:02,056  - INFO -->do_work  at line 23: 
     do something 7,time start Sat Dec 21 19:50:02 2019
    2019-12-21 19:50:03,041  - INFO -->worker_consumer  at line 43: 
     3 task finished status is success
    2019-12-21 19:50:03,042  - INFO -->do_work  at line 23: 
     do something 8,time start Sat Dec 21 19:50:03 2019
    2019-12-21 19:50:03,058  - INFO -->worker_consumer  at line 43: 
     7 task finished status is success
    2019-12-21 19:50:03,059  - INFO -->do_work  at line 23: 
     do something 9,time start Sat Dec 21 19:50:03 2019
    2019-12-21 19:50:04,036  - INFO -->worker_consumer  at line 43: 
     1 task finished status is failed
    2019-12-21 19:50:04,037  - INFO -->do_work  at line 23: 
     do something 10,time start Sat Dec 21 19:50:04 2019
    2019-12-21 19:50:04,043  - INFO -->worker_consumer  at line 43: 
     6 task finished status is success
    2019-12-21 19:50:04,059  - INFO -->worker_consumer  at line 43: 
     9 task finished status is success
    2019-12-21 19:50:05,039  - INFO -->worker_consumer  at line 43: 
     2 task finished status is failed
    2019-12-21 19:50:07,042  - INFO -->worker_consumer  at line 43: 
     8 task finished status is success
    2019-12-21 19:50:08,038  - INFO -->worker_consumer  at line 43: 
     10 task finished status is failed
    
    Process finished with exit code 0
    

      

  • 相关阅读:
    xfire for web-Service
    如何使用 XSD
    XSD
    一个 XSD 实例
    RE:转:一些不常用的html代码
    <base target="_self"/>标签的用法
    C#有关日期的使用方法
    GridView 高亮某一行
    DropDownList绑定数据库
    Request.Querystring中文乱码问题解决
  • 原文地址:https://www.cnblogs.com/SunshineKimi/p/12078070.html
Copyright © 2011-2022 走看看