zoukankan      html  css  js  c++  java
  • python------异步,同步,协程

    1.阻塞,非阻塞,异步,同步

    程序运行中表现的状态: 阻塞, 运行,就绪

    阻塞: 程序遇到IO阻塞. 程序遇到IO立马会停止(挂起), cpu马上切换,等到IO结束之后,在执行.

    非阻塞: 程序没有IO或者 遇到IO通过某种手段让cpu去执行其他的任务,尽可能的占用cpu.

    异步,同步:

    站在任务发布的角度.

    同步: 任务发出去之后,等待,直到这个任务最终结束之后,给我一个返回值,我在发布下一个任务.

    异步: 所有的任务同时发出, 我就继续下一行. 结果???

    2.异步+ 调用机制

    爬虫:

    浏览器做的事情很简单:

    ​ 浏览器 封装头部 发一个请求 ---> www.taobao.com(127.42.34.56) ---> 服务器获取到请求信息,分析正确 ----> 给你返回一个文件.---> 浏览器将这个文件的代码渲染,就成了你看的样子:

    文件:

    1564140657872

    爬虫: 利用requests模块功能模拟浏览器封装头,给服务器发送一个请求,骗过服务器之后,服务器也给你返回一个文件. 爬虫拿到文件,进行数据清洗获取到你想要的信息.

    爬虫: 分两步,

    ​ 第一步: 爬取服务端的文件(IO阻塞).

    ​ 第二步: 拿到文件,进行数据分析,(非IO,IO极少)

    1564140711863

    import requests
    from concurrent.futures import ProcessPoolExecutor
    from multiprocessing import Process
    import time
    import random
    import os
    def get(url):
        response = requests.get(url)
        print(f'{os.getpid()} 正在爬取:{url}')
        # time.sleep(random.randint(1,3))
        if response.status_code == 200:
            return response.text
    def parse(obj):
        '''
         对爬取回来的字符串的分析
         简单用len模拟一下.
         :param text:
         :return:
        '''
        time.sleep(1)
        print(f'{os.getpid()} 分析结果:{len(obj.result())}')
    if __name__ == '__main__':
        url_list = [
            'http://www.taobao.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.JD.com',
            'http://www.baidu.com',
            'https://www.cnblogs.com/jin-xin/articles/11232151.html',
            'https://www.cnblogs.com/jin-xin/articles/10078845.html',
            'http://www.sina.com.cn',
            'https://www.sohu.com',
            'https://www.youku.com',
        ]
        start_time = time.time()
        pool = ProcessPoolExecutor(4)
        for url in url_list:
            obj = pool.submit(get, url)
            obj.add_done_callback(parse)  # 增加一个回调函数
            # 现在的进程完成的还是网络爬取的任务,拿到了返回值之后,结果丢给回调函数add_done_callback,
            # 回调函数帮助你分析结果
            # 进程继续完成下一个任务.
        pool.shutdown(wait=True)
        print(f'主: {time.time() - start_time}')
    # 回调函数是主进程帮助你实现的, 回调函数帮你进行分析任务. 明确了进程的任务: 只有一个网络爬取.
    # 分析任务: 回调函数执行了.对函数之间解耦.
    # 极值情况: 如果回调函数是IO任务,那么由于你的回调函数是主进程做的,所以有可能影响效率.
    # 回调不是万能的,如果回调的任务是IO,
    # 那么异步 + 回调机制 不好.此时如果你要效率只能牺牲开销,再开一个线程进程池.
    # 异步就是回调! 这个是错的!! 异步,回调是两个概念.
    # 如果多个任务,多进程多线程处理的IO任务.
    # 1. 剩下的任务 非IO阻塞.  异步 + 回调机制
    # 2. 剩下的任务 IO << 多个任务的IO  异步 + 回调机制
    # 3. 剩下的任务 IO >= 多个任务的IO  第二种解决方式,或者两个进程线程池.
    

    3.线程Queue.py

    import queue
    q = queue.Queue(3)
    q.put(1)
    q.put(2)
    q.put('太白')
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    # LIFO 栈.
    import queue
    q = queue.LifoQueue()
    q.put(1)
    q.put(3)
    q.put('barry')
    print(q.get())
    print(q.get())
    print(q.get())
    
    # 优先级队列
    # 需要元组的形式,(int,数据) int 代表优先级,数字越低,优先级越高.
    import queue
    q = queue.PriorityQueue(3)
    q.put((10, '垃圾消息'))
    q.put((-9, '紧急消息'))
    q.put((3, '一般消息'))
    print(q.get())
    print(q.get())
    print(q.get())
    

    4.事件Event(线程进程)

    并发的执行某个任务 .多线程多进程,几乎同时执行.

    一个线程执行到中间时通知另一个线程开始执行.

    import time
    from threading import Thread
    from threading import current_thread
    from threading import Event
    event = Event()  # 默认是False
    def task():
        print(f'{current_thread().name} 检测服务器是否正常开启....')
        time.sleep(3)
        event.set()  # 改成了True
    def task1():
        print(f'{current_thread().name} 正在尝试连接服务器')
        # event.wait()  # 轮询检测event是否为True,当其为True,继续下一行代码. 阻塞.
        event.wait(1)
        # 设置超时时间,如果1s中以内,event改成True,代码继续执行.
        # 设置超时时间,如果超过1s中,event没做改变,代码继续执行.
        print(f'{current_thread().name} 连接成功')
    if __name__ == '__main__':
        t1 = Thread(target=task1,)
        t2 = Thread(target=task1,)
        t3 = Thread(target=task1,)
        t = Thread(target=task)
        t.start()
        t1.start()
        t2.start()
        t3.start()
    

    5.协程的初识

    一个线程实现并发.

    并发,并行,串行:

    串行: 多个任务执行时,第一个任务从开始执行,遇到了IO等待,等待IO阻塞结束之后,继续执行下一个任务.

    并行: 多核,多个线程或者进程同时执行. 4个cpu,同时执行4个任务.

    并发: 多个任务看起来是同时执行, cpu在多个任务之间来回切换(遇到IO阻塞,计算密集型执行时间过长).

    并发的本质:

    1. 遇到IO阻塞,计算密集型执行时间过长 切换.
    2. 保持原来的状态.

    一个线程实现并发.

    多进程: 操作系统控制 多个进程的多个任务切换 + 保持状态.

    多线程: 操作系统控制 多个线程的多个任务切换 + 保持状态.

    协程: 程序控制 一个线程的多个任务的切换以及保持状态.

    协程: 微并发, 处理任务不宜过多.

    协程它会调度cpu,如果协程管控的任务中,遇到阻塞,它会快速的(比操作系统快)切换到另一个任务,并且能将上一个任务挂起(保持状态,),让操作系统以为cpu一直在工作.

    之前我们学过协程?yield 就是一个协程,

    import time
    def func1():
        for i in range(11):
            yield
            print('这是我第%s次打印啦' % i)
            time.sleep(1)
    def func2():
        g = func1()
        #next(g)
        for k in range(10):
            print('哈哈,我第%s次打印了' % k)
            time.sleep(1)
            next(g)
    #不写yield,下面两个任务是执行完func1里面所有的程序才会执行func2里面的程序,
    # 有了yield,我们实现了两个任务的切换+保存状态
    # func1()
    func2()
    

    yield 虽然可以实现两个任务来回切换,并且能够保存原来的状态,而且还是一个线程,但是 他只能遇到yield切换,遇到Io还是阻塞.

    计算密集型:串行与协程的效率对比

    # 计算密集型:串行与协程的效率对比
    # import time
    # def task1():
    #     res = 1
    #     for i in range(1,100000):
    #         res += i
    # def task2():
    #     res = 1
    #     for i in range(1,100000):
    #         res -= i
    # start_time = time.time()
    # task1()
    # task2()
    # print(f'串行消耗时间:{time.time()-start_time}')  # 串行消耗时间:0.012000560760498047
    import time
    def task1():
        res = 1
        for i in range(1, 100000):
            res += i
            yield res
    def task2():
        g = task1()
        res = 1
        for i in range(1, 100000):
            res -= i
            next(g)
    start_time = time.time()
    task2() 
    print(f'协程消耗时间:{time.time() - start_time}')  # 协程消耗时间:0.0260012149810791
    

    协程的优点 :

    #1. 协程的切换开销更小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级
    #2. 单线程内就可以实现并发的效果,最大限度地利用cpu
    #3. 修改共享数据不需加锁
    

    协程的缺点:

    #1. 协程的本质是单线程下,无法利用多核,可以是一个程序开启多个进程,每个进程内开启多个线程,每个线程内开启协程
    #2. 协程指的是单个线程,因而一旦协程出现阻塞,将会阻塞整个线程
    

    多线程并发: 一个进程如果要是开4个线程,最多可以处理30个任务.

    多协程并发: 一个进程开启4个线程,然后我将4个线程设置4个协程,每个协程可以执行30个任务.120个任务.(了解)

    6.协程----Greenlet

     
      #真正的协程模块就是使用greenlet完成的切换
    from greenlet import greenlet
    def eat(name):
        print('%s eat 1' %name)  #2
        g2.switch('taibai')   #3
        print('%s eat 2' %name) #6
        g2.switch() #7
    def play(name):
        print('%s play 1' %name) #4
        g1.switch()      #5
        print('%s play 2' %name) #8
    g1=greenlet(eat)
    g2=greenlet(play)
    g1.switch('taibai')#可以在第一次switch时传入参数,以后都不需要  1
    复制代码
    

    用法:

    g1=gevent.spawn(func,1,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的,spawn是异步提交任务
    g2=gevent.spawn(func2)
    g1.join() #等待g1结束
    g2.join() #等待g2结束  有人测试的时候会发现,不写第二个join也能执行g2,是的,协程帮你切换执行了,但是你会发现,如果g2里面的任务执行的时间长,但是不写join的话,就不会执行完等到g2剩下的任务了
    #或者上述两步合作一步:gevent.joinall([g1,g2])
    g1.value#拿到func1的返回值
    复制代码
    

    2.遇到IO阻塞时会自动切换任务

    import gevent
    def eat(name):
        print('%s eat 1' %name)
        gevent.sleep(2)
        print('%s eat 2' %name)
    def play(name):
        print('%s play 1' %name)
        gevent.sleep(1)
        print('%s play 2' %name)
    g1=gevent.spawn(eat,'egon')
    g2=gevent.spawn(play,name='egon')
    g1.join()
    g2.join()
    #或者gevent.joinall([g1,g2])
    print('主')
    遇到I/O切换
    

    3.上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞,而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了.from gevent import monkey;monkey.patch_all()必须放到被打补丁者的前面,如time,socket模块之前.或者我们干脆记忆成:要用gevent,需要将from gevent import monkey;monkey.patch_all()放到文件的开头

    from gevent import monkey
    monkey.patch_all() #必须写在最上面,这句话后面的所有阻塞全部能够识别了
    import gevent  #直接导入即可
    import time
    def eat():
        #print()  
        print('eat food 1')
        time.sleep(2)  #加上mokey就能够识别到time模块的sleep了
        print('eat food 2')
    def play():
        print('play 1')
        time.sleep(1)  #来回切换,直到一个I/O的时间结束,这里都是我们个gevent做得,不再是控制不了的操作系统了。
        print('play 2')
    g1=gevent.spawn(eat)
    g2=gevent.spawn(play_phone)
    gevent.joinall([g1,g2])
    print('主')
    
  • 相关阅读:
    PC software
    pdf.js查看器 在线预览 office文件、图片、文本文件
    核算两个时间之间时长
    Flink sql 之 join 与 StreamPhysicalJoinRule (源码解析)
    Flink sql 之 TopN 与 StreamPhysicalRankRule (源码解析)
    Flink Sql 之 Calcite Volcano优化器(源码解析)
    Keepalive高可用部署
    Keepalive配置文件
    Erlang环境安装
    ZooKeeper集群
  • 原文地址:https://www.cnblogs.com/hql1117/p/11252578.html
Copyright © 2011-2022 走看看