zoukankan      html  css  js  c++  java
  • python并发编程之进程池,线程池,协程(Python标准模块--concurrent.futures(并发未来))

    需要注意一下
    不能无限的开进程,不能无限的开线程
    最常用的就是开进程池,开线程池。其中回调函数非常重要
    回调函数其实可以作为一种编程思想,谁好了谁就去掉

    只要你用并发,就会有锁的问题,但是你不能一直去自己加锁吧
    那么我们就用QUEUE,这样还解决了自动加锁的问题
    由Queue延伸出的一个点也非常重要的概念。以后写程序也会用到
    这个思想。就是生产者与消费者问题

    一、Python标准模块--concurrent.futures(并发未来)

    concurent.future模块需要了解的
    1.concurent.future模块是用来创建并行的任务,提供了更高级别的接口,
    为了异步执行调用
    2.concurent.future这个模块用起来非常方便,它的接口也封装的非常简单
    3.concurent.future模块既可以实现进程池,也可以实现线程池
    4.模块导入进程池和线程池
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    还可以导入一个Executor,但是你别这样导,这个类是一个抽象类
    抽象类的目的是规范他的子类必须有某种方法(并且抽象类的方法必须实现),但是抽象类不能被实例化
    5.
    p = ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是cpu的数目,默认是4个
    p = ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是cpu的数目*5
    6.如果是进程池,得到的结果如果是一个对象。我们得用一个.get()方法得到结果
    但是现在用了concurent.future模块,我们可以用obj.result方法
    p.submit(task,i) #相当于apply_async异步方法
    p.shutdown() #默认有个参数wite=True (相当于close和join)

    那么什么是线程池呢?我们来了解一下

    二、线程池

    进程池:就是在一个进程内控制一定个数的线程
    基于concurent.future模块的进程池和线程池 (他们的同步执行和异步执行是一样的)
    1 # 1.同步执行--------------
     2 from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
     3 import os,time,random
     4 def task(n):
     5     print('[%s] is running'%os.getpid())
     6     time.sleep(random.randint(1,3))  #I/O密集型的,,一般用线程,用了进程耗时长
     7     return n**2
     8 if __name__ == '__main__':
     9     start = time.time()
    10     p = ProcessPoolExecutor()
    11     for i in range(10): #现在是开了10个任务, 那么如果是上百个任务呢,就不能无线的开进程,那么就得考虑控制
    12         # 线程数了,那么就得考虑到池了
    13         obj  = p.submit(task,i).result()  #相当于apply同步方法
    14     p.shutdown()  #相当于close和join方法
    15     print('='*30)
    16     print(time.time() - start)  #17.36499309539795
    17 
    18 
    19 # 2.异步执行-----------
    20 # from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    21 # import os,time,random
    22 # def task(n):
    23 #     print('[%s] is running'%os.getpid())
    24 #     time.sleep(random.randint(1,3))  #I/O密集型的,,一般用线程,用了进程耗时长
    25 #     return n**2
    26 # if __name__ == '__main__':
    27 #     start = time.time()
    28 #     p = ProcessPoolExecutor()
    29 #     l = []
    30 #     for i in range(10): #现在是开了10个任务, 那么如果是上百个任务呢,就不能无线的开进程,那么就得考虑控制
    31 #         # 线程数了,那么就得考虑到池了
    32 #         obj  = p.submit(task,i)  #相当于apply_async()异步方法
    33 #         l.append(obj)
    34 #     p.shutdown()  #相当于close和join方法
    35 #     print('='*30)
    36 #     print([obj.result() for obj in l])
    37 #     print(time.time() - start)  #5.362306594848633
    基于concurrent.futures模块的进程池
     1 from  concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
     2 from threading import currentThread
     3 import os,time,random
     4 def task(n):
     5     print('%s:%s is running'%(currentThread().getName(),os.getpid()))  #看到的pid都是一样的,因为线程是共享了一个进程
     6     time.sleep(random.randint(1,3))  #I/O密集型的,,一般用线程,用了进程耗时长
     7     return n**2
     8 if __name__ == '__main__':
     9     start = time.time()
    10     p = ThreadPoolExecutor() #线程池 #如果不给定值,默认cup*5
    11     l = []
    12     for i in range(10):  #10个任务 # 线程池效率高了
    13         obj  = p.submit(task,i)  #相当于apply_async异步方法
    14         l.append(obj)
    15     p.shutdown()  #默认有个参数wite=True (相当于close和join)
    16     print('='*30)
    17     print([obj.result() for obj in l])
    18     print(time.time() - start)  #3.001171827316284
    基于concurrent.futures模块的线程池

    应用线程池(下载网页并解析)

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import requests
    import time,os
    def get_page(url):
        print('<%s> is getting [%s]'%(os.getpid(),url))
        response = requests.get(url)
        if response.status_code==200:  #200代表状态:下载成功了
            return {'url':url,'text':response.text}
    def parse_page(res):
        res = res.result()
        print('<%s> is getting [%s]'%(os.getpid(),res['url']))
        with open('db.txt','a') as f:
            parse_res = 'url:%s size:%s
    '%(res['url'],len(res['text']))
            f.write(parse_res)
    if __name__ == '__main__':
        # p = ThreadPoolExecutor()
        p = ProcessPoolExecutor()
        l = [
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
        ]
        for url in l:
            res = p.submit(get_page,url).add_done_callback(parse_page) #这里的回调函数拿到的是一个对象。得
            #  先把返回的res得到一个结果。即在前面加上一个res.result() #谁好了谁去掉回调函数
                                    # 回调函数也是一种编程思想。不仅开线程池用,开线程池也用
        p.shutdown()  #相当于进程池里的close和join
        print('主',os.getpid())

    map函数的应用

    # map函数举例
    obj= map(lambda x:x**2 ,range(10))
    print(list(obj))
    
    #运行结果[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    可以和上面的开进程池/线程池的对比着看,就能发现map函数的强大了
    map函数的应用

    三、协程介绍

    协程:单线程下实现并发(提高效率)

    说到协成,我们先说一下协程联想到的知识点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    切换关键的一点是:保存状态(从原来停留的地方继续切)
    return:只能执行一次,结束函数的标志
    yield:函数中但凡有yield,这个函数的执行结果就变成了一个生成器,
    生成器本质就是一个迭代器,那么迭代器怎么用呢?用一个next()方法
     
     
    1.yield语句的形式:yield 1
    yield功能1:可以用来返回值,可以返回多次值
    yield功能2:可以吧函数暂停住,保存原来的状态
     
    2.yield表达式的形式:x = yieldsend可以吧一个函数的结果传给另一个函数,以此实现单线程内程序之间的切换
    send()要想用就得先next()一下
    但是要用send至少要用两个yield  

    yield复习

     1 3.yield功能2(可以吧函数暂停住,保存原来的状态)--------------
     2 def f1():
     3     print('first')
     4     yield 1
     5     print('second')
     6     yield 2
     7     print('third')
     8     yield 3
     9 # print(f1())  #加了yield返回的是一个生成器
    10 g = f1()
    11 print(next(g))  #当遇见了yield的时候就返回一个值,而且保存原来的状态
    12 print(next(g))  #当遇见了yield的时候就返回一个值
    13 print(next(g))  #当遇见了yield的时候就返回一个值
    yield功能示例1
    1 # 3.yield表达式(对于表达式的yield)--------------------
     2 import time
     3 def wrapper(func):
     4     def inner(*args,**kwargs):
     5         ret =func(*args,**kwargs)
     6         next(ret)
     7         return ret
     8     return inner
     9 @wrapper
    10 def consumer():
    11     while True:
    12         x= yield
    13         print(x)
    14 
    15 def producter(target):
    16     '''生产者造值'''
    17     # next(g)  #相当于g.send(None)
    18     for i in range(10):
    19         time.sleep(0.5)
    20         target.send(i)#要用send就得用两个yield
    21 producter(consumer())
    yield功能示例2

    引子

    本节主题是实现单线程下的并发,即只在一个主线程,并且很明显的是,可利用的cpu只有一个情况下实现并发,

    为此我们需要先回顾下并发的本质:切换+保存状态

     cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),

    一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长

         其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来大家都被执行的效果,如果多个程序都是纯计算任务,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

    1
    2
    1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
    2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换

    单纯的切反而会影响效率

     1 #串行执行
     2 import time
     3 def consumer(res):
     4     '''任务1:接收数据,处理数据'''
     5     pass
     6 
     7 def producer():
     8     '''任务2:生产数据'''
     9     res=[]
    10     for i in range(10000000):
    11         res.append(i)
    12     return res
    13 
    14 start=time.time()
    15 #串行执行
    16 res=producer()
    17 consumer(res)
    18 stop=time.time()
    19 print(stop-start) #1.5536692142486572
    串行执行
    1 import time
     2 def wrapper(func):
     3     def inner(*args,**kwargs):
     4         ret =func(*args,**kwargs)
     5         next(ret)
     6         return ret
     7     return inner
     8 @wrapper
     9 def consumer():
    10     while True:
    11         x= yield
    12         print(x)
    13 
    14 def producter(target):
    15     '''生产者造值'''
    16     # next(g)  #相当于g.send(None)
    17     for i in range(10):
    18         time.sleep(0.5)
    19         target.send(i)#要用send就得用两个yield
    20 producter(consumer())
    基于yield并发执行

     对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下多个任务能遇到io就切换,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态,相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,对于操作系统来说:这哥们(该线程)好像是一直处于计算过程的,io比较少。   

        协程的本质就是在单线程下,由用户自己控制一个任务遇到io阻塞了就切换另外一个任务去执行,以此来提升效率。

        因此我们需要找寻一种可以同时满足以下条件的解决方案:

        1. 可以控制多个任务之间的切换,切换之前将任务的状态保存下来(重新运行时,可以基于暂停的位置继续)

        2. 作为1的补充:可以检测io操作,在遇到io操作的情况下才发生切换

    四、Greenlet

    Greenlet模块和yield没有什么区别,就只是单纯的切,跟效率无关。

    只不过比yield更好一点,切的时候方便一点。但是仍然没有解决效率

    Greenlet可以让你在多个任务之间来回的切

    1
    2
    #安装
    pip3 install greenlet

    举例:

     1 from greenlet import greenlet
     2 import time
     3 def eat(name):
     4     print('%s eat 1' %name)
     5     time.sleep(10)  #当遇到IO的时候它也没有切,这就得用gevent了
     6     g2.switch('egon')
     7     print('%s eat 2' %name)
     8     g2.switch()
     9 def play(name):
    10     print('%s play 1' %name)
    11     g1.switch()
    12     print('%s play 2' %name)
    13 
    14 g1=greenlet(eat)
    15 g2=greenlet(play)
    16 
    17 g1.switch('egon')#可以在第一次switch时传入参数,以后都不需要
    greenlet

    所以上面的方法都不可行,那么这就用到了Gevert ,也就是协程。就解决了单线程实现并发的问题,还提升了效率

    五、Gevent介绍

    1
    2
    #安装
    pip3 install gevent

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet,

    它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    #用法
    g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
    
    g2=gevent.spawn(func2)
    
    g1.join() #等待g1结束
    
    g2.join() #等待g2结束
    
    #或者上述两步合作一步:gevent.joinall([g1,g2])
    
    g1.value#拿到func1的返回值

    举例;

    1 from gevent import monkey;monkey.patch_all()
     2 import gevent
     3 import time
     4 def eat(name):
     5     print('%s eat 1' %name)
     6     time.sleep(2)  #我们用等待的时间模拟IO阻塞
     7     ''' 在gevent模块里面要用gevent.sleep(2)表示等待的时间
     8         然而我们经常用time.sleep()用习惯了,那么有些人就想着
     9         可以用time.sleep(),那么也不是不可以。要想用,就得在
    10         最上面导入from gevent import monkey;monkey.patch_all()这句话
    11         如果不导入直接用time.sleep(),就实现不了单线程并发的效果了
    12     '''
    13     # gevent.sleep(2)
    14     print('%s eat 2' %name)
    15     return 'eat'
    16 def play(name):
    17     print('%s play 1' %name)
    18     time.sleep(3)
    19     # gevent.sleep(3)
    20     print('%s play 2' %name)
    21     return 'paly'  #当有返回值的时候,gevent模块也提供了返回结果的操作
    22 
    23 start = time.time()
    24 g1 = gevent.spawn(eat,'egon')  #执行任务
    25 g2 = gevent.spawn(play,'egon')  #g1和g2的参数可以不一样
    26 # g1.join()  #等待g1
    27 # g2.join()  #等待g2
    28 #上面等待的两句也可以这样写
    29 gevent.joinall([g1,g2])
    30 print('',time.time()-start) #3.001171588897705
    31 
    32 print(g1.value)
    33 print(g2.value)
    gevent的一些方法(重要)

    需要说明的是:

    gevent.sleep(2)模拟的是gevent可以识别的io阻塞,

    而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了

    from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前

    或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

    六、Gevent之同步于异步

    1 from gevent import spawn,joinall,monkey;monkey.patch_all()
     2 
     3 import time
     4 def task(pid):
     5     """
     6     Some non-deterministic task
     7     """
     8     time.sleep(0.5)
     9     print('Task %s done' % pid)
    10 
    11 
    12 def synchronous():
    13     for i in range(10):
    14         task(i)
    15 
    16 def asynchronous():
    17     g_l=[spawn(task,i) for i in range(10)]
    18     joinall(g_l)
    19 
    20 if __name__ == '__main__':
    21     print('Synchronous:')
    22     synchronous()
    23 
    24     print('Asynchronous:')
    25     asynchronous()
    26 #上面程序的重要部分是将task函数封装到Greenlet内部线程的gevent.spawn。 初始化的greenlet列表存放在数组threads中,此数组被传给gevent.joinall 函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在 所有greenlet执行完后才会继续向下走。
    View Code

    七、Gevent之应用举例一

     1 from gevent import monkey;monkey.patch_all()  #打补丁
     2 import gevent
     3 import requests
     4 import time
     5 def get_page(url):
     6     print('get :%s'%url)
     7     response = requests.get(url)
     8     if response.status_code==200: #下载成功的状态
     9         print('%d bytes received from:%s'%(len(response.text),url))
    10 start=time.time()
    11 gevent.joinall([
    12     gevent.spawn(get_page,'http://www.baidu.com'),
    13     gevent.spawn(get_page, 'https://www.yahoo.com/'),
    14     gevent.spawn(get_page, 'https://github.com/'),
    15 ])
    16 stop = time.time()
    17 print('run time is %s' %(stop-start))
    协程应用爬虫
    from gevent import joinall,spawn,monkey;monkey.patch_all()
    import requests
    from threading import current_thread
    
    def parse_page(res):
        print('%s PARSE %s' %(current_thread().getName(),len(res)))
    
    def get_page(url,callback=parse_page):
        print('%s GET %s' %(current_thread().getName(),url))
        response=requests.get(url)
        if response.status_code == 200:
            callback(response.text)
    
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.taobao.com',
            'https://www.openstack.org',
        ]
    
        tasks=[]
        for url in urls:
            tasks.append(spawn(get_page,url))
    
        joinall(tasks)
    协程应用爬虫(回调函数)

    八、Gevent之应用举例二

     也可以利用协程实现并发

    1 #!usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 from  gevent import monkey;monkey.patch_all()
     4 import gevent
     5 from socket import *
     6 print('start running...')
     7 def talk(conn,addr):
     8     while True:
     9         data = conn.recv(1024)
    10         print('%s:%s %s'%(addr[0],addr[1],data))
    11         conn.send(data.upper())
    12     conn.close()
    13 def server(ip,duankou):
    14     server = socket(AF_INET, SOCK_STREAM)
    15     server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    16     server.bind((ip,duankou))
    17     server.listen(5)
    18     while True:
    19         conn,addr = server.accept()  #等待链接
    20         gevent.spawn(talk,conn,addr)  #异步执行 (p =Process(target=talk,args=(coon,addr))
    21                                                 # p.start())相当于开进程里的这两句
    22     server.close()
    23 if __name__ == '__main__':
    24     server('127.0.0.1',8081)
    服务端利用协程
    1 #!usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 from multiprocessing import Process
     4 from gevent import monkey;monkey.patch_all()
     5 from socket import *
     6 def client(ip,duankou):
     7     client = socket(AF_INET, SOCK_STREAM)
     8     client.connect((ip,duankou))
     9     while True:
    10         client.send('hello'.encode('utf-8'))
    11         data = client.recv(1024)
    12         print(data.decode('utf-8'))
    13 if __name__ == '__main__':
    14     for i in range(100):
    15         p = Process(target=client,args=(('127.0.0.1',8081)))
    16         p.start()
    客户端开了100个进程
     
  • 相关阅读:
    react ts axios 配置跨域
    npm run eject“Remove untracked files, stash or commit any changes, and try again.”错误
    java 进程的参数和list的线程安全
    帆软报表 大屏列表跑马灯效果JS
    帆软报表 快速复用数据集,避免重复劳动
    分析云 OA中部门分级思路和实现方法
    分析云 分段器 只显示一个块的数据
    分析云 更改服务默认的端口号
    分析云U8项目配置方法新版本(2)
    Oracle 创建时间维度表并更新是否工作日字段
  • 原文地址:https://www.cnblogs.com/shangping/p/12536962.html
Copyright © 2011-2022 走看看