zoukankan      html  css  js  c++  java
  • 异步回调,线程队列,协程

    异步回调

    1.以爬取网站数据为例

    ①异步提交任务,等所有任务执行完毕后,串行解析

    缺点:任务的返回值不能得到及时的处理,必须等所有任务都完成后才能一起拿到结果,再串行解析

    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import random
    def get(url, timeout=10):
        """Download *url* and return its body decoded as UTF-8.

        Returns None (after printing a message) when the server answers with a
        status other than 200. ``timeout`` (seconds) bounds the HTTP request so
        a stuck server cannot hang a pool worker forever.
        """
        print("%s 启动" % current_thread().name)
        time.sleep(random.randint(1, 2))  # simulate extra, variable latency
        res = requests.get(url, timeout=timeout)
        if res.status_code != 200:
            # Make the failure visible instead of silently returning None.
            print("%s 失败 %s: %s" % (current_thread().name, url, res.status_code))
            return None
        print("%s 结束" % current_thread().name)
        # res.text would also work, but decoding the raw bytes explicitly as
        # UTF-8 keeps Chinese pages readable regardless of the guessed encoding.
        return res.content.decode("utf-8")
        
    def parser(res):
        """Print the current thread's name and the length of the fetched page *res*.

        ``get`` yields None for non-200 responses; report that case instead of
        crashing on ``len(None)``.
        """
        if res is None:
            print("%s 没有可解析的内容" % current_thread().name)
            return
        print("%s 解析结果为 %s" % (current_thread().name, len(res)))
        
    if __name__ == '__main__':
        urls = [
            "https://www.baidu.com",
            "https://www.sina.com",
            "https://www.tmall.com",
            "https://www.taobao.com",
            "https://www.jd.com",
            "https://www.python.org",
            "https://www.apple.com",
        ]
        # Submit every download asynchronously; leaving the with-block calls
        # shutdown(wait=True), i.e. blocks until all downloads are finished.
        with ThreadPoolExecutor(4) as tpool:
            objs = [tpool.submit(get, url) for url in urls]
        # Serial parsing: results are handled one by one only after all tasks end.
        for obj in objs:
            parser(obj.result())
    

    ②爬取和解析放在一个函数内,实现了并发解析,相当于给线程加了一个任务

    爬取和解析耦合性强

    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import random
    
    def get(url):
        """Download *url*, decode it as UTF-8 and parse it in this same worker thread.

        Fetching and parsing are coupled here: each pool thread does both.
        """
        name = current_thread().name
        print("%s 启动" % name)
        time.sleep(random.randint(1, 2))
        body = requests.get(url).content.decode("utf-8")
        print("%s 结束" % name)
        parser(body)
    def parser(res):
        """Report the running thread's name together with the length of *res*."""
        thread_name = current_thread().name
        print("%s 解析结果为 %s" % (thread_name, len(res)))
    if __name__ == '__main__':
        urls = [
            "https://www.baidu.com",
            "https://www.sina.com",
            "https://www.tmall.com",
            "https://www.taobao.com",
            "https://www.jd.com",
            "https://www.python.org",
            "https://www.apple.com",
        ]
        # get() both downloads and parses inside the worker thread, so parsing
        # happens concurrently; the with-block waits for all workers to finish.
        with ThreadPoolExecutor(4) as tpool:
            futures = [tpool.submit(get, url) for url in urls]
        # Nothing else inspects these futures, so an exception raised in get()
        # (network error, decode error, ...) would otherwise be lost silently.
        for url, fut in zip(urls, futures):
            err = fut.exception()
            if err is not None:
                print("%s 失败: %r" % (url, err))
    

    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import random
    def get(url):
        """Download *url* and return its body decoded as UTF-8 (parsing happens elsewhere)."""
        who = current_thread().name
        print("%s 启动" % who)
        time.sleep(random.randint(1, 2))
        page = requests.get(url).content.decode("utf-8")
        print("%s 结束" % who)
        return page
        
        
    def parser(obj):
        """Done-callback: report the length of the page held by a finished future.

        ``add_done_callback`` passes the completed Future itself (not its
        return value), so the parameter must be the future object.
        """
        name = current_thread().name
        if obj.cancelled():
            print("%s 任务已取消" % name)
            return
        # The future is already done when the callback runs, so exception() and
        # result() return immediately and never block.
        err = obj.exception()
        if err is not None:
            # An exception escaping a done-callback is only logged by
            # concurrent.futures; report the failed task explicitly instead.
            print("%s 任务失败: %r" % (name, err))
            return
        res = obj.result()
        print("%s 解析结果为 %s" % (name, len(res)))
        
        
    if __name__ == '__main__':
        tpool = ThreadPoolExecutor(4)
        urls = [
            "https://www.baidu.com",
            "https://www.sina.com",
            "https://www.tmall.com",
            "https://www.taobao.com",
            "https://www.jd.com",
            "https://www.python.org",
            "https://www.apple.com",
        ]
        for url in urls:
            # Attach parser to each future: once the task has its return value,
            # the future is handed to parser automatically.
            tpool.submit(get, url).add_done_callback(parser)
     
     parser 不会阻塞:add_done_callback 只是把 future 对象登记给 parser;任务执行完毕、拿到返回值之后才会调用 parser,此时 obj.result() 会立即返回,因此不会阻塞
         
    

    主进程交给子进程一个任务,子进程在执行完后,发一个信号给主进程,主进程调用它自己的函数

    通常异步任务都会和回调函数一起使用

    使用add_done_callback()给future对象绑定一个回调函数

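    注意:在多进程(ProcessPoolExecutor)中,回调函数在主进程中执行(由主进程里的管理线程调用);在多线程(ThreadPoolExecutor)中,回调函数通常由完成该任务的那个工作线程执行。但如果调用 add_done_callback 时任务已经完成,回调会在调用它的线程(例如主线程)中立即执行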

    线程队列(queue 模块)与进程队列(multiprocessing.Queue)的区别:进程队列可以被多个进程共享;线程队列只是同一进程内供多个线程使用的、线程安全的容器,不能跨进程共享

    进程队列是申请一片共享的内存空间

    # 1. FIFO queue: first in, first out.
    import queue
    q = queue.Queue()
    # 2. LIFO queue: last in, first out (a stack).
    #    The class is queue.LifoQueue; "LiFoQueue" does not exist (AttributeError).
    q = queue.LifoQueue()
    # 3. Priority queue: put (priority, value) tuples;
    #    the smaller the priority number, the higher the priority (returned first).
    q = queue.PriorityQueue()
    

    事件

    # Event
    # Used to coordinate work between several threads,
    # e.g. when one thread needs another thread's status before it can act.
    # Works as a simple message/signal passed between threads.
    from threading import Event,Thread,current_thread
    import time
    # Shared flag: check() calls e.set(); connect() waits on it with e.wait().
    e = Event()
    def check():
        """Simulate a 3-second server check, then release every thread waiting on ``e``."""
        me = current_thread().name
        print("%s正在检测服务器" % me)
        time.sleep(3)
        e.set()
    
    def connect():
        """Announce a connection attempt, block until ``e`` is set, then report success."""
        name = current_thread().name
        print("%s 正在连接" % name)
        e.wait()  # blocks with no timeout until some check() thread calls e.set()
        print("%s 连接成功" % name)
    
    if __name__ == '__main__':
        # Two checker threads and two connector threads, created and started in order.
        t1, t2, c1, c2 = (Thread(target=job) for job in (check, check, connect, connect))
        for th in (t1, t2, c1, c2):
            th.start()
        
        
    def check():
        """Simulate a 2-second server check, then set the shared event ``e``."""
        who = current_thread().name
        print("%s正在检测服务器" % who)
        time.sleep(2)
        e.set()
    def connect():
        """Try up to 3 times, waiting at most 1 second each, for ``e`` to be set."""
        name = current_thread().name
        # any() stops at the first attempt whose wait() returns True,
        # exactly like the break in a for/else loop.
        if any(e.wait(1) for _ in range(3)):
            print("%s 连接成功" % name)
        else:
            print("%s 连接失败" % name)
    
    if __name__ == '__main__':
        # One checker and one connector; the checker is created and started first.
        t1, c1 = Thread(target=check), Thread(target=connect)
        t1.start()
        c1.start()
     
    
    

    协程

    #单线程下实现并发
    #并发指的多个任务看起来同时运行的
    #并发实现的本质:切换加保存状态
    
    优点:
    协程的切换开销小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    单线程内就可以实现并发的效果,更大限度的利用cpu
    缺点:
    协程本质上运行在单线程内,无法利用多核;解决办法是让一个程序开启多个进程,每个进程内开启多个线程,每个线程内再开启协程
    协程运行在单个线程中,因而一旦某个协程出现(未被识别的)阻塞,就会阻塞整个线程
    

    greenlet模块

    greenlet 只能通过 switch() 手动切换,遇到 IO 不会自动切换;它的切换方式与 yield 类似,但底层是 C 扩展实现的栈切换,并非封装 yield

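    # greenlet switches only on an explicit switch() call (similar in spirit to yield, but implemented as C-level stack switching rather than with yield)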
    from greenlet import greenlet
    def eat(name):
        """First greenlet body: alternates with play() through explicit switches."""
        print("%s eat 1" % name)
        # A blocking call such as time.sleep(3) here would NOT hand control to
        # g2: greenlet only switches when switch() is called explicitly.
        g2.switch("alex")  # start play("alex"); resumes here when play switches back
        print("%s eat 2" % name)
        g2.switch()  # let play() continue after its g1.switch()
        
    def play(name):
        """Second greenlet body: print, hand control back to g1, then print again."""
        print("%s play 1" % name)
        # This line used a tab while its neighbours use spaces -> TabError in Python 3.
        g1.switch()  # resume eat() right after its g2.switch("alex")
        print("%s play 2" % name)
    
    # Create both greenlets first: eat() refers to g2 and play() refers to g1.
    g1, g2 = greenlet(eat), greenlet(play)
    g1.switch("egon")  # the first switch into a greenlet passes its function's arguments
    

    gevent模块

    from gevent import monkey;monkey.patch_all()#打补丁,不打的话,无法识别IO操作
    import gevent
    from threading import current_thread
    def eat():
        """gevent task: print, sleep 3 seconds, print again.

        With monkey.patch_all() applied beforehand, time.sleep is expected to
        yield to other greenlets instead of blocking the thread.
        """
        # This snippet never imported time, so time.sleep raised NameError.
        import time

        print("%s eat 1" % current_thread().name)
        time.sleep(3)
        print("%s eat 2" % current_thread().name)
        
     def play():
        print("%s play 1"%current_thread().name)
        time.sleep(3)
        print("%s play 2"%current_thread().name)
     
    
     print(current_thread().name)
     g1 = gevent.spawn(eat)#提交任务
     g2 = gevent.spawn(play)
     gevent.joinall([g1,g2])#异步提交,没有这步的话,发起提交任务后,可能就退出程序了,连运行都没运行
    """执行结果:
    MainThread
    DummyThread-1 eat 1(dummy假的,假的线程1,2)
    DummyThread-2 play 1
    DummyThread-1 eat 2
    DummyThread-2 play 2
    """
    
    
    
    
  • 相关阅读:
    HDU 1863 畅通工程(Kruskal)
    HDU 1879 继续畅通工程(Kruskal)
    HDU 1102 Constructing Roads(Kruskal)
    POJ 3150 Cellular Automaton(矩阵快速幂)
    POJ 3070 Fibonacci(矩阵快速幂)
    ZOJ 1648 Circuit Board(计算几何)
    ZOJ 3498 Javabeans
    ZOJ 3490 String Successor(模拟)
    Java实现 LeetCode 749 隔离病毒(DFS嵌套)
    Java实现 LeetCode 749 隔离病毒(DFS嵌套)
  • 原文地址:https://www.cnblogs.com/robert-zhou/p/10222536.html
Copyright © 2011-2022 走看看