zoukankan      html  css  js  c++  java
  • 协程,事件,队列,同步,异步,回调函数

    协程

    什么是协成?单个线程并发的处理多个任务,程序控制协成的切换+保持状态,协成的切换速度非常快,蒙蔽了操作系统的眼睛,让操作系统认为CPU一直在运行
    进程或线程都是由操作系统控制CPU来回切换,遇到阻塞就切换执行其他任务,协成是程序控制的,霸占CPU执行任务,会在操作系统控制CPU之前来回切换,操作系统就认为CPU一直在运作
    
    协程的优点:
        1.开销小
        2.运行速度快
        3.协程会长期霸占CPU只执行我程序里的所有任务
    协程的缺点:
        1.协程属于微并发,处理任务不易过多
        2.协程的本质是单线程,无法利用多核,可以一个程序开启多个进程,每个进程开启多个线程.每个线程开启协程
    协程处理io密集型比较好
    
    协程的特点:
        1.必须在只有一个单线程里实现并发
        2.修改共享数据不需要加锁
        3.保持状态
        4.一个协程遇到io操作就自动切换到其他协程 
        
    工作中:
    ​	一般在工作中我们都是进程+线程+协程的方式来实现并发,以达到最好的并发效果,如果是4核的cpu,一般起5个进程,每个进程中20个线程(5倍cpu数量),每个线程可以起500个协程,大规模爬取页面的时候,等待网络延迟的时间的时候,我们就可以用协程去实现并发。 并发数量 = 5 * 20 * 500 = 50000个并发,这是一般一个4cpu的机器最大的并发数。nginx在负载均衡的时候最大承载量就是5w个
    
    #当一个任务没有阻塞的时候
    import time
    def task():
        res = 1
        for i in range(1,1000000):
            res += 1
    def task1():
        res = 1
        for i in range(1,1000000):
            res -= 1
    strt = time.time()
    task()
    task1()
    end = time.time()
    print(f"串行执行效率:{end - strt}") #串行执行效率:0.07783079147338867
    
    
    #第一次接触协程是yield
    #没有io阻塞
    import time
    def task():
        res = 1
        for i in range(1,1000000):
            res += 1
            yield res
    def task1():
        g = task()
        res = 1
        for i in range(1,1000000):
            res -= 1
            next(g)
    strt = time.time()
    task()
    task1()
    end = time.time()
    print(f"协成执行效率:{end - strt}")#协成执行效率:0.21143341064453125
    纯计算密集型,没有io情况下,串行的执行效率高于协程
    
    
    # 并不是真正的阻塞
    import gevent
    def eat(name):
        print(f"{name} eat 1")   #1
        gevent.sleep(2)    #模拟的是gevent可以识别的阻塞
        print(f"{name} eat 2")   #4
    def play(name):
        print(f"{name} play 1")   #2
        gevent.sleep(1)
        print(f"{name} play 2")   #3
    g1 = gevent.spawn(eat,'八戒')
    g2 = gevent.spawn(play,name = '悟空')
    g1.join()
    g2.join()
    print("主")                    #5
    
    
    
    import threading
    from gevent import monkey
    import gevent
    import time
    monkey.patch_all()        # 打补丁:将下面的所有的任务的阻塞打上标记,遇到这个标记就切换
    
    def eat():
        print(f"线程1:{threading.current_thread().getName()}")    #1
        print('eat food 1')                                       #2
        time.sleep(2)                                             #3阻塞,去执行别的任务
        print('eat food 2')                                       #8
    
    def play():
        print(f"线程2:{threading.current_thread().getName()}")    #4
        print('play 1')                                           #5
        time.sleep(1)                                             #6阻塞,再去执行别的任务,上一个任务还在阻塞,继续向下执行
        print('play 2')                                           #7
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    gevent.joinall([g1,g2])
    print(f"{threading.current_thread().getName()}")              #9
    注意在各个任务中的阻塞时间
    

    Event事件

    添加全局变量,修改全局变量,实现一个线程在某一个节点让下一个线程继续工作
    import time
    from threading import Thread
    from threading import current_thread
    flag = False
    def task():
        print(f"{current_thread().name}检测服务器是否正常开启.....")
        time.sleep(2)
        global flag
        flag = True
    def task1():
        while 1:
            time.sleep(1)
            print(f"{current_thread().name}正在连接服务器....")
            if flag:
                print(f"{current_thread().name}连接成功")
                return
    
    if __name__ == '__main__':
        t1 = Thread(target=task1)
        t2 = Thread(target=task1)
        t3 = Thread(target=task1)
        t = Thread(target=task)
    
        t.start()
        t1.start()
        t2.start()
        t3.start()
        
        
    import time
    from threading import Thread
    from threading import current_thread
    from threading import Event
    import random
    event = Event()   #默认为False
    def task1():
        print(f"{current_thread().getName()} 检测服务器知否开启...") #获取线程名字
        time.sleep(random.randint(1,3))
    def task2():
        print(f"{current_thread().getName()}正在尝试连接服务器...")
        count = 1
        while count < 4:
            time.sleep(1)
            print(f"{current_thread().getName()}连接第{count}次")
            if count < 4:
                time.sleep(0.5)
                event.set() #将event状态修改为True
                print('连接成功')
            count += 1
    t1 = Thread(target=task1)
    t2 = Thread(target=task2)
    t1.start()
    t2.start()
    
    
    
    import time
    from threading import Thread
    from threading import current_thread
    from threading import Event
    import random
    event = Event()
    def check():
        print(f"{current_thread().name}检测服务器是否开启...")
        time.sleep(random.randint(1,3))
        event.set()
        print("连接成功")
    def connect():
        count = 1
        while not event.is_set(): #event.is_set()判断状态是True或False
            if count == 4:
                print('连接次数过多,已断开')
                break
            event.wait(1) #轮询检测event状态,如果为真,就向下执行,是个阻塞, # 只阻塞1秒,1秒之后如果还没有进行set 直接进行下一步操作.
            print(f"{current_thread().name}尝试连接{count}次")
            count += 1
        else:
            print(f"{current_thread().name}连接成功")
    t1 = Thread(target=check)
    t2 = Thread(target=connect)
    t1.start()
    t2.start()
    

    线程队列

    import queue
    # 先进先出原则
    q = queue.Queue(3)
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())
    # print(q.get(block=False)) #设置多取会报错
    q.get(timeout=2)#阻塞两秒,后报错
    # print(q.get())多去一个会阻塞
    
    
    
    # 后进先出
    q = queue.LifoQueue(4)
    q.put(1)
    q.put(2)
    q.put('alex')
    q.put('太白')
    
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())
    # 太白
    # alex
    # 2
    # 1
    
    优先级队列
    最小的先取出
    q = queue.PriorityQueue(4)
    q.put((1,'qq'))
    q.put((-2,'qq1'))
    q.put((0,'qq2'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    
    #(-2, 'qq1')
    #(0, 'qq2')
    #(1, 'qq')
    

    同步

    同步调用(提交完任务后,就在原地等待任务执行完后,拿到结果,再执行下一行代码)
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    def task(i):
        print(f"{os.getpid()}开始任务")
        time.sleep(random.randint(1,3))
        print(f"{os.getpid()}结束任务")
        return i
    if __name__ == '__main__':
        pool = ProcessPoolExecutor()
        for i in range(10):                #同时开启10个进程
            obj = pool.submit(task,i)      #类似于发布任务
            print(f"任务结果:{obj.result()}")  # 对象加result()就变成同步,获取的是运行状态.obj就为动态对象, (running,pending,finish)  动态对象.result()获取结果
                                           #obj.result() 必须等到这个任务完成后,返回了结果,在执行下一个任务
        pool.shutdown(wait=True)           #shutdown 让我的主进程等待进程池中所有子进程结束后,在执行,类似于join
                                           #在上一个进程池没有完成任务之前,不允许添加新的任务,一个任务通过一个函数实现,任务完成了他的返回值就是函数的返回值
        print("主")
      
    

    异步

    异步调用(提交完任务后,不再原地等待任务执行完)
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    def task(i):
        print(f"{os.getpid()}开始任务")
        time.sleep(random.randint(1,3))
        print(f"{os.getpid()}结束任务")
        return i
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor()
        for i in range(10):
            pool.submit(task,i)
        pool.shutdown(wait=True) #等待进程池中所有子进程结束后在向下执行,类似于join
        print("主")
        
    存在一个列表中,统一回收结果 ,把动态对象放在列表中,(容器)
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    def task(i):
        print(f"{os.getpid()}开始任务")
        time.sleep(random.randint(1,3))
        print(f"{os.getpid()}结束任务")
        return i
    
    if __name__ == '__main__':
        pool = ProcessPoolExecutor()
        l = []
        for i in range(10):
            obj = pool.submit(task,i)
            l.append(obj)
        pool.shutdown(wait=True)
        for i in l:
            print(i.result())
        print("主")
    虽然是同时发布了任务,但是在回收结果的时候,不能马上收到一个结束任务的返回值,只能等所有任务结束后统一收到结果 
    
    
    异步调用的如何取值?
    1.
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    import requests
    # 模拟的就是爬取多个源代码 一定有IO操作
    def task(url):
        ret = requests.get(url)
        if ret.status_code == 200:
            return ret.text
    def parse(content):
        return len(content)
    
    if __name__ == '__main__':
        ret = task('http://www.baidu.com')
        print(parse(ret))
    
        ret = task('http://www.JD.com')
        print(parse(ret))
    
        ret = task('http://www.taobao.com')
        print(parse(ret))
    
        ret = task('https://www.cnblogs.com/jin-xin/articles/7459977.html')
        print(parse(ret))
    #这样写为执行一个task函数,在执行一个parse函数,执行效率低,一个任务执行2秒,20个任务就是40秒,耗时
    #这两个函数为一个任务,当一个任务执行完成后再执行下一个任务,串行
    
    2.
    
    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import time
    import random
    import os
    import requests
    # 模拟的就是爬取多个源代码 一定有IO操作
    def task(url):
        ret = requests.get(url)
        if ret.status_code == 200:
            return ret.text
    def parse(content):
        return len(content)
    
    if __name__ == '__main__':
    #     开启一个线程池,并发执行
        url_list = ['http://www.baidu.com',
                    'http://www.JD.com',
                    'http://www.taobao.com',
                    'https://www.cnblogs.com/jin-xin/articles/7459977.html'
        ]
        pool = ThreadPoolExecutor(4)
        obj_list = []
    
        for url in url_list:
            obj = pool.submit(task,url)
            obj_list.append(obj)
        pool.shutdown(wait=True)
    #这个for循环把爬取的源码添加到一个类表中,对象,执行task函数
        for i in obj_list:
            print(parse(i.result()))
    循环取出类表中的元素,当做参数传入parse函数中,进行数据分析
    以上版本缺点:异步的发出10个任务,并发的执行任务,但是分析结果的流程是串行,没有做到结束一个任务就返回一个返回值,效率低
    
    3.
    
    def task(url):
        ret = requests.get(url)
        if ret.status_code == 200:
            return parse(ret.text)    #更改函数,直接返回并调用parse函数
    def parse(content):
        return len(content)
    
    if __name__ == '__main__':
    #     开启一个线程池,并发执行
        url_list = [
            'http://www.baidu.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.taobao.com',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'https://www.luffycity.com/',
            'https://www.cnblogs.com/jin-xin/articles/9811379.html',
            'https://www.cnblogs.com/jin-xin/articles/11245654.html',
            'https://www.sina.com.cn/'
        ]
        pool = ThreadPoolExecutor(4)
        obj_list = []
    
        for url in url_list:
            obj = pool.submit(task,url)
            obj_list.append(obj)
        pool.shutdown(wait=True)
    
        for i in obj_list:
            print(i.result())#直接返回结果,不用再调用函数,直接全部打印出来结果
    
    4.异步调用+回调函数**************************
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    import time
    import random
    import os
    import requests
    def task(url):
        '''模拟的就是爬取多个源代码 一定有IO操作'''
        ret = requests.get(url)
        if ret.status_code == 200:
            return ret.text
    def parse(obj):
        '''模拟对数据进行分析 一般没有IO'''
        print(len(obj.result()))
    if __name__ == '__main__':
        # 开启线程池,并发并行的执行
        url_list = [
            'http://www.baidu.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.taobao.com',
            'https://www.cnblogs.com/jin-xin/articles/7459977.html',
            'https://www.luffycity.com/',
            'https://www.cnblogs.com/jin-xin/articles/9811379.html',
            'https://www.cnblogs.com/jin-xin/articles/11245654.html',
            'https://www.sina.com.cn/'
        ]
        pool = ThreadPoolExecutor(4)
        for url in url_list:
            obj = pool.submit(task, url)
            obj.add_done_callback(parse)
            
            #add_done_callback回调函数,执行一个任务就返回一个函数的返回值,增加回调函数,发布完任务后直接执行本行,当同一进程执行完第一次任务后,主进程立即对其结果进行分析,不必等所有进程全部执行完再分析,提高了效率
    线程池设置4个线程,异步发起10个任务,每次执行4个任务,执行完任务的时间肯定有先后顺序,先执行完的,将parse分析代码的任务
    交给空闲线程去执行,然后这个线程再去执行其他任务,比爬取源码的任务就和分析数据的任务共同并发执行
    
    
    异步回收收取结果的两种方式?
    1.将所有任务的结果同意回收
    2.完成一个任务,返回一个结果
    
    异步+回调函数
    回调函数:按顺序接收每个任务的结果,进行下一步处理
    前提:
        异步处理的多个任务(io任务),回调函数处理的非io
    区别:
        多进程,由主进程执行回调函数的代码
        多线程,由空闲线程执行回调函数的代码
    
  • 相关阅读:
    51 nod 1046 A^B Mod C
    51nod 1027 大数乘法
    Subversion基础:概念、安装、配置和基本操作(转)
    IOS 网络请求中设置cookie
    iOS设备控制打印机输出文本
    Xcode6中如何添加pch文件
    iOS8 PUSH解决方法
    iOS8 Push Notifications
    xcode升级到6.0以后遇到的警告错误 原帖链接http://www.cocoachina.com/bbs/simple/?t112432.html
    xcode升级到6.0以后遇到的警告错误解决方法
  • 原文地址:https://www.cnblogs.com/tangjian219/p/11426743.html
Copyright © 2011-2022 走看看