zoukankan      html  css  js  c++  java
  • Python之旅.第九章.并发编程。

    一、异步+回调机制

    a、问题引入

    问题:

    1)任务的返回值不能得到及时的处理,必须等到所有任务都运行完毕才能统一进行处理

    2)解析的过程是串行执行的,如果解析一次需要花费2s,解析9次则需要花费18s

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

    import os

    import requests

    import time

    import random

     

    def get(url):

        print('%s GET %s' %(os.getpid(),url))

        response=requests.get(url)

        time.sleep(random.randint(1,3))

        if response.status_code == 200:

            return response.text

     

    def pasrse(res):

        print('%s 解析结果为:%s' %(os.getpid(),len(res)))

     

    if __name__ == '__main__':

        urls=[

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.python.org',

        ]

        pool=ProcessPoolExecutor(4)

        objs=[]

        for url in urls:

            obj=pool.submit(get,url)

            objs.append(obj)

     

        pool.shutdown(wait=True)

     

        for obj in objs:

            res=obj.result()

            pasrse(res)

     

    b、进阶解决方案: 可以解决上述两个问题,但使得获取信息函数set和解析信息函数pasrse耦合到了一起

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

    import requests

    import os

    import time

    import random

     

    def get(url):

        print('%s GET %s' %(os.getpid(),url))

        response=requests.get(url)

        time.sleep(random.randint(1,3))

     

        if response.status_code == 200:

            pasrse(response.text)

     

    def pasrse(res):

        print('%s 解析结果为:%s' %(os.getpid(),len(res)))

     

    if __name__ == '__main__':

        urls=[

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.python.org',

     

        ]

        pool=ProcessPoolExecutor(4)

        for url in urls:

            pool.submit(get,url)

     

    c1、终极解决方案: 可以解决上述两个问题,同时使获取信息函数set和解析信息函数pasrse解耦合(进程版)

    主进程作为回调的执行者

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

    import requests

    import os

    import time

    import random

     

    def get(url):

        print('%s GET %s' %(os.getpid(),url))

        response=requests.get(url)

        time.sleep(random.randint(1,3))

     

        if response.status_code == 200:

            # 干解析的活

            return response.text

     

    def pasrse(obj):  #后续回调是obj会将自身传给pasrse,所以pasrse必须有且仅有一个参数

        res=obj.result()

        print('%s 解析结果为:%s' %(os.getpid(),len(res)))

     

    if __name__ == '__main__':

        urls=[

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.python.org',

        ]

     

        pool=ProcessPoolExecutor(4)

        for url in urls:

            obj=pool.submit(get,url)

            obj.add_done_callback(pasrse)

     

        print('主进程',os.getpid())

     

    c2、终极解决方案: 可以解决上述两个问题,同时使获取信息函数set和解析信息函数pasrse解耦合(线程版)

    哪个子进程空闲就由那个子进程作为回调的执行者

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor

    from threading import current_thread

    import requests

    import os

    import time

    import random

     

    def get(url):

        print('%s GET %s' %(current_thread().name,url))

        response=requests.get(url)

        time.sleep(random.randint(1,3))

     

        if response.status_code == 200:

            # 干解析的活

            return response.text

     

    def pasrse(obj):

        res=obj.result()

        print('%s 解析结果为:%s' %(current_thread().name,len(res)))

     

    if __name__ == '__main__':

        urls=[

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.baidu.com',

            'https://www.python.org',

        ]

        pool=ThreadPoolExecutor(4)

        for url in urls:

            obj=pool.submit(get,url)

            obj.add_done_callback(pasrse)

     

        print('主线程',current_thread().name)

     

    二、线程queue

    import queue

     

    q=queue.Queue(3) #队列:先进先出

    q.put(1)

    q.put(2)

    q.put(3)

    # q.put(4)       #阻塞

    print(q.get())

    print(q.get())

    print(q.get())

     

    q=queue.LifoQueue(3) #堆栈:后进先出

    q.put('a')

    q.put('b')

    q.put('c')

    print(q.get())

    print(q.get())

    print(q.get())

     

    q=queue.PriorityQueue(3) #优先级队列:可以以小元组的形式往队列里存值,第一个元素代表优先级,数字越小优先级越高

    q.put((10,'user1'))

    q.put((-3,'user2'))

    q.put((-2,'user3'))

    print(q.get())

    print(q.get())

    print(q.get())

     

    三、线程event

    a、案例一: 等待check重置event内的值后,connectevent.wait()后继续运行

    from threading import Event,current_thread,Thread

    import time

     

    event=Event()   #event内部维护着一个全局变量

     

    def check():

        print('%s 正在检测服务是否正常....' %current_thread().name)

        time.sleep(3)

        event.set() #改变event中的全局变量的值

     

    def connect():

        print('%s 等待连接...' %current_thread().name)

        event.wait() #等待全局变量的值被重置;如果括号中为1,即只等1

        print('%s 开始连接...' % current_thread().name)

     

    if __name__ == '__main__':

        t1=Thread(target=connect)

        t2=Thread(target=connect)

        t3=Thread(target=connect)

        c1=Thread(target=check)

        t1.start()

        t2.start()

        t3.start()

    c1.start()

     

    b、案例二:三次刷尝试后退出

    from threading import Event,current_thread,Thread

    import time

     

    event=Event()

     

    def check():

        print('%s 正在检测服务是否正常....' %current_thread().name)

        time.sleep(5)

        event.set()

     

    def connect():

        count=1

        while not event.is_set():

            if count ==  4:

                print('尝试的次数过多,请稍后重试')

                return

            print('%s 尝试第%s次连接...' %(current_thread().name,count))

            event.wait(1)

            count+=1

        print('%s 开始连接...' % current_thread().name)

     

    if __name__ == '__main__':

        t1=Thread(target=connect)

        t2=Thread(target=connect)

        t3=Thread(target=connect)

        c1=Thread(target=check)

        t1.start()

        t2.start()

        t3.start()

        c1.start()

     

    四、协程

    1、单线程下实现并发:协程 (为了提高效率;但不是说所有协程都会提升效率)

       并发指的多个任务看起来是同时运行的;并发实现的本质:切换+保存状态

       有效的协程在一定程度骗过CPU;通过自己内部协调,一遇到IO就切到自己的其他程序中,使得CPU以为这个程序一直在运行,从而使其更有可能处于就绪态或运行态,以更多的占用CPU

    2、实现并发的三种手段:

    a)单线程下的并发;由程序自己控制,相对速度快

    b)多线程下的并发;由操作系统控制,相对速度较慢

    c)多进程下的并发;由操作系统控制,相对速度慢

     

    3、基于yield保存状态,实现两个任务直接来回切换,即并发的效果 (但yield不会遇到阻塞自动切程序)

       PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.

     

    import time

    def consumer():

        '''任务1:接收数据,处理数据'''

        while True:

            x=yield

     

    def producer():

        '''任务2:生产数据'''

        g=consumer()

        next(g)

        for i in range(10000000):

            g.send(i)

     

    start=time.time()

    producer() #1.0202116966247559

    stop=time.time()

    print(stop-start)

     

    # 纯计算的任务并发执行

    import time

    def task1():

        res=1

        for i in range(1000000):

            res+=i

            yield

            time.sleep(10000)  #yield不会自动跳过阻塞

            print('task1')

     

    def task2():

        g=task1()

        res=1

        for i in range(1000000):

            res*=i

            next(g)

            print('task2')

     

    start=time.time()

    task2()

    stop=time.time()

    print(stop-start)

     

    五、单线程下实现遇到IO切换

    1 greenlet(封装yield,遇到IO不自动切)

    from greenlet import greenlet

    import time

     

    def eat(name):

        print('%s eat 1' %name)

        time.sleep(30)

        g2.switch('alex')  #只在第一次切换时传值

        print('%s eat 2' %name)

        g2.switch()

    def play(name):

        print('%s play 1' %name)

        g1.switch()

        print('%s play 2' %name)

     

    g1=greenlet(eat)

    g2=greenlet(play)

    g1.switch('egon')

     

    2 gevent模块(封装greenlet,不处理的话,遇到自己的IO才主动切)

    import gevent

     

    def eat(name):

        print('%s eat 1' %name)

        gevent.sleep(5)  #换成time.sleep(5),不会自动切

        print('%s eat 2' %name)

    def play(name):

        print('%s play 1' %name)

        gevent.sleep(3)

        print('%s play 2' %name)

     

    g1=gevent.spawn(eat,'egon')

    g2=gevent.spawn(play,'alex')

     

    # gevent.sleep(100)

    # g1.join()

    # g2.join()

    gevent.joinall([g1,g2])

     

    3 gevent模块(封装greenlet,处理的话,遇到其他IO也主动切)

    from gevent import monkey;monkey.patch_all()

    from threading import current_thread

    import gevent

    import time

     

    def eat():

        print('%s eat 1' %current_thread().name)

        time.sleep(5)

        print('%s eat 2' %current_thread().name)

    def play():

        print('%s play 1' %current_thread().name)

        time.sleep(3)

        print('%s play 2' %current_thread().name)

     

    g1=gevent.spawn(eat)

    g2=gevent.spawn(play)

     

    # gevent.sleep(100)

    # g1.join()

    # g2.join()

    print(current_thread().name)

    gevent.joinall([g1,g2])

  • 相关阅读:
    19.1.25 [LeetCode8]String to Integer (atoi)
    19.1.23 CJK Round 1A 2015
    19.1.22 CJK Qualification Round 2015
    【转载】超级弹丸论破2再见绝望学园攻略
    19.1.20 [LeetCode 7]Reverse Integer
    19.1.20 [LeetCode 6]ZigZag Conversion
    19.1.20 [LeetCode 5]Longest Palindromic Substring
    python socket-select io多路复用
    web框架 源码
    python socket
  • 原文地址:https://www.cnblogs.com/yangli0504/p/8970977.html
Copyright © 2011-2022 走看看