主要内容
1 . 队列
from multiprocessing import Queue# 是用于多进程的队列,就是专门用来做进程间通信(IPC)。 import queue# 是用于同一进程内的队列,不能做多进程之间的通信 q = queue.Queue() # 先进先出 q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) q = queue.LifoQueue()# 后进先出的队列 q.put(1) q.put(2) q.put(3) print(q.get()) q = queue.PriorityQueue() # 优先级队列,put()方法接收的是一个元组(),第一个位置是优先级,第二个位置是数据 # 优先级如果是数字,直接比较数值 # 如果是字符串,是按照 ASCII 码比较的。当ASCII码相同时,会按照先进先出的原则 # q.put((1,'abc')) # q.put((5,'qwe')) # q.put((-5,'zxc')) # print(q.get()) # print(q.get()) # print(chr(48))
2 . 线程池
a : 定义 : 在一个池子里,放固定数量的线程,这些线程等待任务,一旦有任务来,就有线程自发的去执行任务。
b : concurrent.futures 这个模块是异步调用的机制, concurrent.futures 提交任务都是用submit
c : shutdown 是等效于Pool中的close+join,是指不允许再继续向池中增加任务,然后让父进程(线程)等待池中所有进程执行完所有任务。
d : 把多个任务扔进池中的方法, 及拿结果的方式
@ for + submit ,拿结果用result方法,
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time from multiprocessing import Pool def func(num): sum = 0 for i in range(num): sum = sum + i ** 2 return sum if __name__ == '__main__': li = [] t = ThreadPoolExecutor(20) for i in range(100): # for + submit方式执行任务. re = t.submit(func, i) #<Future at 0x1c31cccb128 state=finished returned int> li.append(re) t.shutdown() #相当于close+ join [print(re.result()) for re in li]
@map(func , iterable) 方式去提交多个任务,结果是一个生成器, 采用__next的方法拿结果.
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time from multiprocessing import Pool def func(num): sum = 0 for i in range(num): sum = sum + i ** 2 return sum if __name__ == '__main__': t = ThreadPoolExecutor(20) re = t.map(func, range(100)) #<generator object Executor.map.<locals>.result_iterator at 0x000001B12D88BFC0> print(re) t.shutdown() print(re.__next__()) # 通过__next取值. print(re.__next__()) print(re.__next__())
e : 进程池线程池的效率对比
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import time def func(num): sum = 0 for i in range(num): for j in range(i): for x in range(j): sum += x ** 2 print(sum) if __name__ == '__main__': # pool的进程池的效率演示 p = Pool(5) start = time.time() for i in range(100): p.apply_async(func,args=(i,)) p.close() p.join() print('Pool进程池的效率时间是%s'%(time.time() - start)) # 多进程的效率演示 tp = ProcessPoolExecutor(5) start = time.time() for i in range(100): tp.submit(func, i) tp.shutdown() # 等效于 进程池中的 close + join print('进程池的消耗时间为%s' % (time.time() - start)) # 多线程的效率 tp = ThreadPoolExecutor(20) start = time.time() for i in range(100): tp.submit(func,i) tp.shutdown()# 等效于 进程池中的 close + join print('线程池的消耗时间为%s'%(time.time() - start))
结果 : 针对计算密集的程序来说
不管是pool的进程池还是ProcessPoolExecutor()的进程池, 执行效率相当
ThreadPoolExecutor 的效率要差好多.
所以, 当计算密集时, 使用多进程.
f : 线程池中的回调函数
from threading import Thread
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor
import time
def func(num):
time.sleep(1)
print('这是在子线程中 ', current_thread())
return num
def call_back(re):
time.sleep(1)
print('这是在回调函数中', current_thread())
if __name__ == '__main__':
t = ThreadPoolExecutor(5)
for i in range(100):
t.submit(func, i).add_done_callback(call_back)
t.shutdown()
print('这是在主线程中', current_thread())
# 线程池中的回调函数是子线程调用的,跟父进程没有关系, 不可以用os.getpid()查看, 因为多个线程共享同一个pid, 用current_thread实现
无论是Processpoolexecutor 还是 pool中的回调函数都是父进程调用的, 跟子进程没有关系.(可以用os.getpid()查看)