zoukankan      html  css  js  c++  java
  • Python并发实践_02_通过yield实现协程

    python中实现并发的方式有很多种,通过多进程并发可以真正利用多核资源,而多线程并发则实现了进程内资源的共享,然而Python中由于GIL的存在,多线程是没有办法真正实现多核资源的。

    对于计算密集型程序,应该使用多进程并发充分利用多核资源,而在IO密集型程序中,多核优势并不明显,甚至由于大多数时间都是在IO堵塞状态,多进程的切换消耗反而让程序效率更加低下。

    而当需要并发处理IO密集型任务时,就需要用到协程(Coroutine)。协程并没有系统级的调度,而是用户级的调度方式,避免了系统调用的开销,虽然协程最终是串行工作,但是却可以实现非常大的并发量。通过多进程+协程的方式,可以有效均衡多核计算和请求等待。

    参考文章:

    https://blog.tonyseek.com/post/event-manage-with-greenlet/

    producer-consumer

    利用yield生成器,可以简单展现协程的工作方式:

    import time
    def consumer():
        print "Ready to receive"
        while True:
            y = (yield )
            time.sleep(1)
            print "Receive %s from producer”%y
    def producer():
        c = consumer()
        c.next()
        i = 1
        while i > 0 and i < 11:
            time.sleep(1)
            print "Send %s to consumer"%i
            c.send(i)
            i += 1
    if __name__ == '__main__':
        producer()

    上述过程展示了基本的生产者-消费者模型,消费者consumer是一个生成器;

    当第一次在producer中调用c.next()时,激活consumer,并且运行到yield时协程(consumer)被挂起,等待生成器被调用next或者send。

    producer进行后续操作,并进入一个循环,每次暂停1s后,向生成器send一个消息,消费者yield获取到该消息,并进行后续的工作。

    可以看到,每次yield都需要等待send传入的消息之后才会继续执行之后的任务。

    通过yield实现协程

    现在要来用yield真正创建一个协程了。

    可以想象这样一个模型,一个工地里有很多相似的任务(jobs),并且会源源不断产生这些任务,工地里有一个工头(foreman)负责,工头为了分配任务给工人(worker),会制定一套流程(pipeline)来方便管理:分配工人,验收工作(accept),由于工人工作(work)的时间远远大于分配任务的时间,将这些工人的工作(简单枯燥的重复劳动)看成IO操作的话,这就是一个IO密集型的任务。下面看看python是如何通过yield来实现协程完成真个工作的:

     1 def main():
     2     foreman(args_of_overall,worker_num)
     3 
     4 def foreman(args_of_overall,worker_num):
     5     pipeline = create_pipeline(args_of_pipeline,worker_num)
     6     for i,job in enumerate(get_jobs(args_of_ceate_jobs)):
     7         worker_id  = i % worker_num
     8         pipeline.send((job,worker_id))
     9 
    10 @coroutine
    11 def worker(pipeline,accepting,job,my_id):
    12     while True:
    13         args_of_job, worker_id = (yield )
    14         if worker_id == my_id:
    15             result = work(args_of_job)
    16             accepting.send(result)
    17         elif pipeline is not None:
    18             pipeline.send((job,worker_id))
    19 
    20 @coroutine
    21 def accept():
    22     while True:
    23         result = (yield )
    24         #do_some_accepting
    25 
    26 def create_pipeline(args_of_pipeline,worker_num):
    27     pipeline = None
    28     accepting = accept()
    29     for work_id in range(work_num):
    30         pipeline = worker(pipeline,accepting,job,work_id)
    31     return pipeline
    32 
    33 def get_jobs(args_of_ceate_jobs):
    34     for job in job_source:
    35         yield job
    36 
    37 def coroutine(func):
    38     def warper(*args):
    39         f = func(*args)
    40         f.next()
    41         return f
    42     return warper
    43 
    44 def work(args_of_job):
    45     pass
    46     #do_some_work
    47 
    48 if __name__ == '__main__':
    49     main()

     上述过程中,工人和验收工作都是协程,而get_jobs()函数是一个生成器,当job是动态添加时,就可以改写成一个协程。

    上述所有的工作都是串行完成,虽然有很多工人,工人之间的工作是并发的(IO等待时间),但是工作一直是从第一个开始一个一个分配任务。

  • 相关阅读:
    大话重构连载首页
    大话重构连载19:大对象的演化过程
    大话重构连载18:最常见的问题
    大话重构连载17:抽取方法的实践
    大话重构连载16:超级大函数
    大话重构连载15:采用Mock技术完成测试
    大话重构连载14:我们是这样自动化测试的
    大话重构连载13:自动化测试——想说爱你不容易
    大话重构连载12:你不能没有保险索
    大话重构连载11:小步快跑是这样玩的
  • 原文地址:https://www.cnblogs.com/lyon2014/p/4596483.html
Copyright © 2011-2022 走看看