zoukankan      html  css  js  c++  java
  • 学习整理 进程池、线程池和协程

    学习整理 进程池、线程池和协程

    线程queue

    queue队列 :使用import queue,用法与进程Queue一样

    from queue import Queue,LifoQueue,PriorityQueue
    
    q=Queue()#先进先出
    
    q.put('first')
    q.put('second')
    print(q.get())
    print(q.get())
    
    
    q=LifoQueue()#实现堆栈,先进后出
    q.put('first')
    q.put('second')
    q.put('third')
    print(q.get())
    print(q.get())
    print(q.get())
    
    
    
    q=PriorityQueue()#可以根据优先级取数据
    q.put((20,'a'))
    q.put((10,'b'))
    q.put((15,'c'))
    print(q.get())
    print(q.get())
    print(q.get())
    

    线程定时器

    from threading import Thread,Timer
    import time
    
    
    def task():
        print('线程执行了')
        time.sleep(2)
        print('线程结束了')
    
    
    t = Timer(4,task) # 过了4s后开启了一个线程
    t.start()
    

    socket多线程

    服务端

    import socket
    from threading import Thread
    
    
    def talk(conn):
        while True:
            try:
                msg=conn.recv(1024)
                if len(msg) == 0:break
                conn.send(msg.upper())
            except ConnectionResetError:
                print('客户端关闭了一个链接')
                break
        conn.close()
    
    def sever_demo():
        server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        server.bind(('127.0.0.1',8081))
        server.listen(5)
    
        while True:
            conn,addr=server.accept()
            t = Thread(target=talk,args=(conn,))
            t.start()
    
    if __name__ == '__main__':
        sever_demo()
    
    

    客户端

    import socket
    from threading import Thread,currentThread
    
    def client_demo():
        client=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    
        client.connect(('127.0.0.1',8081))
        while True:
            msg = f'{currentThread().name}'
            if len(msg) == 0: continue
            client.send(msg.encode('utf-8'))
            feedback = client.recv(1024)
            print(feedback.decode('utf-8'))
    
        client.close()
    
    if __name__ == '__main__':
        for i in range(20):
            t = Thread(target=client_demo)
            t.start()
    
    
    

    进程池和线程池

    进程池线程池:
    池的功能限制进程数或线程数.
    什么时候限制?
    当并发的任务数量远远大于计算机所能承受的范围,即无法一次性开启过多的任务数量
    我就应该考虑去限制我进程数或线程数,从保证服务器不崩.

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import currentThread
    from multiprocessing import current_process
    import time
    
    def task(i):
        print(f'{currentThread().name} 在执行任务 {i}')
        # print(f'进程 {current_process().name} 在执行任务 {i}')
        time.sleep(1)
        return i**2
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(4) # 池子里只有4个线程
        # pool = ProcessPoolExecutor(4) # 池子里只有4个线程
        fu_list = []
        for i in range(20):
            # pool.submit(task,i) # task任务要做20次,4个线程负责做这个事
            future = pool.submit(task,i) # task任务要做20次,4个进程负责做这个事
            # print(future.result()) # 如果没有结果一直等待拿到结果,导致了所有的任务都在串行
            fu_list.append(future)
        pool.shutdown() # 关闭了池的入口,会等待所有的任务执行完,结束阻塞.
        for fu in fu_list:
            print(fu.result())
    

    理解为提交任务的两种方式
    同步: 提交了一个任务,必须等任务执行完了(拿到返回值),才能执行下一行代码,

    异步: 提交了一个任务,不要等执行完了,可以直接执行下一行代码.

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import currentThread
    from multiprocessing import current_process
    import time
    
    def task(i):
        print(f'{currentThread().name} 在执行任务 {i}')
        # print(f'进程 {current_process().name} 在执行任务 {i}')
        time.sleep(1)
        return i**2
    
    
    def parse(future):
        # 处理拿到的结果
        print(future.result())
    
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(4) # 池子里只有4个线程
        # pool = ProcessPoolExecutor(4) # 池子里只有4个线程
        fu_list = []
        for i in range(20):
            # pool.submit(task,i) # task任务要做20次,4个线程负责做这个事
            future = pool.submit(task,i) # task任务要做20次,4个进程负责做这个事
            future.add_done_callback(parse)
            # 为当前任务绑定了一个函数,在当前任务执行结束的时候会触发这个函数,
            # 会把future对象作为参数传给函数
            # 这个称之为回调函数,处理完了回来就调用这个函数.
    
    
            # print(future.result()) # 如果没有结果一直等待拿到结果,导致了所有的任务都在串行
    
        # pool.shutdown() # 关闭了池的入口,会等待所有的任务执行完,结束阻塞.
        # for fu in fu_list:
        #     print(fu.result())
    

    协程

    python的线程用的是操作系统原生的线程

    协程:单线程下实现并发

    并发:切换+保存状态
    多线程:操作系统帮你实现的,如果遇到io切换,执行时间过长也会切换,实现一个雨露均沾的效果.
    什么样的协程是有意义的?
    遇到io切换的时候才有意义
    具体:
    协程概念本质是程序员抽象出来的,操作系统根本不知道协程存在,也就说来了一个线程我自己遇到io 我自己线程内部直接切到自己的别的任务上了,操作系统跟本发现不了,
    也就是实现了单线程下效率最高.

    优点:
    自己控制切换要比操作系统切换快的多
    缺点:HHHHGH
    对比多线程
    自己要检测所有的io,但凡有一个阻塞整体都跟着阻塞.
    对比多进程
    无法利用多核优势.

    为什么要有协程(遇到io切换)?
    自己控制切换要比操作系统切换快的多.降低了单个线程的io时间,

    import time
    def eat():
         print('eat 1')
         # 疯狂的计算呢没有io
         time.sleep(2)
         # for i in range(100000000):
         #     i+1
     def play():
         print('play 1')
         # 疯狂的计算呢没有io
         time.sleep(3)
         # for i in range(100000000):
         #     i+1
     play()
     eat() # 5s
    
        
     import time
     def func1():
         while True:
             1000000+1
             yield
    
     def func2():
         g = func1()
         for i in range(100000000):
             i+1
             next(g)
    
     start = time.time()
     func2()
     stop = time.time()
     print(stop - start) # 14.774465560913086
        
        
    对比通过yeild切换运行的时间反而比串行更消耗时间,这样实现的携程是没有意义的。
     import time
    
     def func1():
         for i in range(100000000):
             i+1
     def func2():
         for i in range(100000000):
             i+1
    
     start = time.time()
     func1()
     func2()
     stop = time.time()
     print(stop - start) # 8.630893230438232
    

    Gevent介绍

    安装

    pip3 install gevent

    Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。

    用法

    g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的

    g2=gevent.spawn(func2)

    g1.join() #等待g1结束

    g2.join() #等待g2结束

    或者上述两步合作一步:gevent.joinall([g1,g2])

    g1.value#拿到func1的返回值

    from gevent import monkey;monkey.patch_all()
    import gevent
    
    import time
    def eat():
        print('eat 1')
        time.sleep(2)
        print('eat 2')
    def play():
        print('play 1')
        # 疯狂的计算呢没有io
        time.sleep(3)
        print('play 2')
    
    start = time.time()
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    g1.join()
    g2.join()
    end = time.time()
    print(end-start)
    
  • 相关阅读:
    算法-经典趣题-寻找假银币
    一天一个 Linux 命令(3):cat 命令
    算法-经典趣题-青蛙过河
    常用数据库有哪些?
    SpringBoot2.0入门教程(一) 快速入门,项目构建HelloWorld示例
    一天一个 Linux 命令(2):ls 命令
    算法-经典趣题-爱因斯坦阶梯问题
    一天一个 Linux 命令(1):vim 命令
    什么是开发环境、测试环境、UAT环境、仿真环境、生产环境?
    算法-经典趣题-渔夫捕鱼
  • 原文地址:https://www.cnblogs.com/zhangmingyong/p/11552294.html
Copyright © 2011-2022 走看看