zoukankan      html  css  js  c++  java
  • 协程

    协程

    昨日内容回顾

    GIL

    1.GIL
    全局解释器锁     只在Cpython解释器中
    由于Cpython内存管理不是线程安全的!
    	
    2.内存管理中的垃圾回收机制分为三种:
        2.1.引用计数
        2.2.标记清除
        2.3.分代回收
    	 
    3.线程是直接能够被cpu执行吗?
    线程就是我们写的代码,只是我们能识别,也就是高级语言中的解释型,计算机识别不了,所以线程并不是直接被cpu直接执行,而是通过解释器, 所以必须先抢解释器才能被cpu执行
    

    解释器是被GIL锁住,同一个进程下的多个线程在同一时刻只能有一个线程被执行,也就是说同一个进程下的多个线程要想运行就必须先抢到解释器,抢到解释器的线程中如果存在互斥锁阻塞(acquire,release)也会先停下来等,等的时间是cpu自己已经存在好的,如果线程中存在(input,accept等其他io阻塞,是不会等,直接切换到其他线程的运行。

    GIL锁住解释器,不管是单核还是多核下都不能实现并行,但是可以实现并发。微观下是串行,宏观下是并发。线程在双核下也可以实现并行。

    死锁与递归锁

    死锁与递归锁
    即便你记住了没acquire一次就release一次的操作,也会产生死锁现象
    递归锁:可以连续的acquire(),每acquire()一次计数加一
    

    线程q

    线程q
    消息队列
    普通的q  队列  先进先出
    LIFO    堆栈   先进后出
    优先级q		q.put((数字,数据)) 数字越小优先级越高
    

    socket服务端实现并发

    (无论是开线程还是开进程其实都消耗资源,开线程消耗的资源比开进程的小)

    服务端
    import socket
    from threading import Thread
    """
    服务端:
        1.固定的ip和port
        2.24小时不间断提供服务
        3.支持高并发
    """
    server = socket.socket()
    server.bind(('127.0.0.1',8080))
    server.listen(5)  # 半连接池
    
    
    def communicate(conn):
        while True:
            try:
                data = conn.recv(1024)  # 阻塞
                if len(data) == 0:break
                print(data)
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    while True:
        conn,addr = server.accept()  # 阻塞
        print(addr)
        t = Thread(target=communicate, args=(conn,))
        t.start()
    
    客户端
    import socket
    
    
    client = socket.socket()
    client.connect(('127.0.0.1',8080))
    
    while True:
        info = input('>>>:').encode('utf-8')
        if len(info) == 0:continue
        client.send(info)
        data = client.recv(1024)
        print(data)
    

    进程池线程池

    池:
    为了减缓计算机硬件的压力,避免计算机硬件设备崩溃
    		
    虽然减轻了计算机硬件的压力,但是一定程度上降低了持续的效率
    	
    进程池线程池:
    线程不可能无限制的开下去,总要消耗和占用资源
    为了限制开设的进程数和线程数,从而保证计算机硬件的安全
    		
    '''
    - concurrent.futures模块导入
    - 线程池创建(线程数=cpu核数*5左右)
    - submit提交任务(提交任务的两种方式)
    - 异步提交的submit返回值对象
    - shutdown关闭池并等待所有任务运行结束
    - 对象获取任务返回值
    - 进程池的使用,验证进程池在创建的时候里面固定有指定的进程数
    - 异步提交回调函数的使用
    '''
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time
    pool = ThreadPoolExecutor(55)
    
    def task(n):
        print(n)
        time.sleep(2)
        return n**2
    for i in range(20):
        future = pool.submit(task, 1) #异步  submit()是函数直接就运行,返回一个对象
        print(future.result()) #异步提交任务的结果,这也是阻塞,在原定等着你给我结果,
        #以上两句说明,本来submit是异步,提交完任务之后就走人,不需要等待拿到返回值,而
        # future.result()是要取结果,阻塞,把submit硬生生的由异步变成同步
    print('主')
    
    
    # 以上代码在异步中得到结果的方式非常不合理,请看以下代码
    
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    pool = ThreadPoolExecutor(5)
    def task(n):
        print(n)
        time.sleep(2)
        return n**2
    t_list = []
    for i in range(20):
        future = pool.submit(task, i)
        t_list.append(future)
    for p in t_list:
        print('>>', p.result())
    print('主')
    
    
    
    #等待所有的线程执行完毕之后,才获取值(join方法,event方法,shutdown)
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    pool = ThreadPoolExecutor(5)
    
    def task(n):
        print(n)
        time.sleep(2)
        return n**2
    
    t_list = []
    for i in range(20):
        future = pool.submit(task, i)
        t_list.append(future)
    
    pool.shutdown()# 关闭池子并且等待池子中所有的任务运行完毕
    for p in t_list:
        print('>>', p.result())
    print('主')
    
    # 以上方法太麻烦
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    import time
    import os
    
    
    
    
    # 最终答案
    # 不知道参数的情况,默认是当前计算机cpu个数乘以5,也可以指定线程个数,但是参数也不能超过cpu*5的个数,因为
    pool = ProcessPoolExecutor(5)  # 创建了一个池子,池子里面有20个线程
    
    
    def task(n):
        print(n, os.getpid())
        time.sleep(2)
        return n**2
    
    
    def call_back(n):
        print('我拿到了结果:%s' % n.result())
    """
    提交任务的方式
        同步:提交任务之后,原地等待任务的返回结果,再继续执行下一步代码
        异步:提交任务之后,不等待任务的返回结果(通过回调函数拿到返回结果并处理),直接执行下一步操作
    """
    
    
    # 回调函数:异步提交之后一旦任务有返回结果,自动交给另外一个去执行
    if __name__ == '__main__':
        # pool.submit(task,1)
        t_list = []
        for i in range(20):
            future = pool.submit(task, i).add_done_callback(call_back)  # 异步提交任务
            t_list.append(future)
    
        # pool.shutdown()  # 关闭池子并且等待池子中所有的任务运行完毕
        # for p in t_list:
        #     print('>>>:',p.result())
        print('主')
    
    

    协程理论

    - 进程:资源单位
    - 线程:执行单位
    - 协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省IO),这里注意区分操作系统的切换+保存状态是针对多个线程而言,而我们现在是想在单个线程下自己手动实现操作系统的切换+保存状态的功能
    
    注意协程这个概念完全是程序员自己想出来的东西,它对于操作系统来说根本不存在。操作系统只知道进程和线程。并且需要注意的是并不是单个线程下实现切换+保存状态就能提升效率,因为你可能是没有遇到io也切,那反而会降低效率
    
    再回过头来想上面的socket服务端实现并发的例子,单个线程服务端在建立连接的时候无法去干通信的活,在干通信的时候也无法去干连接的活。这两者肯定都会有IO,如果能够实现通信io了我就去干建连接,建连接io了我就去干通信,那其实我们就可以实现单线程下实现并发
    
    将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>> 实现高并发!!!
    '''
    协程:
    		进程:资源单位(车间)
    		线程:最小执行单位(流水线)
    		协程:单线程下实现并发
    		
    	并发:看上去像同时执行就可以称之为并发
    	
    	多道技术:
    		空间上的复用
    		时间上的复用
    	核心:切换+保存状态
    	
    	协程:完全是我们高技术的人自己编出来的名词
    		通过代码层面自己监测io自己实现切换,让操作系统误认为
    		你这个线程没有io
    		
    	切换+保存状态就一定能够提升你程序的效率吗?
    		不一定
    		当你的任务是计算密集型,反而会降低效率
    		如果你的任务是IO密集型,会提升效率
    	
    	yield
    	
    	
    	协程:单线程下实现并发
    		如果你能够自己通过代码层面监测你自己的io行为
    		并且通过代码实现切换+保存状态
    		
    		单线程实现高并发
    	开多进程
    	多个进程下面再开多线程
    	多个线程下开协程
    	实现高并发
    	'''
    
    

    三者都是实现并发的手段

    # yield能够实现保存上次运行状态,但是无法识别遇到io才切换
    
    #串行执行
    import time
    
    def func1():
        for i in range(10000000):
            i+1
    
    def func2():
        for i in range(10000000):
            i+1
    
    start = time.time()
    func1()
    func2()
    stop = time.time()
    print(stop - start)
    
    
    #基于yield并发执行
    import time
    def func1():
        while True:
            10000000+1
            yield
    
    def func2():
        g=func1()
        for i in range(10000000):
            # time.sleep(100)  # 模拟IO,yield并不会捕捉到并自动切换
            i+1
            next(g)
    
    start=time.time()
    func2()
    stop=time.time()
    print(stop-start)
    

    yield并不能帮我们自动捕获到io行为才切换,那什么模块可以呢?

    3.gevent模块

    一个spawn就是一个帮你管理任务的对象

    gevent模块不能识别它本身以外的所有的IO行为,但是它内部封装了一个模块,能够帮助我们识别所有的IO行为

    from gevent import monkey;monkey.patch_all()  # 检测所有的IO行为
    from gevent import spawn,joinall  # joinall列表里面放多个对象,实现join效果
    import time
    
    def play(name):
        print('%s play 1' %name)
        time.sleep(5)
        print('%s play 2' %name)
    
    def eat(name):
        print('%s eat 1' %name)
        time.sleep(3)
        print('%s eat 2' %name)
    
    
    start=time.time()
    g1=spawn(play,'刘清正')
    g2=spawn(eat,'刘清正')
    
    # g1.join()
    # g2.join()
    joinall([g1,g2])
    print('主',time.time()-start)  # 单线程下实现并发,提升效率
    

    4.协程实现服务端客户端通信

    链接和通信都是io密集型操作,我们只需要在这两者之间来回切换其实就能实现并发的效果

    服务端监测链接和通信任务,客户端起多线程同时链接服务端

    # 服务端
    from gevent import monkey;monkey.patch_all()
    from socket import *
    from gevent import spawn
    
    
    def communicate(conn):
        while True:
            try:
                data = conn.recv(1024)
                if len(data) == 0: break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
        
    
    def server(ip, port, backlog=5):
        server = socket(AF_INET, SOCK_STREAM)
        server.bind((ip, port))
        server.listen(backlog)
    
        while True:  # 链接循环
            conn, client_addr = server.accept()
            print(client_addr)
    
            # 通信
            spawn(comunicate,conn)
    
    
    if __name__ == '__main__':
        g1=spawn(server,'127.0.0.1',8080)
        g1.join()
    
        
    # 客户端
    from threading import Thread, current_thread
    from socket import *
    
    
    def client():
        client = socket(AF_INET, SOCK_STREAM)
        client.connect(('127.0.0.1', 8080))
    
        n = 0
        while True:
            msg = '%s say hello %s' % (current_thread().name, n)
            n += 1
            client.send(msg.encode('utf-8'))
            data = client.recv(1024)
            print(data.decode('utf-8'))
    
    
    if __name__ == '__main__':
        for i in range(500):
            t = Thread(target=client)
            t.start()
    # 原本服务端需要开启500个线程才能跟500个客户端通信,现在只需要一个线程就可以扛住500客户端
    # 进程下面开多个线程,线程下面再开多个协程,最大化提升软件运行效率
    

    IO模型

    • 阻塞IO

    • 非阻塞IO(服务端通信针对accept用s.setblocking(False)加异常捕获,cpu占用率过高)

    • IO多路复用

      在只检测一个套接字的情况下,他的效率连阻塞IO都比不上。因为select这个中间人增加了环节。

      但是在检测多个套接字的情况下,就能省去wait for data过程

    • 异步IO

  • 相关阅读:
    Docker系列二:Docker的基本结构
    Codeforces 1013
    树形结构升级
    Codeforces 402 and 403 (Div. 2 and 1)
    Codeforces 342
    图论基础
    树形结构基础
    Codeforces 788 (Div. 1)
    NOIP/CSP-J/S初赛集锦
    树的重心
  • 原文地址:https://www.cnblogs.com/huangxuanya/p/10840897.html
Copyright © 2011-2022 走看看