目录
队列
import queue
q = queue.Queue(2)
q.put('蔡启龙')
# q.put('才气龙')
print(q.get()) # 蔡启龙
q.task_done()
q.join()
q = queue.LifoQueue() # last in first out 先进后出队列---堆栈
q.put('123')
q.put('456')
print(q.get()) # 456
q = queue.PriorityQueue() # 优先级队列,可以根据优先级取数据
q.put('2')
q.put('1')
q.put('3')
print(q.get()) # 1
# 通常放入的元组数据第一个值是int类型,数值小的先取出
线程定时器
import threading
import time
def task():
print('开始...')
time.sleep(3)
print('结束...')
t = threading.Timer(5, task) # 实例化得到线程定时器,interval---时间间隔,function---任务函数
t.start()
线程池和进程池
进程池和线程池---concurrent.futures
- 池的功能是限制进程数或线程数
- 什么时候限制
- 当并发的任务数量远远大于计算机所能承受的范围,即一次性无法开启过多的任务数量,就应该考虑限制进程数或线程数,保证服务器不崩溃
什么时候用进程池还是什么时候用线程池?
-
本质上是什么时候用多进程和什么是时候用多线程, 执行多个计算密集型任务时使用进程池, 执行多个I/O密集型任务时使用进程池
import concurrent.futures import threading import time import random def task(): print(f'任务{threading.current_thread()}开始...') print('*' * 50) time.sleep(1) print(f'任务{threading.current_thread()}结束...') print('*' * 50) return random.randint(1, 100) if __name__ == '__main__': thread_pool = concurrent.futures.ThreadPoolExecutor(5) # 线程池类,参数控制池子里有四个线程工作 future_lt = [] for i in range(20): # 循环一共向线程池提交20个任务 future = thread_pool.submit(task) # 向线程池提交一个task任务 # print(future.result()) # 获取一个任务执行结束的返回值,但会将任务提交方式变为同步提交 future_lt.append(future) thread_pool.shutdown() # 关闭线程池入口,但会将线程池中已经接受的任务全部执行完 for future in future_lt: # 在线程池所有任务结束后再处理所有任务结束的所有返回值 print(f'处理任务执行结束的返回值{future.result()}') # 处理一个任务执行结束的返回值
-
回调函数
import concurrent.futures import threading import time import random def task(): print(f'{threading.current_thread()}执行任务开始...') print('*' * 50) time.sleep(1) print(f'{threading.current_thread()}执行任务结束...') print('*' * 50) return f'{threading.current_thread()}执行任务的返回值', random.randint(1, 100) def parse(future): print(f'{threading.current_thread()}将{future.result()[0]}平方后结果为:{future.result()[1]**2}') if __name__ == '__main__': thread_pool = concurrent.futures.ThreadPoolExecutor(5) # 线程池类,参数控制池子里有四个线程工作 future_lt = [] for i in range(20): # 循环一共向线程池提交20个任务 future = thread_pool.submit(task) # 向线程池提交一个task任务 future.add_done_callback(parse) # 异步处理当前任务执行结束的返回值 # 1. 为当前任务绑定一个回调函数 # 2. 在当前任务结束会触发(调用)该回调函数,并将future对象当作第一个参数传入该回调函数 # 3. 回调函数会继续占用当前任务所在线程直至回调函数执行完毕
线程池和信号量的区别
- 线程池里面始终没有产生新的线程, 例如
concurrent.futures.ThreadPoolExecutor(5)
,表示所有的任务始终由线程池中固定的5个线程去并发执行 - 信号量可以有无数个线程并发执行任务,但是例如
threading.Semaphore(5)
,表示被执行的任务中被锁住的代码每次只能有5个线程去并发执行,而其他代码则是所有线程都可以并发去执行
协程
什么是协程
python的线程用的是操作系统原生的线程
- 单线程下模拟操作系统实现并发
- 并发:切换 + 保存状态
- 多线程:是操作系统实现的,如果遇到I/O会切换,执行时间过长也会切换,实现一个雨露均沾的效果
- 协程是程序员抽象出来的,操作系统中并没有协程的概念,也就是说在一个线程执行任务时,该任务如果遇到I/O,程序员自行控制该线程切换到别的任务上,从而减少I/O, 使得单线程下效率最高
什么样的协程是有意义的
-
遇到I/O时切换才有意义
# 多个计算密集型任务频繁切换反而比串行更消耗时间,这样的协程是没有意义的 import time def task1(): i = 0 while True: i += 1 # print(f'第{i}个阶段执行中...') yield # yield保存状态 def task2(): g = task1() # 固定一个生成器对象 for i in range(25000000): 10 ** 2 next(g) if __name__ == '__main__': start = time.time() task2() end = time.time() print(end - start) # 3.9937961101531982 def func1(): for i in range(25000000): 10 ** 2 start = time.time() func1() func1() end = time.time() print(end - start) # 1.2496771812438965
-
优点:
- 程序员自行控制的任务切换要比操作系统调度的切换快的多
-
缺点:
- 对比多线程: 需自行检测所有I/O, 但凡有一个阻塞整体都跟着阻塞
- 对比多进程: 无法利用多核优势
为什么要有协程
- 自行控制的任务切换要比操作系统调度的切换快的多, 从而降低了单个线程的I/O时间
gevent模块实现I/O监测和任务切换
import gevent
from gevent import monkey
import time
import random
monkey.patch_all() # 打补丁实现捕获非gevent的io
def task1():
print('task1开始...')
for i in range(25000000):
random.random() ** 2
time.sleep(2)
print('task1结束')
def task2():
print('task2开始...')
for i in range(25000000):
random.random() ** 2
time.sleep(3)
print('task2结束')
start = time.time()
g1 = gevent.spawn(task1)
g2 = gevent.spawn(task2)
g1.join()
g2.join()
end = time.time()
print(end - start) # 15.366976022720337 # 17.756819486618042---未打补丁