zoukankan      html  css  js  c++  java
  • 28 Apr 18 异步+回调 线程queue 线程event 协程(yield,greenlet,gevent)

    28 Apr 18
    一、异步+回调机制
    a、问题引入
    问题:
    1)任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理
    2)解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os
    import requests
    import time
    import random
     
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
        if response.status_code == 200:
            return response.text
     
    def pasrse(res):
        print('%s 解析结果为:%s' %(os.getpid(),len(res)))
     
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
        ]
        pool=ProcessPoolExecutor(4)
        objs=[]
        for url in urls:
            obj=pool.submit(get,url)
            objs.append(obj)
     
        pool.shutdown(wait=True)
     
        for obj in objs:
            res=obj.result()
            pasrse(res)
     
    b、进阶解决方案: 可以解决上述两个问题,但使得获取信息函数set和解析信息函数pasrse耦合到了一起
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import requests
    import os
    import time
    import random
     
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
     
        if response.status_code == 200:
            pasrse(response.text)
     
    def pasrse(res):
        print('%s 解析结果为:%s' %(os.getpid(),len(res)))
     
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
     
        ]
        pool=ProcessPoolExecutor(4)
        for url in urls:
            pool.submit(get,url)
     
    c1、终极解决方案: 可以解决上述两个问题,同时使获取信息函数set和解析信息函数pasrse解耦合(进程版)
    主进程作为回调的执行者
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import requests
    import os
    import time
    import random
     
    def get(url):
        print('%s GET %s' %(os.getpid(),url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
     
        if response.status_code == 200:
            # 干解析的活
            return response.text
     
    def pasrse(obj):  #后续回调是obj会将自身传给pasrse,所以pasrse必须有且仅有一个参数
        res=obj.result()
        print('%s 解析结果为:%s' %(os.getpid(),len(res)))
     
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
        ]
     
        pool=ProcessPoolExecutor(4)
        for url in urls:
            obj=pool.submit(get,url)
            obj.add_done_callback(pasrse)
     
        print('主进程',os.getpid())
     
    c2、终极解决方案: 可以解决上述两个问题,同时使获取信息函数set和解析信息函数pasrse解耦合(线程版)
    哪个子进程空闲就由那个子进程作为回调的执行者
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import requests
    import os
    import time
    import random
     
    def get(url):
        print('%s GET %s' %(current_thread().name,url))
        response=requests.get(url)
        time.sleep(random.randint(1,3))
     
        if response.status_code == 200:
            # 干解析的活
            return response.text
     
    def pasrse(obj):
        res=obj.result()
        print('%s 解析结果为:%s' %(current_thread().name,len(res)))
     
    if __name__ == '__main__':
        urls=[
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.baidu.com',
            'https://www.python.org',
        ]
        pool=ThreadPoolExecutor(4)
        for url in urls:
            obj=pool.submit(get,url)
            obj.add_done_callback(pasrse)
     
        print('主线程',current_thread().name)
     
    二、线程queue
    import queue
     
    q=queue.Queue(3) #队列:先进先出
    q.put(1)
    q.put(2)
    q.put(3)
    # q.put(4)       #阻塞
    print(q.get())
    print(q.get())
    print(q.get())
     
    q=queue.LifoQueue(3) #堆栈:后进先出
    q.put('a')
    q.put('b')
    q.put('c')
    print(q.get())
    print(q.get())
    print(q.get())
     
    q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高
    q.put((10,'user1'))
    q.put((-3,'user2'))
    q.put((-2,'user3'))
    print(q.get())
    print(q.get())
    print(q.get())
     
    三、线程event
    a、案例一: 等待check重置event内的值后,connect从event.wait()后继续运行
    from threading import Event,current_thread,Thread
    import time
     
    event=Event()   #event内部维护着一个全局变量
     
    def check():
        print('%s 正在检测服务是否正常....' %current_thread().name)
        time.sleep(3)
        event.set() #改变event中的全局变量的值
     
    def connect():
        print('%s 等待连接...' %current_thread().name)
        event.wait() #等待全局变量的值被重置;如果括号中为1,即只等1秒
        print('%s 开始连接...' % current_thread().name)
     
    if __name__ == '__main__':
        t1=Thread(target=connect)
        t2=Thread(target=connect)
        t3=Thread(target=connect)
        c1=Thread(target=check)
        t1.start()
        t2.start()
        t3.start()
    c1.start()
     
    b、案例二:三次刷尝试后退出
    from threading import Event,current_thread,Thread
    import time
     
    event=Event()
     
    def check():
        print('%s 正在检测服务是否正常....' %current_thread().name)
        time.sleep(5)
        event.set()
     
    def connect():
        count=1
        while not event.is_set():
            if count ==  4:
                print('尝试的次数过多,请稍后重试')
                return
            print('%s 尝试第%s次连接...' %(current_thread().name,count))
            event.wait(1)
            count+=1
        print('%s 开始连接...' % current_thread().name)
     
    if __name__ == '__main__':
        t1=Thread(target=connect)
        t2=Thread(target=connect)
        t3=Thread(target=connect)
        c1=Thread(target=check)
        t1.start()
        t2.start()
        t3.start()
        c1.start()
     
    四、协程
    1、单线程下实现并发:协程 (为了提高效率;但不是说所有协程都会提升效率)
       并发指的多个任务看起来是同时运行的;并发实现的本质:切换+保存状态
       有效的协程在一定程度‘骗过’了CPU;通过自己内部协调,一遇到IO就切到自己的其他程序中,使得CPU以为这个程序一直在运行,从而使其更有可能处于就绪态或运行态,以更多的占用CPU。
    2、实现并发的三种手段:
    a)单线程下的并发;由程序自己控制,相对速度快
    b)多线程下的并发;由操作系统控制,相对速度较慢
    c)多进程下的并发;由操作系统控制,相对速度慢
     
    3、基于yield保存状态,实现两个任务直接来回切换,即并发的效果 (但yield不会遇到阻塞自动切程序)
       PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
     
    import time
    def consumer():
        '''任务1:接收数据,处理数据'''
        while True:
            x=yield
     
    def producer():
        '''任务2:生产数据'''
        g=consumer()
        next(g)
        for i in range(10000000):
            g.send(i)
     
    start=time.time()
    producer() #1.0202116966247559
    stop=time.time()
    print(stop-start)
     
    # 纯计算的任务并发执行
    import time
    def task1():
        res=1
        for i in range(1000000):
            res+=i
            yield
            time.sleep(10000)  #yield不会自动跳过阻塞
            print('task1')
     
    def task2():
        g=task1()
        res=1
        for i in range(1000000):
            res*=i
            next(g)
            print('task2')
     
    start=time.time()
    task2()
    stop=time.time()
    print(stop-start)
     
    五、单线程下实现遇到IO切换
    1、 用greenlet(封装yield,遇到IO不自动切)
    from greenlet import greenlet
    import time
     
    def eat(name):
        print('%s eat 1' %name)
        time.sleep(30)
        g2.switch('alex')  #只在第一次切换时传值
        print('%s eat 2' %name)
        g2.switch()
    def play(name):
        print('%s play 1' %name)
        g1.switch()
        print('%s play 2' %name)
     
    g1=greenlet(eat)
    g2=greenlet(play)
    g1.switch('egon')
     
    2、 用gevent模块(封装greenlet,不处理的话,遇到自己的IO才主动切)
    import gevent
     
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(5)  #换成time.sleep(5),不会自动切
        print('%s eat 2' %name)
    def play(name):
        print('%s play 1' %name)
        gevent.sleep(3)
        print('%s play 2' %name)
     
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,'alex')
     
    # gevent.sleep(100)
    # g1.join()
    # g2.join()
    gevent.joinall([g1,g2])
     
    3、 用gevent模块(封装greenlet,处理的话,遇到其他IO也主动切)
    from gevent import monkey;monkey.patch_all()
    from threading import current_thread
    import gevent
    import time
     
    def eat():
        print('%s eat 1' %current_thread().name)
        time.sleep(5)
        print('%s eat 2' %current_thread().name)
    def play():
        print('%s play 1' %current_thread().name)
        time.sleep(3)
        print('%s play 2' %current_thread().name)
     
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play)
     
    # gevent.sleep(100)
    # g1.join()
    # g2.join()
    print(current_thread().name)
    gevent.joinall([g1,g2])
  • 相关阅读:
    elastic-job-console
    CentOS7_安装mysql5.7
    CentOS7_开放指定端口
    CentOS7_防火墙
    Docker-CentOS7-安装
    MySQL_写锁_lock tables tableName write
    cesium 动态流动纹理
    cesium加载二维贴地的地名(本地地名数据)
    python3.6安装open AI gym环境(windows)
    python PIL打开较大的tif影像时出错-OSError: cannot identify image file Image.open
  • 原文地址:https://www.cnblogs.com/zhangyaqian/p/py20180428.html
Copyright © 2011-2022 走看看