zoukankan      html  css  js  c++  java
  • day33 线程池进程池 协程 IO模型

     

    线程池/进程池

    进程池与线程池
    
    开进程开线程都需要消耗资源,只不过两者比较的情况线程消耗的资源比较少
    
    在计算机能够承受范围之内最大限度的利用计算机
    
    什么是池?
    	在保证计算机硬件安全的情况下最大限度的利用计算机
    	池其实是降低了程序的运行效率 但是保证了计算机硬件的安全
    	(硬件的发展跟不上软件的速度)
    用于存储线程/进程的容器  
    管理了,线程的创建于销毁,以及任务的分配   
    
    ​	1.创建池子
    
    ​	2.submit 提交任务  
    
    ​	3.shutdown 可以用于等待所有任务完成后销毁池 
    

    线程池:

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time
    
    pool = ThreadPoolExecutor(5)
    # 括号内可以传参数指定线程池内的线程个数
    #也可以不传  不传默认是当前所在计算机的cpu个数乘5
    
    def task(n):
        print(n)
        time.sleep(2)
        return n**3
    
    
    
    """
    提交任务的方式
    同步:提交任务后,还在原地等待任务的返回结果,期间不做任何事
    异步:会交任务后,不等待任务的返回结果,直接去执行下一段代码
    异步的返回结果怎么拿到???
    通过异步回调机制:
        当异步提交的任务有返回结果后,会自动触发回调函数的执性
    """
    
    # pool.submit(task,1)
    # 向线程池中提交任务  提交方式为 异步提交
    # 测试提交方式   若为同步,则会先打印1,等待2秒再打印主
    # print('主')
    # 结果显示一块打印,说明提交方式为异步
    
    
    # for i in range(10):
    #     res = pool.submit(task,i)
    #     print(res.result())
    #原地等待任务的返回结果
    
    # 获取对象    join方法 将并发变成了串行
    t_list = []
    for i in range(10):
        res = pool.submit(task,i)
        t_list.append(res)
    
    pool.shutdown()
    # 关闭池子 等待池子中所有的任务执行完毕后,代码才会往后走
    for p in t_list:
        print('>>>>',p.result())
    # 保证了任务是并发的提交数据
    线程池

    进程池

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import os ,time
    
    # pool = ThreadPoolExecutor(5)
    pool = ProcessPoolExecutor()
    # 开进程需要放在__main__下
    # 括号内可以传参数指定线程池内的线程个数
    #也可以不传  不传默认是当前所在计算机的cpu个数乘5
    
    def task(n):
        print(n)
        time.sleep(2)
        return n**3
    
    
    """
    提交任务的方式
    同步:提交任务后,还在原地等待任务的返回结果,期间不做任何事
    异步:会交任务后,不等待任务的返回结果,直接去执行下一段代码
    异步的返回结果怎么拿到???
    通过异步回调机制:
        当异步提交的任务有返回结果后,会自动触发回调函数的执性
    """
    
    # pool.submit(task,1)
    # 向线程池中提交任务  提交方式为 异步提交
    # 测试提交方式   若为同步,则会先打印1,等待2秒再打印主
    # print('主')
    # 结果显示一块打印,说明提交方式为异步
    
    
    # for i in range(10):
    #     res = pool.submit(task,i)
    #     print(res.result())
    #原地等待任务的返回结果
    
    # 获取对象    join方法 将并发变成了串行
    if __name__ == '__main__':
    
        t_list = []
        for i in range(10):
            res = pool.submit(task,i)
            t_list.append(res)
    
        pool.shutdown()
        # 关闭池子 等待池子中所有的任务执行完毕后,代码才会往后走
        for p in t_list:
            print('>>>>',p.result())
        # 保证了任务是并发的提交数据
    进程池

    验证进程池开启后,不会销毁进程

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time
    import os
    
    # pool = ThreadPoolExecutor(5)  # 括号内可以传参数指定线程池内的线程个数
    # # 也可以不传  不传默认是当前所在计算机的cpu个数乘5
    pool = ProcessPoolExecutor()  # 默认是当前计算机cpu的个数
    """
    池子中创建的进程/线程创建一次就不会再创建了
    至始至终用的都是最初的那几个
    这样的话节省了反复开辟进程/线程的资源
    """
    
    def task(n):
        print(n,os.getpid())  # 查看当前进程号
        time.sleep(2)
        return n**2
    
    """
    提交任务的方式
        同步:提交任务之后 原地等待任务的返回结果 期间不做任何事
        异步:提交任务之后 不等待任务的返回结果(异步的结果怎么拿???) 直接执行下一行代码
    """
    
    # pool.submit(task,1)  # 朝线程池中提交任务   异步提交
    # print('主')
    
    if __name__ == '__main__':
    
        t_list = []
        for i in range(20):
            res = pool.submit(task,i).add_done_callback(call_back)  # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数
            # print(res.result())  # 原地等待任务的返回结果
            t_list.append(res)
    
        # pool.shutdown()  # 关闭池子 等待池子中所有的任务执行完毕之后 才会往下运行代码
        # for p in t_list:
        #     print('>>>:',p.result())
    View Code

    异步提交 和拿到结果

    拿到结果的方法:

    1.通过形参可以得到,不推荐使用

    2.异步回调机制:add_done_callback(

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import os ,time
    
    # pool = ThreadPoolExecutor(5)
    pool = ProcessPoolExecutor(5)
    #进程池不传,默认CPU的个数
    # 开进程需要放在__main__下
    # 括号内可以传参数指定线程池内的线程个数
    #也可以不传  不传默认是当前所在计算机的cpu个数乘5
    
    """
    池子中创建的进程/线程创建一次就不会再创建了
    至始至终用的都是最初的那几个
    这样的话节省了反复开辟进程/线程的资源
    """
    # os.getpid()  查看当前进程号
    def task(n):
        print(n,os.getpid())
        time.sleep(2)
        return n**3
    
    def call_back(n):
        print('拿到了异步提交任务的返回结果:',n.result())
    
    """
    提交任务的方式
    同步:提交任务后,还在原地等待任务的返回结果,期间不做任何事
    异步:会交任务后,不等待任务的返回结果,直接去执行下一段代码
    异步的返回结果怎么拿到???
    通过异步回调机制:
        当异步提交的任务有返回结果后,会自动触发回调函数的执性
    """
    
    # pool.submit(task,1)
    # 向线程池中提交任务  提交方式为 异步提交
    # 测试提交方式   若为同步,则会先打印1,等待2秒再打印主
    # print('主')
    # 结果显示一块打印,说明提交方式为异步
    
    
    # for i in range(10):
    #     res = pool.submit(task,i)
    #     print(res.result())
    #原地等待任务的返回结果
    
    # 获取对象    join方法 将并发变成了串行
    if __name__ == '__main__':
    
        t_list = []
        for i in range(10):
            # 给每个对象添加异步回调机制
            res = pool.submit(task,i).add_done_callback(call_back)
            # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数
            t_list.append(res)
    
        # pool.shutdown()
        # # 关闭池子 等待池子中所有的任务执行完毕后,代码才会往后走
        # for p in t_list:
        #     print('>>>>',p.result())
        # 保证了任务是并发的提交数据
    View Code

    协程

    协程
       
       进程:资源单位
       线程:执行单位
       协程:单线程下实现并发
       
       并发
          切换+保存状态
          ps:看起来像同时执行的 就可以称之为并发
       
       协程:完全是程序员自己意淫出来的名词
          单线程下实现并发
       
       并发的条件?
          多道技术
             空间上的复用
             时间上的复用
                切换+保存状态
       
       程序员自己通过代码自己检测程序中的IO
       一旦遇到IO自己通过代码切换
       给操作系统的感觉是你这个线程没有任何的IO
       ps:欺骗操作系统 让它误认为你这个程序一直没有IO
          从而保证程序在运行态和就绪态来回切换
          提升代码的运行效率
         
       yield 保存上一次的结果
          
          切换+保存状态就一定能够提升效率吗???
             当你的任务是iO密集型的情况下  提升效率
             如果你的任务是计算密集型的   降低效率
            
     解决高并发:
        多进程 + 线程(协程)
    

      

    串行执行

    import time
    
    def func1():
        for i in range(1200000):
            i+1
    
    def fun2():
        for i in range(100000):
            i+1
    
    start = time.time()
    func1()
    fun2()
    stop = time.time()
    print(stop - start)
    View Code

    基于yield并发执行

    只能实现保存状态,不能实现IO

    import time
    def func1():
        while True:
            yield
    
    def func2():
        g = func1()
        for i in range(10000):
            i +1
            next(g)
    start = time.time()
    func2()
    stop = time.time()
    print(stop - start)
    View Code

    需要找到一个能够识别IO的一个工具

    gevent模块

    from gevent import monkey;monkey.patch_all()
    from gevent import spawn
    import time
    """
    注意gevent模块没办法自动识别time.sleep等io情况
    需要你手动再配置一个参数
    """
    def heng():
        print('')
        time.sleep(2)
        print('')
    
    def ha():
        print('')
        time.sleep(2)
        print('ha')
    
    def heihei():
        print('嗨嗨')
        time.sleep(2)
        print('嘿嘿')
    
    
    start = time.time()
    # spawn会检测所有的任务
    执行任务
    a1 = spawn(heng)
    a2 = spawn(ha)
    a3 = spawn(heihei)
    spqaw 是一个列表,括号来一个,就把他放在括号里,再来,再放.
    检测:当哼遇到IO时,就执行ha,立即执行无效果,得等待代码执行完毕后,在来执行代码.spawn有返回值,用值接收.
    a1.join()
    a2.join()
    a3.join()
    得等待spawn代码运行完成后,在执行下面的代码
    stop = time.time()
    print(stop - start)
    gevent模块

    用协程来实现TCP客户端并发

    服务端

    import socket
    from gevent import monkey;monkey.patch_all()
    from gevent import spawn
    
    server = socket.socket()
    server.bind(('127.0.0.1',8080))
    server.listen(5)
    
    
    
    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
                if not len(data):
                    break
                print(data.decode('utf'))
                conn.send(data.upper())
            except ConnectionResetError as a:
                print(a)
                break
        conn.close()
    
    
    
    def server11():
        while True:
            conn, addr = server.accept()
            spawn(talk,conn)
    
    
    if __name__ == '__main__':
        g1 = spawn(server11)
        g1.join()
    服务端

    客户端

    import socket
    from threading import Thread,current_thread
    
    def client():
        client = socket.socket()
        client.connect(('127.0.0.1',8080))
    
        while True:
            n = 0
            data = '%s %s'%(current_thread().name,n)
            client.send(data.encode('utf-8'))
            res = client.recv(1024)
            print(res.decode('utf-8'))
            n += 1
    
    for i in range(400):
        t = Thread(target=client)
        t.start()
    客户端

    IO模型

    Stevens在文章中一共比较了五种IO Model:

    blocking IO 阻塞IO

    nonblocking IO 非阻塞IO

    IO multiplexing IO多路复用

    signal driven IO 信号驱动IO

    asynchronous IO 异步IO

    由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

    阻塞IO

     阻塞IO模型
    
    ​ 我用一个用户名用来执行登陆操作,问题用户名需要用户输入,输入需要耗时, 如果输入没有完成,后续逻辑无法继续,所以默认的处理方式就是 等
    
    ​ 将当前进程阻塞住,切换至其他进程执行,等到按下回车键,拿到了一个用户名,再唤醒刚才的进程,将状态调整为就绪态
    
    以上处理方案 就称之为阻塞IO模型
    
    存在的问题:
    ​ 当执行到recv时,如果对象并没有发送数据,程序阻塞了,无法执行其他任务
    
    解决方案:
        
    ​ 多线程或多进程,
    ​ 当客户端并发量非常大的时候,服务器可能就无法开启新的线程或进程,如果不对数量加以限制 服务器就崩溃了
    
    
    ​ 线程池或进程池
    ​ 首先限制了数量 保证服务器正常运行,但是问题是,如果客户端都处于阻塞状态,这些线程也阻塞了
    
    
    ​ 协程:
    ​ 使用一个线程处理所有客户端,当一个客户端处于阻塞状态时可以切换至其他客户端任务
    View Code

    非阻塞IO模型

    阻塞IO模型在执行recv 和 accept 时 都需要经历wait_data
    
    ​ 非阻塞IO即 在执行recv 和accept时 不会阻塞 可以继续往下执行
    ​ 如何使用:
    ​ 将server的blocking设置为False 即设置非阻塞
    
    ​ 存在的问题 :
    ​ 这样一来 你的进程 效率 非常高 没有任何的阻塞
    ​ 很多情况下 并没有数据需要处理,但是我们的进程也需要不停的询问操作系统 会导致CPU占用过高
    ​ 而且是无意义的占用
    View Code

    案例:

    import socket
    import time
    
    server = socket.socket()
    server.bind(("192.168.13.103",1688))
    server.listen()
    server.setblocking(False) # 默认为阻塞 设置为False 表示非阻塞
    
    # 用来存储客户端的列表
    clients = []
    
    # 连接客户端的循环
    while True:
    try:
    client,addr = server.accept() # 接受三次握手信息
    # print("来了一个客户端了.... %s" % addr[1])
    # 有人链接成功了
    clients.append(client)
    except BlockingIOError as e:
    # print("还没有人连过来.....")
    # time.sleep(0.5)
    # 服务你的客人去
    for c in clients[:]:
    try: # 可能这个客户端还没有数据过来
    # 开始通讯任务
    data = c.recv(2048)
    c.send(data.upper())
    except BlockingIOError as e:
    print("这个客户端还不需要处理.....",)
    
    except ConnectionResetError:
    # 断开后删除这个客户端
    clients.remove(c)
    print(len(clients))
    服务端
    import socket
    
    client = socket.socket()
    client.connect(('127.0.0.1',1266))
    while True:
        msg = input('msg')
        if not msg:
            continue
        client.send(msg.encode('utf-8'))
        print(client.recv(2048).decode('utf-8'))
    客户端

    select多路复用

    多路指的是:多个socket对象 一个socket就是一个传输通道
    复用:意思是指使用同一个线程处理所有socket
    原理:
    在非阻塞IO模型中我们需要自己不断的询问操作系统是否有数据需要处理
    多路复用,使用select来监测是否有socket可以被使用
    

      

    import socket
    import select
    
    server = socket.socket()
    server.bind(('127.0.0.1', 1688))
    server.listen()
    
    # select最多检测1024个socket,超出直接报错,这是socket自身的问题,最终解决方案epoll
    rlist = [server] # 将需要检测(是否可读recv)的socket对象放到该列表中
    # accept也是一个读数据的操作,默认也会阻塞,也需要select检测
    wlist = [] # 将需要检测(是否可写send)的socket对象放到该列表中
    
    msgs = []
    while True:
    r_list, w_list, _ = select.select(rlist, wlist, []) # 会阻塞,等到有一个或者多个socket,可以被处理
    print(r_list, w_list)
    for soc in r_list:
    if soc == server:
    client, addr = server.accept()
    rlist.append(client)
    else:
    try:
    data = soc.recv(2048)
    if not data:
    soc.close()
    rlist.remove(soc)
    continue
    msgs.append((soc,data))
    except ConnectionResetError as e:
    soc.close()
    rlist.remove(soc)
    print('这个客户端下线')
    # client.recv()
    for soc in w_list:
    for i in msgs[:]:
    if i[0] == soc:
    soc.send(i[1])
    msgs.remove(i)
    wlist.remove(soc)
    使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。
    缺点:
    因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄
    服务端

    异步IO

     用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
    

      

     总

  • 相关阅读:
    Thumbnailator压缩图片
    dubbo序列化的一点注意
    Java编程思想读书笔记之内部类
    Hello World
    sql中where和having的区别
    Linux下服务器搭建
    maven中profile的激活方式
    <![CDATA[ ]]>
    linux下用xampp安装php集成环境,并修改各自端口号
    关于星号(**/*.java)
  • 原文地址:https://www.cnblogs.com/komorebi/p/11360661.html
Copyright © 2011-2022 走看看