zoukankan      html  css  js  c++  java
  • 流畅python学习笔记:第十七章:并发处理

    第十七章:并发处理

    本章主要讨论Python3引入的concurrent.futures模块。在python2.7中需要用pip install futures来安装。concurrent.futures 是python3新增加的一个库,用于并发处理,提供了多线程和多进程的并发功能 类似于其他语言里的线程池(也有一个进程池),他属于上层的封装,对于用户来说,不用在考虑那么多东西了。

    使用方法:
    1 Executor:两个子类ThreadPoolExecutor和ProcessPoolExecutor分别是线程和进程
    submit(fn,*args,**kwargs): fn是需要异步执行的函数,args,kwargs为给函数传递的参数

    2 map(func, *iterables, timeout=None) 
    此map函数和Python自带的map函数功能类似,只不过concurrent模块的map函数从迭代器获得参数后异步执行。并且,每一个异步操作,能用timeout参数来设置超时时间,timeout的值可以是int或float型,如果操作timeout的话,会raisesTimeoutError。如果timeout参数不指定的话,则不设置超时间。 
    func:为需要异步执行的函数 
    iterables:可以是一个能迭代的对象,例如列表等。每一次func执行,会从iterables中取参数。 
    timeout:设置每次异步操作的超时时间 

    3 Future: Future实例是由Executor.submit()创建的。Future提供了丰富的方法来处理调用。

    Future.cancel: 用cancel(),可以终止某个线程和进程的任务,返回状态为 True False

    Future.cancelled():判断是否真的结束了任务。

    Future.running():判断是否还在运行

    Future.done():判断是正常执行完毕的。

    Future.result(timeout=None): 针对result结果做超时的控制。

    4 Wait: wait方法接会返回一个tuple(元组),tuple中包含两个set(集合),一个是completed(已完成的)另外一个是uncompleted(未完成的)。使用wait方法的一个优势就是获得更大的自由度,它接收三个参数FIRST_COMPLETED, FIRST_EXCEPTION和ALL_COMPLETE,默认设置为ALL_COMPLETED。三个参数的意义分别如下:

    FIRST_COMPLETED - Return when any future finishes or is
                      cancelled.
    FIRST_EXCEPTION - Return when any future finishes by raising an
                      exception. If no future raises an exception
                      then it is equivalent to ALL_COMPLETED.
    ALL_COMPLETED -   Return when all futures finish or are cancelled.

    下面来看一个实际的例子:

    def caculate_value_by_wait(x):
        time.sleep(1)
        print 'The value of x*x=%d' % (x*x)


    if __name__=="__main__":
        num=[1,2,3,4,5,6]
        start_time=time.clock()
        for n in num:
            caculate_value_by_wait(n)     (1)
        print 'The toal time is %d' % (time.clock()-start_time)
        start_time1=time.clock()
        with futures.ThreadPoolExecutor(max_workers=6) as executor: (2)
            for n in num:
                executor.submit(caculate_value_by_wait,n)
        print 'Thread pool consume time is %d' % (time.clock()-start_time1)
        start_time2=time.clock()
        with futures.ProcessPoolExecutor(max_workers=6) as executor: (3)
            for n in num:
                executor.submit(caculate_value_by_wait,n)
        print 'Process pool consume time is %d' % (time.clock()-start_time2)

    在这个例子中,分别用线性,多线程和多进程执行了caculate_value_by_wait。执行结果如下:在caculate_value_by_wait中每一次操作都会等待1秒。因此线性的执行总的时间为6秒。而多线程和多进程执行则总共耗时1秒

    E:python2.7.11python.exe E:/py_prj/fluent_python/chapter17.py

    The value of x*x=1

    The value of x*x=4

    The value of x*x=9

    The value of x*x=16

    The value of x*x=25

    The value of x*x=36

    The toal time is 6

    The value of x*x=4

    The value of x*x=1

    The value of x*x=9

    The value of x*x=16

    The value of x*x=25The value of x*x=36

    Thread pool consume time is 1

    The value of x*x=1

    The value of x*x=4

    The value of x*x=9

    The value of x*x=16

    The value of x*x=25

    The value of x*x=36

    Process pool consume time is 1

    如果是用map函数来改造的话,可以写成如下:

    with futures.ProcessPoolExecutor(max_workers=6) as executor:
        executor.map(caculate_value_by_wait,num)
     
    在上面的多线程或者多进程中,我们还可以进一步对每个线程进行监控。方法就是用Future。代码如下
    def caculate_value_by_wait(x):
        time.sleep(1)
        return x*x


    if __name__=="__main__":
        num=[1,2,3,4,5,6]
        with futures.ThreadPoolExecutor(max_workers=6) as executor:
            future_task=[executor.submit(caculate_value_by_wait,n) for n in num]   (1)
            for f in future_task:
                if f.running():    (2)
                    print '%s is running' % str(f)
            for f in as_completed(future_task):   (3)
                try:
                    ret=f.done()                 (4)
                    if ret:
                        f_ret=f.result()          (5)
                        print '%s done,result is %s' % (str(f),str(f_ret))
                except BaseException,e:
                    f.cancel()
                    print e
    (1)  future_task得到所有运行的实例对象
    (2)  判断线程是否在运行
    (3)  得到完成线程的列表
    (4)  判断是否真的完成,是返回True,否则返回False
    (5)  得到各个线程返回的对象
    得到的结果如下:
    E:python2.7.11python.exe E:/py_prj/fluent_python/chapter17.py
    <Future at 0x17cfed0 state=running> is running
    <Future at 0x17d9050 state=running> is running
    <Future at 0x17d9210 state=running> is running
    <Future at 0x17d93d0 state=running> is running
    <Future at 0x17d9590 state=running> is running
    <Future at 0x17d9750 state=running> is running
    <Future at 0x17d9210 state=finished returned int> done,result is 9
    <Future at 0x17cfed0 state=finished returned int> done,result is 1
    <Future at 0x17d93d0 state=finished returned int> done,result is 16
    <Future at 0x17d9050 state=finished returned int> done,result is 4
    <Future at 0x17d9750 state=finished returned int> done,result is 36
    <Future at 0x17d9590 state=finished returned int> done,result is 25

    再来看下wait的用法:

    if __name__=="__main__":
        num=[1,2,3,4,5,6]
        with futures.ThreadPoolExecutor(max_workers=6) as executor:
            future_task=[executor.submit(caculate_value_by_wait,n) for n in num]
            for f in future_task:
                if f.running():
                    print '%s is running' % str(f)
            results=wait(future_task)   (1)
            done=results[0]   (2)
            not_done=results[1]     (3)
            print 'The threads that have finished %s' % done
            print 'The threads that not have finished %s' % not_done
            for x in done:
                print x
            for y in not_done:
                print y

    (1)    得到所有的线程

    (2)    得到已完成的线程

    (3)    得到未完成的线程

    运行结果如下:

    E:python2.7.11python.exe E:/py_prj/fluent_python/chapter17.py

    <Future at 0x177def0 state=running> is running

    <Future at 0x1788070 state=running> is running

    <Future at 0x1788230 state=running> is running

    <Future at 0x17883f0 state=running> is running

    <Future at 0x17885b0 state=running> is running

    <Future at 0x1788770 state=running> is running

    The threads that have finished set([<Future at 0x1788230 state=finished returned int>, <Future at 0x1788070 state=finished returned int>, <Future at 0x177def0 state=finished returned int>, <Future at 0x1788770 state=finished returned int>, <Future at 0x17885b0 state=finished returned int>, <Future at 0x17883f0 state=finished returned int>])

    The threads that not have finished set([])

    <Future at 0x1788230 state=finished returned int>

    <Future at 0x1788070 state=finished returned int>

    <Future at 0x177def0 state=finished returned int>

    <Future at 0x1788770 state=finished returned int>

    <Future at 0x17885b0 state=finished returned int>

    <Future at 0x17883f0 state=finished returned int>

  • 相关阅读:
    奇虎360安全牛人全球挑战赛无线部…
    Portugal 2 1 minute has Pipansihuan Germany and USA tacit or kick the ball
    求最大公约数和最小公倍数
    JQuery的Ajax跨域请求的解决方式
    从Java到C++——从union到VARIANT与CComVariant的深层剖析
    抽卡概率的測试
    jquery序列化表单以及回调函数的使用
    Notepad++插件安装和使用和打开大文件
    Android开发遇到的问题
    bzoj3068: 小白树
  • 原文地址:https://www.cnblogs.com/zhanghongfeng/p/7367937.html
Copyright © 2011-2022 走看看