zoukankan      html  css  js  c++  java
  • 池-协程

    一、进程池、线程池

    什么是池?

    要在程序开始的时候,还没提交任务先创建几个线程或者进程

    放在一个池子里,这就是池。

     

    为什么要用池?

    如果先开好进程/线程,那么有任务之后就可以直接使用这个池中的进程或线程了

    并且开好的线程或者进程会一直存在池中,可以被多个任务反复利用,这样极大的减少了开启或关闭调度线程或进程的时间开销。

    池中的线程或进程个数控制了操作系统需要调度的任务个数,控制池中的单位,有利于提高操作系统的效率,减轻操作系统的负担。

    进程池:

    import os
    import time
    from concurrent.futures import ProcessPoolExecutor
    ​
    ​
    def func(i):
        print(f'子进程{i}——>start:{os.getpid()}')
        time.sleep(1)
        print(f'子进程{i}——>end:{os.getpid()}')
    ​
    ​
    if __name__ == '__main__':
        pp = ProcessPoolExecutor(2)     # 创建一个进程池,里面有两个进程
        for i in range(4):
            pp.submit(func, i)          # 把func任务放入池中去执行,i为传递的参数
            
    # 输出
    #(四个任务但总是只用进程池中的两个进程,其他的两个任务就排队等待前两个任务任意一个结束在去使用进程)
    子进程0——>start:14692
    子进程1——>start:6136
    子进程1——>end:6136
    子进程0——>end:14692
    子进程2——>start:6136
    子进程3——>start:14692
    子进程2——>end:6136
    子进程3——>end:14692

    线程池:

    import time
    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor
    ​
    ​
    def func(i):
        print(f'子线程{i}——>start:{current_thread().ident}')
        time.sleep(1)
        print(f'子线程{i}——>end:{current_thread().ident}')
    ​
    ​
    pp = ThreadPoolExecutor(2)  # 创建一个线程池,里面有两个线程
    for i in range(4):
        pp.submit(func, i)      # 把func任务放入池中去执行,i为传递的参数
        
    # 输出(和线程池一样)
    #(四个任务但总是只用线程池中的两个线程,其他的两个任务就排队等待前两个任务任意一个结束在去使用线程)
    子线程0——>start:15124
    子线程1——>start:1444
    子线程0——>end:15124
    子线程2——>start:15124
    子线程1——>end:1444
    子线程3——>start:1444
    子线程2——>end:15124
    子线程3——>end:1444

    获取任务结果:线程池和进程池差不多一样

    线程池举例:

    import time
    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor
    ​
    ​
    def func(i):
        time.sleep(1)
        count = i*(i+1)
        print(f'{i} * ( {i} + 1 ) = {count},线程号:{current_thread().ident}')
        return count
    ​
    ​
    pp = ThreadPoolExecutor(2)          # 创建一个线程池,里面有两个线程
    count_list = {}
    for i in range(5):                  # 异步非阻塞
        ret = pp.submit(func, i)        # 把func任务放入池中去执行,i为传递的参数
        count_list[f'任务{i}结果——>'] = ret    # 把所有任务的标号和结果放入这个字典
    # 同步阻塞:不管那个线程先结束,取值的时候都会等待第一个线程结束后取值后,在取第二个第三个...
    for c in count_list:                # 从字典中取出结果,它总会按照顺序取结果(同步阻塞)
        print(c, count_list[c].result())
        
    # 输出(同时只有两个线程在执行,其他的排队等待)
    0 * ( 0 + 1 ) = 0,线程号:15092
    1 * ( 1 + 1 ) = 2,线程号:10432
    任务0结果——> 0
    任务1结果——> 2
    2 * ( 2 + 1 ) = 6,线程号:15092
    3 * ( 3 + 1 ) = 12,线程号:10432
    任务2结果——> 6
    任务3结果——> 12
    4 * ( 4 + 1 ) = 20,线程号:15092
    任务4结果——> 20

    map:获取返回值

    只适合传递简单的参数,并且必须是一个可迭代的类型作为参数

    不管谁先结束还是会按照执行顺序获取结果(同步阻塞)

    import time
    import random
    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor
    ​
    ​
    def func(i):
        time.sleep(random.random())
        count = i*(i+1)
        print(f'{i} * ( {i} + 1 ) = {count},线程号:{current_thread().ident}')
        return f'任务{i}结果——> {count}'
    ​
    ​
    pp = ThreadPoolExecutor(2)          # 创建一个线程池,里面有两个线程
    ret = pp.map(func, range(4))        # 把func任务放入池中去执行,range(4)为传递的参数(必须是可迭代的)
    # 返回一个任务结果的生成器对象
    for key in ret:
        print(key)
        
    # 输出
    1 * ( 1 + 1 ) = 2,线程号:4024
    0 * ( 0 + 1 ) = 0,线程号:1664
    任务0结果——> 0
    任务1结果——> 2
    3 * ( 3 + 1 ) = 12,线程号:1664
    2 * ( 2 + 1 ) = 6,线程号:4024
    任务2结果——> 6
    任务3结果——> 12

    add_done_callback 回掉函数:效率最高

    可以对结果立即进行处理,而不是按照顺序接收结果,处理结果。

    (异步阻塞)ret这个任务会在执行完毕的瞬间立即触发print_func函数,并且把任务的返回值对象传递到print_func做参数

    就可以对结果立即进行处理,而不用按照顺序接收结果处理结果

    import time
    import random
    from threading import current_thread
    from concurrent.futures import ThreadPoolExecutor
    ​
    ​
    def func(i):
        time.sleep(random.random())
        number = i*(i+1)
        print(f'{i} * ( {i} + 1 ) = {number},线程号:{current_thread().ident}')
        return (i, number)
    ​
    ​
    def print_func(ret):
        index, count = ret.result()
        print(f'任务{index}结果——> {count}')
    ​
    ​
    pp = ThreadPoolExecutor(2)          # 创建一个线程池,里面有两个线程
    for i in range(4):                  # 创建4个任务
        ret = pp.submit(func, i)        # 把任务放入线程池执行
        ret.add_done_callback(print_func) 
        # ret这个任务会在执行完毕的瞬间立即触发print_func函数,并且把任务的返回值对象传递到print_func做参数(异步阻塞)
        
    # 输出
    1 * ( 1 + 1 ) = 2,线程号:7516
    任务1结果——> 2
    0 * ( 0 + 1 ) = 0,线程号:3620
    任务0结果——> 0
    3 * ( 3 + 1 ) = 12,线程号:3620
    任务3结果——> 12
    2 * ( 2 + 1 ) = 6,线程号:7516
    任务2结果——> 6

    二、协程

    协程的本质就是一条线程,多个任务在一条线程上来回切换,是操作系统不可见的。

    利用协程的概念实现的内容:规避IO操作,就达到了我们将一条线程中的IO操作降到最低的目的。

     

    切换并规避IO的两个模块

    gevent:利用了 greenlet 底层模块完成的切换 + 自动规避IO的功能

    asyncio:利用了 yield 底层语法完成的切换 + 自动规避IO的功能

     

    进程数据隔离数据不安全操作系统级别开销非常大能利用多核 
    线程 数据共享 数据不安全 操作系统级别 开销小 不能利用多核 一些和文件操作相关的IO 只有操作系统能感知到
    协程 数据共享 数据安全 用户级别 更小 不能利用多核 协程所有的切换都基于用户, 只有在用户级别能感知到的IO 才会用协程模块来做规避(socket,请求网页)

     

    协程有什么优点?

    减轻了操作系统的负担,单线程内就可以实现并发效果,最大限度的利用了cpu

    一条线程如果开了多个协程,那么给操作系统的印象是线程很忙,这样能多争取一些时间片来被CPU执行,程序的效率就提高了

    协程有什么缺点?

    协程的本质是单线程下的,无法利用多核。

    因为是单线程,因而一旦协程出现阻塞,将会阻塞整个线程。

     

    协程模块gevent:

    from gevent import monkey
    ​
    monkey.patch_all()      # 为了让gevent认识time是IO操作,必须写上。
    import gevent
    import time
    ​
    ​
    ​
    def func(i):
        print(f'任务{i}——>start')
        time.sleep(1)            # 带有io操作的内容写在函数里,然后提交func给gevent
        print(f'任务{i}——>end')
    ​
    ​
    g_list = []
    for i in range(3):
        g = gevent.spawn(func, i)
        g_list.append(g)
    ​
    gevent.joinall(g_list)      # 等待g_list列表里所有的任务结束
    # 输出
    任务0——>start
    任务1——>start
    任务2——>start
    任务0——>end
    任务1——>end
    任务2——>end
    ​
    """
    程序从上到下执行遇到spawn提交的函数,就去执行函数内容, 打印——>任务0——>start
    打印后遇到的IO操作,切出来又到了for循环,开始第2个任务,打印——>任务1——>start
    打印后又遇到IO操作,在切出来又到for循环,开始第3个任务,打印——>任务2——>start
    打印后又遇到IO操作,在切出来循环结束到了joinall,需要等待这三个遇到io的任务结束,又是阻塞
    在切换回任务1的IO操作之后,打印——>任务0——>end
    任务1结束出来又遇到joinall,在切换回任务1的IO操作之后,打印——>任务1——>end
    任务1结束出来又遇到joinall,在切换回任务2的IO操作之后,打印——>任务2——>end
    """

    gevent:扩展

    基于gevent协程实现socket并发

    服务端:

    import socket
    import gevent
    import time
    from gevent import monkey
    monkey.patch_all()      # 为了让gevent认识time和socket是IO操作,必须写上。
    ​
    sk = socket.socket()
    sk.bind(('127.0.0.1', 4444))
    sk.listen()
    ​
    ​
    def func(conn):
        msg = conn.recv(1024).decode('utf-8')
        while True:
            conn.send(msg.upper().encode('utf-8'))
            time.sleep(1)           # 这里也会被切换
    ​
    ​
    while True:
        conn, _ = sk.accept()       # accept阻塞等待链接这里会被切换
        gevent.spawn(func, conn)    # 接收到连接之后,就把任务提交给gevent执行
        
        
    # 当有连接来的时候,accept就会执行,然后把任务提交到gevent执行,遇到sleep的IO操作时
    # 又会切出来到accept,accept没有新来的连接就会在次切回sleep,然后再次发消息,当又执行到sleep时。
    # 又会切出来到accept,如果这时有新的链接接进来,就会就会再次执行accept,然后提交任务到gevent。
    # 然后又重复上面的切换,实现单线程并发

    客户端:

    import socket
    ​
    sk = socket.socket()
    sk.connect(('127.0.0.1', 4444))
    ​
    sk.send('hello'.encode('utf-8'))
    ​
    while True:
        msg = sk.recv(1024).decode('utf-8')
        print(msg)

    协程模块asyncio:

    import asyncio
    ​
    ​
    async def func(i):              # await 关键字必须写在一个async函数里
        print(f'任务{i}——> start')
        await asyncio.sleep(1)      # await后面接——>可能会发生阻塞的方法
        print(f'任务{i}——> end')
    ​
    ​
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait([func(1), func(2), func(3)]))
    ​
    # 输出(切换和gevent的方法差不多)
    任务1——> start
    任务3——> start
    任务2——> start
    任务1——> end
    任务3——> end
    任务2——> end
    学习之旅
  • 相关阅读:
    Advanced Configuration Tricks
    Reviewing the Blog Module
    Editing and Deleting Data
    Making Use of Forms and Fieldsets
    Understanding the Router
    SQL Abstraction and Object Hydration
    Preparing for Different Databases
    Java学习理解路线图
    Openstack学习历程_1_视频
    CentOS安装Nginx负载
  • 原文地址:https://www.cnblogs.com/XiaoYang-sir/p/14787822.html
Copyright © 2011-2022 走看看