zoukankan      html  css  js  c++  java
  • 异步回调,线程队列,协程

    异步回调

    1.以爬取网站数据为例

    ①异步提交任务,等所有任务执行完毕后,串行解析

    缺点:任务的返回值不能得到即使的处理,必须等到任务都完成后,一起拿到结果,串行解析

    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import random
    def get(url):
        print("%s 启动"%current_thread().name)
        time.sleep(random.randint(1,2))
        res = requests.get(url)
        if res.status_code == 200:
        	print("%s 结束" % current_thread().name)
        	return  res.content.decode("utf-8")
        # print(response.text)#返回文本
        # print(response.content.decode("utf-8"))#可以显示中文
        
    def parser(res):
        print("%s 解析结果为 %s"%(current_thread().name,len(res)))
        
    if __name__ == '__main__':
        tpool = ThreadPoolExecutor(4)
        urls = ["https://www.baidu.com",
    "https://www.sina.com",
    "https://www.tmall.com",
    "https://www.taobao.com",
    "https://www.jd.com",
    "https://www.python.org",
    "https://www.apple.com"]
        objs = []
        for i in urls:
            obj = tpool.submit(get,i)#异步提交任务
            objs.append(obj)
        tpool.shutdown(wait = True)
        for obj in objs:#串行解析
            parser(obj.result())
    

    ②爬取和解析放在一个函数内,实现了并发解析,相当于给线程加了一个任务

    爬取和解析耦合性强

    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import random
    
    def get(url):
        print("%s 启动"%current_thread().name)
        time.sleep(random.randint(1,2))
        res = requests.get(url).content.decode("utf-8")
        print("%s 结束" % current_thread().name)
        parser(res)
        # print(response.text)#返回文本
        # print(response.content.decode("utf-8"))#可以显示中文
    def parser(res):
        print("%s 解析结果为 %s"%(current_thread().name,len(res)))
    if __name__ == '__main__':
        tpool = ThreadPoolExecutor(4)
        urls = ["https://www.baidu.com",
    "https://www.sina.com",
    "https://www.tmall.com",
    "https://www.taobao.com",
    "https://www.jd.com",
    "https://www.python.org",
    "https://www.apple.com"]
        # objs = []
        for i in urls:
            obj = tpool.submit(get,i)
    

    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import random
    def get(url):
        print("%s 启动"%current_thread().name)
        time.sleep(random.randint(1,2))
        res = requests.get(url).content.decode("utf-8")
        print("%s 结束" % current_thread().name)
        return res
        # print(response.text)#返回文本
        # print(response.content.decode("utf-8"))#可以显示中文
        
        
    def parser(obj):#这里只能放对象
        res = obj.result()#对象拿到返回值后,才过来调用,所以不会有阻塞
        print("%s 解析结果为 %s"%(current_thread().name,len(res)))
        
        
    if __name__ == '__main__':
        tpool = ThreadPoolExecutor(4)
        urls = ["https://www.baidu.com",
    "https://www.sina.com",
    "https://www.tmall.com",
    "https://www.taobao.com",
    "https://www.jd.com",
    "https://www.python.org",
    "https://www.apple.com"]
        for i in urls:
            obj = tpool.submit(get,i)
            obj.add_done_callback(parser)#将任务绑定方法,任务执行完毕后(拿到返回值),自动调用该方法(将拿到的返回值给方法)
     
     parser不会有阻塞,什么时候执行,其实是将对象传送给parser方法,对象执行完毕,拿到返回值,才会执行parser方法,故不会有阻塞
         
    

    主进程交给子进程一个任务,子进程在执行完后,发一个信号给主进程,主进程调用它自己的函数

    通常异步任务都会和回调函数一起使用

    使用add_done_callback()给future对象绑定一个回调函数

    注意在多进程中,回调函数是交给主进程执行,而多线程中,回调函数是谁有空交给谁执行,但一定不是主线程执行

    线程队列:与进程队列的区别,进程队列可以被多进程共享,而线程中的队列就是一个普通的容器不能进程共享

    进程队列是申请一片共享的内存空间

    #1.先进先出
    import queue
    q = queue.Queue()
    #2.后进先出
    q = queue.LiFoQueue()
    #3.优先级队列
    q = queue.PriorityQueue()
    参数元组(优先级,数值)
    优先级数值越小,优先级越高
    

    事件

    #event
    #用于协调多个线程间的工作
    #例如一个线程要执行某个操作,需要获取另一个线程的状态
    #多线程之间传送消息
    from threading import Event,Thread,current_thread
    import time
    e = Event()
    def check():
        print("%s正在检测服务器"%current_thread().name)
        time.sleep(3)
        e.set()
    
    def connect():
        print("%s 正在连接"%current_thread().name)
        e.wait()
        print("%s 连接成功"%current_thread().name)
    
    if __name__ == '__main__':
        t1 =Thread(target=check)
        t2 = Thread(target=check)
        c1 = Thread(target=connect)
        c2 = Thread(target=connect)
        t1.start()
        t2.start()
        c1.start()
        c2.start()
        
        
    def check():
        print("%s正在检测服务器"%current_thread().name)
        time.sleep(2)
        e.set()
    def connect():
        for i in range(3):
            if e.wait(1):
                print("%s 连接成功"%current_thread().name)
                break
        else:
            print("%s 连接失败"%current_thread().name)
    
    if __name__ == '__main__':
        t1 =Thread(target=check)
        c1 = Thread(target=connect)
        t1.start()
        c1.start()
     
    
    

    协程

    #单线程下实现并发
    #并发指的多个任务看起来同时运行的
    #并发实现的本质:切换加保存状态
    
    优点:
    协程的切换开销小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    单线程内就可以实现并发的效果,更大限度的利用cpu
    缺点:
    协程的本质是单线程下,无法利用多核,可以是一个程序开启多进程,每个进程内开启多个线程,每个线程内开启协程
    协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
    

    greenlet模块

    不能实现遇到IO切,底层封装了yield

    greenlet底层封装了yield
    from greenlet import greenlet
    def eat(name):
        print("%s eat 1"%name)
        #import time
        #time.sleep(3)#greenlet 遇到IO并不会切换
        g2.switch("alex")
        print("%s eat 2"%name)
        g2.switch()
        
    def play(name):
        print("%s play 1"%name)
    	g1.switch()
        print("%s play 2"%name)
    
    g1 = greenlet(eat)
    g2 = greenlet(play)
    g1.switch("egon")#第一次切的时候需要传参数
    #switch 转换
    

    gevent模块

    from gevent import monkey;monkey.patch_all()#打补丁,不打的话,无法识别IO操作
    import gevent
    from threading import current_thread
    def eat():
        print("%s eat 1"%current_thread().name)
        time.sleep(3)
        print("%s eat 2"%current_thread().name)
        
     def play():
        print("%s play 1"%current_thread().name)
        time.sleep(3)
        print("%s play 2"%current_thread().name)
     
    
     print(current_thread().name)
     g1 = gevent.spawn(eat)#提交任务
     g2 = gevent.spawn(play)
     gevent.joinall([g1,g2])#异步提交,没有这步的话,发起提交任务后,可能就退出程序了,连运行都没运行
    """执行结果:
    MainThread
    DummyThread-1 eat 1(dummy假的,假的线程1,2)
    DummyThread-2 play 1
    DummyThread-1 eat 2
    DummyThread-2 play 2
    """
    
    
    
    
  • 相关阅读:
    RMQ
    LCA 笔记
    LUCAS 定理
    topcoder 643 DIV2
    BZOJ 1071组队
    Codeforces Round #283 (Div. 2)
    topcoder 642
    Codeforces Round #278 (Div. 2)
    树链剖分
    Codeforces Round #277 (Div. 2)
  • 原文地址:https://www.cnblogs.com/robert-zhou/p/10222536.html
Copyright © 2011-2022 走看看