zoukankan      html  css  js  c++  java
  • 队列, 进程池与线程池, 协程

    队列

    import queue
    
    q = queue.Queue(2)
    q.put('蔡启龙')
    # q.put('才气龙')
    print(q.get())  # 蔡启龙
    q.task_done()
    q.join()
    
    q = queue.LifoQueue()  # last in first out 先进后出队列---堆栈
    q.put('123')
    q.put('456')
    print(q.get())  # 456
    
    q = queue.PriorityQueue()  # 优先级队列,可以根据优先级取数据
    q.put('2')
    q.put('1')
    q.put('3')
    print(q.get())  # 1
    # 通常放入的元组数据第一个值是int类型,数值小的先取出
    
    

    线程定时器

    import threading
    import time
    
    
    def task():
        print('开始...')
        time.sleep(3)
        print('结束...')
    
    
    t = threading.Timer(5, task)  # 实例化得到线程定时器,interval---时间间隔,function---任务函数
    t.start()
    
    

    线程池和进程池

    进程池和线程池---concurrent.futures

    • 池的功能是限制进程数或线程数
    • 什么时候限制
      • 当并发的任务数量远远大于计算机所能承受的范围,即一次性无法开启过多的任务数量,就应该考虑限制进程数或线程数,保证服务器不崩溃

    什么时候用进程池还是什么时候用线程池?

    • 本质上是什么时候用多进程和什么是时候用多线程, 执行多个计算密集型任务时使用进程池, 执行多个I/O密集型任务时使用进程池

      import concurrent.futures
      import threading
      import time
      import random
      
      
      def task():
          print(f'任务{threading.current_thread()}开始...')
          print('*' * 50)
          time.sleep(1)
          print(f'任务{threading.current_thread()}结束...')
          print('*' * 50)
          return random.randint(1, 100)
      
      
      if __name__ == '__main__':
          thread_pool = concurrent.futures.ThreadPoolExecutor(5)  # 线程池类,参数控制池子里有四个线程工作
      
          future_lt = []
      
          for i in range(20):  # 循环一共向线程池提交20个任务
              future = thread_pool.submit(task)  # 向线程池提交一个task任务
              # print(future.result())  # 获取一个任务执行结束的返回值,但会将任务提交方式变为同步提交
              future_lt.append(future)
      
          thread_pool.shutdown()  # 关闭线程池入口,但会将线程池中已经接受的任务全部执行完
      
          for future in future_lt:  # 在线程池所有任务结束后再处理所有任务结束的所有返回值
              print(f'处理任务执行结束的返回值{future.result()}')  # 处理一个任务执行结束的返回值
      
      
    • 回调函数

      import concurrent.futures
      import threading
      import time
      import random
      
      
      def task():
          print(f'{threading.current_thread()}执行任务开始...')
          print('*' * 50)
          time.sleep(1)
          print(f'{threading.current_thread()}执行任务结束...')
          print('*' * 50)
          return f'{threading.current_thread()}执行任务的返回值', random.randint(1, 100)
      
      
      def parse(future):
          print(f'{threading.current_thread()}将{future.result()[0]}平方后结果为:{future.result()[1]**2}')
      
      
      if __name__ == '__main__':
          thread_pool = concurrent.futures.ThreadPoolExecutor(5)  # 线程池类,参数控制池子里有四个线程工作
      
          future_lt = []
      
          for i in range(20):  # 循环一共向线程池提交20个任务
              future = thread_pool.submit(task)  # 向线程池提交一个task任务
      
              future.add_done_callback(parse)  # 异步处理当前任务执行结束的返回值
              # 1. 为当前任务绑定一个回调函数
              # 2. 在当前任务结束会触发(调用)该回调函数,并将future对象当作第一个参数传入该回调函数
              # 3. 回调函数会继续占用当前任务所在线程直至回调函数执行完毕
      
      

    线程池和信号量的区别

    • 线程池里面始终没有产生新的线程, 例如concurrent.futures.ThreadPoolExecutor(5) ,表示所有的任务始终由线程池中固定的5个线程去并发执行
    • 信号量可以有无数个线程并发执行任务,但是例如threading.Semaphore(5) ,表示被执行的任务中被锁住的代码每次只能有5个线程去并发执行,而其他代码则是所有线程都可以并发去执行

    协程

    什么是协程

    python的线程用的是操作系统原生的线程

    • 单线程下模拟操作系统实现并发
      • 并发:切换 + 保存状态
      • 多线程:是操作系统实现的,如果遇到I/O会切换,执行时间过长也会切换,实现一个雨露均沾的效果
    • 协程是程序员抽象出来的,操作系统中并没有协程的概念,也就是说在一个线程执行任务时,该任务如果遇到I/O,程序员自行控制该线程切换到别的任务上,从而减少I/O, 使得单线程下效率最高

    什么样的协程是有意义的

    • 遇到I/O时切换才有意义

      # 多个计算密集型任务频繁切换反而比串行更消耗时间,这样的协程是没有意义的
      import time
      
      
      def task1():
          i = 0
          while True:
              i += 1
              # print(f'第{i}个阶段执行中...')
      
              yield  # yield保存状态
      
      
      def task2():
          g = task1()  # 固定一个生成器对象
          for i in range(25000000):
              10 ** 2
              next(g)
      
      
      if __name__ == '__main__':
          start = time.time()
          task2()
          end = time.time()
          print(end - start)  # 3.9937961101531982
      
      
      def func1():
          for i in range(25000000):
              10 ** 2
      
      
      start = time.time()
      func1()
      func1()
      end = time.time()
      print(end - start)  # 1.2496771812438965
      
      
    • 优点:

      • 程序员自行控制的任务切换要比操作系统调度的切换快的多
    • 缺点:

      • 对比多线程: 需自行检测所有I/O, 但凡有一个阻塞整体都跟着阻塞
      • 对比多进程: 无法利用多核优势

    为什么要有协程

    • 自行控制的任务切换要比操作系统调度的切换快的多, 从而降低了单个线程的I/O时间

    gevent模块实现I/O监测和任务切换

    import gevent
    from gevent import monkey
    import time
    import random
    
    monkey.patch_all()  # 打补丁实现捕获非gevent的io
    
    
    def task1():
        print('task1开始...')
        for i in range(25000000):
            random.random() ** 2
    
        time.sleep(2)
        print('task1结束')
    
    
    def task2():
        print('task2开始...')
        for i in range(25000000):
            random.random() ** 2
    
        time.sleep(3)
        print('task2结束')
    
    
    start = time.time()
    
    g1 = gevent.spawn(task1)
    g2 = gevent.spawn(task2)
    g1.join()
    g2.join()
    
    end = time.time()
    print(end - start)  # 15.366976022720337  # 17.756819486618042---未打补丁
    
    
  • 相关阅读:
    ES6和Node.js的import和export
    重写Router.prototype.push后还报NavigationDuplicated错误的解决方法
    nightwatch对前端做自动化测试
    使用video.js 7在html中播放rtmp视频流
    UEFI开发环境搭建
    类的静态成员
    const成员函数
    类和结构
    最长递增子序列
    C语言将十六进制字符串转化成十六进制
  • 原文地址:https://www.cnblogs.com/-406454833/p/11753288.html
Copyright © 2011-2022 走看看