前言
源码:
Lib/concurrent/futures/thread.py
Lib/concurrent/futures/process.py
concurrent.futures:
异步执行可以由 ThreadPoolExecutor 或 ProcessPoolExecutor 来实现
两者都实现了抽象类 Executor 定义的接口
Future 对象方法说明
方法 | 说明 |
---|---|
cancel() | 尝试取消调用。如果调用正在执行或已经执行完毕而无法取消,返回False;否则调用将被取消,返回True |
cancelled() | 如果调用已被成功取消,返回True |
running() | 如果调用正在执行且无法被取消,返回True |
done() | 如果调用已被成功取消或已执行完毕,返回True |
result(timeout=None) | 返回调用的结果。如果调用尚未完成,最多等待 timeout 秒(None 表示不限时);超时抛出 TimeoutError,调用被取消则抛出 CancelledError,调用本身抛出的异常也会在这里重新抛出 |
exception(timeout=None) | 返回调用抛出的异常;如果调用正常完成则返回 None。等待规则与 result() 相同 |
add_done_callback(fn) | 将 fn 绑定到 future 对象上。当 future 被取消或执行完毕时,fn 会以该 future 作为唯一参数被调用;如果 future 此时已经完成,fn 会被立即调用 |
进程池、线程池使用案例
- 进程池与线程池的调用方式、传参基本相同,只是使用的类不同(ProcessPoolExecutor / ThreadPoolExecutor)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import os
import time
from concurrent.futures import ProcessPoolExecutor # 进程池模块
from concurrent.futures import ThreadPoolExecutor # 线程池模块
# 下面是以进程池为例, 线程池只是模块改一下即可
def task(name):
    """Print which worker process is running *name*, then sleep one second.

    The one-second sleep stands in for real work so that the pool size
    (two workers at a time) is visible in the order of the printed lines.
    """
    print("name: %s pis%s run" % (name, os.getpid()))
    time.sleep(1)


if __name__ == "__main__":
    # Thread-pool variant: only the class changes.
    # thread = ThreadPoolExecutor(2)
    # max_workers defaults to os.cpu_count() for ProcessPoolExecutor, but to
    # min(32, os.cpu_count() + 4) for ThreadPoolExecutor (Python 3.8+).
    pool = ProcessPoolExecutor(2)
    for i in range(5):
        # Asynchronous submit: returns a Future immediately, does not wait
        # for the task to run.
        pool.submit(task, "进程%s" % i)
    # shutdown(wait=True): (1) the pool stops accepting new submissions and
    # (2) the call blocks until every submitted task has finished, like join().
    pool.shutdown(wait=True)
    print("主进程")
异步调用与同步调用
concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用
同步调用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import os, time, random
from concurrent.futures import ProcessPoolExecutor # 进程池模块
# 1、同步调用: 提交完任务后、就原地等待任务执行完毕,拿到结果,再执行下一行代码(导致程序串行执行)
def task(name):
    """Print which worker process is running *name*, then sleep 1-3 seconds."""
    print("name: %s pis%s run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))


if __name__ == "__main__":
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        # Synchronous call: .result() blocks until this task has finished
        # (much like join()), so the ten tasks run one after another even
        # though the pool has four workers.
        pool.submit(task, "进程%s" % i).result()
    pool.shutdown(wait=True)
    print("主进程")
异步调用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
from concurrent.futures import ProcessPoolExecutor # 进程池模块
import os, time, random
def talk(name):
    """Report which worker process picked up *name*, then idle for 1-3 s."""
    pid = os.getpid()
    print("name: %s pis%s run" % (name, pid))
    pause = random.randint(1, 3)
    time.sleep(pause)


if __name__ == "__main__":
    pool = ProcessPoolExecutor(4)
    # Fire-and-forget: submit() hands each job to the pool and returns at
    # once, so nothing in this loop waits for a task to finish.
    labels = ["进程%s" % i for i in range(10)]
    for label in labels:
        pool.submit(talk, label)
    pool.shutdown(wait=True)
    print("主进程")
回调机制
可以为提交到进程池或线程池的每个任务绑定一个函数,该函数在任务执行完毕(或被取消)后自动触发,并以该任务对应的 future 对象作为唯一参数(而不是任务的返回值),这个函数称为回调函数。回调总是在提交任务的那个进程中被调用(对进程池而言就是主进程)
#parse 拿到的是一个 future 对象 res ,需要用 res.result() 才能拿到任务的返回值
用法:
p.submit(函数, *参数).add_done_callback(回调函数)
回调例子
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import os
import time
import requests
from concurrent.futures import ThreadPoolExecutor # 线程池模块
def get(url, timeout=10):
    """Download *url* and return a dict with the URL and the page text.

    Runs inside a worker thread. *timeout* (seconds) is forwarded to
    ``requests.get`` so that an unresponsive server cannot block the worker
    forever.
    """
    print(f"GET {url}")
    response = requests.get(url, timeout=timeout)  # download the page
    time.sleep(3)  # simulate extra network latency
    return {"url": url, "content": response.text}  # page address and page text


def parse(res):
    """Done-callback: print the size of the page fetched by ``get``.

    ``add_done_callback`` passes the *Future* itself, not the task's return
    value, so the result has to be unwrapped with ``res.result()`` (which
    also re-raises any exception raised inside ``get``).
    """
    res = res.result()
    # len(text) counts characters; encode first to get a real byte size.
    size_kb = len(res["content"].encode("utf-8")) / 1024
    print(f"{res['url']} res is {size_kb:.1f} KB")


if __name__ == "__main__":
    # A tuple (not a set) keeps the submission order deterministic.
    urls = (
        "http://www.jd.com",
        "http://www.baidu.com",
        "http://www.cnblogs.com",
    )
    # Leaving the with-block calls shutdown(wait=True), so the script waits
    # for every download and callback before exiting.
    with ThreadPoolExecutor(2) as thread:
        for i in urls:
            # Attach parse as a done-callback to each submitted download.
            thread.submit(get, i).add_done_callback(parse)
废弃代码 - 折叠
线程池
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import requests
from concurrent.futures import ThreadPoolExecutor
lis = [
    "https://www.baidu.com",
    "https://www.jd.com",
    "https://www.cnblogs.com",
]


def response(url):
    """GET *url* and print the URL of the response object."""
    print(requests.get(url).url)


print("--- 主线程 ---")
executor = ThreadPoolExecutor(max_workers=3)
for url in lis:
    future = executor.submit(response, url)
    # running() is only a snapshot taken right after submit(): it is True
    # when a worker thread has already picked the task up.
    restful = future.running()
    print(restful)
print("--- 主线程 | 结束 ---")
"""
打印:
--- 主线程 ---
True
True
True
--- 主线程 | 结束 ---
https://www.jd.com/
https://www.cnblogs.com/
https://www.baidu.com/
[Finished in 1.9s]
"""
进程池
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import time
from concurrent.futures import ProcessPoolExecutor
Maxlis = [
    30000000,
    31000000,
    29000000,
]


def count(Maxnum):
    """Count from 0 up to *Maxnum* in a busy loop and print the elapsed time.

    The loop is deliberately CPU-bound so that running several calls on a
    ProcessPoolExecutor shows real parallelism.

    Returns the final counter value: *Maxnum* when it is positive, else 0.
    The loop uses ``a < Maxnum`` rather than an equality test so that it
    also terminates for ``Maxnum <= 0``.
    """
    a = 0
    start = time.time()
    while a < Maxnum:
        a += 1
    print(" -> 结果: %s | 耗时 - %.3f" % (a, round(time.time() - start, 3)))
    return a


if __name__ == "__main__":
    # The guard is required for ProcessPoolExecutor: with the "spawn" start
    # method (default on Windows and macOS) every worker imports this module
    # again, and unguarded submission code would run in each child.
    start = time.time()
    print("--- 主线程 ---")
    executor = ProcessPoolExecutor(max_workers=3)
    for num in Maxlis:
        future = executor.submit(count, num)
        # running() is a snapshot taken right after submit(); it is True only
        # if a worker has already started this task, so it is timing-dependent.
        print(future.running())
    print(f"--- 主线程 | - 耗时 {round(time.time() - start, 2)} 结束 ---")
"""
打印:
--- 主线程 ---
True
False
False
--- 主线程 | - 耗时 1.92 结束 ---
-> 结果: 31000000 | 耗时 - 2.201
-> 结果: 30000000 | 耗时 - 2.221
-> 结果: 29000000 | 耗时 - 2.012
[Finished in 5.1s]
"""
submit 实现
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import time
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed
count = 0
maxsize = 10000000
lock = Lock()


def run(times=None):
    """Add *times* (default ``maxsize``) to the global ``count`` under ``lock``.

    The whole loop runs while the lock is held, so concurrent calls are
    serialized and the final total is exact. The counter value is captured
    before the lock is released; it is then printed and returned. Reading
    the global after release could observe another thread's partial
    progress.
    """
    global count
    if times is None:
        times = maxsize
    with lock:  # released even if the loop raises
        for _ in range(times):
            count += 1
        snapshot = count
    print(snapshot)
    return snapshot


def result():
    """Run ``run`` three times on a 3-worker thread pool and print the total time."""
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        to_do = [executor.submit(run) for _ in range(3)]
        # as_completed yields each future as it finishes; result() re-raises
        # any exception raised inside run().
        for future in as_completed(to_do):
            future.result()
    print(f"总耗时 - {round(time.time() - start_time, 2)} 秒")


if __name__ == "__main__":
    result()
"""
打印:
10000000
20000000
30000000
总耗时 - 2.22 秒
[Done] exited with code=0 in 2.016 seconds
"""
submit 实现-2
# run() is the same function as in the previous example;
# only the way the tasks are submitted differs.
def test_submit():
    """Submit ``run`` three times and wait for all of them via the with-block.

    Leaving the ``with`` block calls ``shutdown(wait=True)``, so the time
    printed afterwards covers all three tasks.
    """
    start_time = time.time()
    with ThreadPoolExecutor(max_workers=3) as test:
        for _ in range(3):
            test.submit(run)
    print(f"总耗时 - {round(time.time() - start_time, 2)} 秒")


if __name__ == "__main__":
    test_submit()
"""
打印:
10000000
20000000
30000000
总耗时 - 2.2 秒
[Done] exited with code=0 in 2.016 seconds
"""
回调函数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File : $file_name.py
# @Author : BenLam
# @Link : https://www.cnblogs.com/BenLam/
import os
import time
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
class Demo_1(object):
    """
    $ Time is Money
    $ Simulate batch requests to several websites, with a done-callback.
    $ time.sleep is added on purpose so the futures are easy to tell apart.
    $ Run one after another on a single thread, response() takes 30 seconds.
    """

    def __init__(self):
        self.lis = [
            "https://www.baidu.com",
            "https://www.jd.com",
            "https://www.cnblogs.com",
        ]

    def response(self, url, timeout=10):
        """Sleep 10 s, then GET *url*; *timeout* bounds the HTTP request itself."""
        time.sleep(10)
        _res = requests.get(url, timeout=timeout)
        return _res

    def _restful(self, future, *args, **kwargs):
        """Done-callback: print the current thread's name and the status code.

        ``future.result()`` re-raises any exception raised in ``response``.
        """
        _response = future.result()
        # threading.currentThread() is deprecated (3.10+); use current_thread().
        print(f"当前线程名: - {threading.current_thread().name}|状态码: {_response.status_code}")

    def main(self):
        """Fetch every URL in ``self.lis`` on a 3-thread pool and print the total time."""
        print("--- 主线程 ---")
        start_time = time.time()
        # Leaving the with-block calls shutdown(wait=True).
        with ThreadPoolExecutor(max_workers=3) as executor:
            for url in self.lis:
                # Attach the callback to each submitted request.
                executor.submit(self.response, url).add_done_callback(self._restful)
        print(f"--- 结束 | 总耗时: {round(time.time() - start_time, 3)} ---")
class Demo_2(object):
    """
    $ Time is money, as everyone knows
    $ Count-down work on a process pool, with a done-callback.
    """

    def __init__(self):
        self.lis = [10000, 20000, 30000]

    def desc(self, num):
        """Sleep 10 s, then count *num* down to 0 and return it.

        Returns 0 for positive *num*, otherwise *num* unchanged. Submitted
        as a bound method, so the Demo_2 instance is pickled to the worker.
        """
        time.sleep(10)
        while num > 0:
            num -= 1
        return num

    def _restful(self, future, *args, **kwargs):
        """Done-callback: print this process's pid and the computed result.

        Callbacks run in the process that added them (the parent), not in
        the worker, which is why every line shows the same pid.
        """
        _result = future.result()
        print(f"当前进程: - {os.getpid()}|计算结果: {_result}")

    def main(self):
        """Run ``desc`` for each number in ``self.lis`` on a 3-process pool."""
        print("--- 主进程 ---")
        start_time = time.time()
        # Leaving the with-block calls shutdown(wait=True).
        with ProcessPoolExecutor(max_workers=3) as executor:
            for num in self.lis:
                executor.submit(self.desc, num).add_done_callback(self._restful)
        print(f"--- 结束 | 总耗时: {round(time.time() - start_time, 3)} ---")
if __name__ == '__main__':
    # Thread-pool demo first, then the process-pool demo.
    for demo_cls in (Demo_1, Demo_2):
        demo_cls().main()
    # Demo_1 output:
    #   --- 主线程 ---
    #   当前线程名: - ThreadPoolExecutor-0_1|状态码: 200
    #   当前线程名: - ThreadPoolExecutor-0_2|状态码: 200
    #   当前线程名: - ThreadPoolExecutor-0_0|状态码: 200
    #   --- 结束 | 总耗时: 11.962 ---
    #   [Finished in 12.7s]
    # Demo_2 output:
    #   --- 主进程 ---
    #   当前进程: - 9792|计算结果: 0
    #   当前进程: - 9792|计算结果: 0
    #   当前进程: - 9792|计算结果: 0
    #   --- 结束 | 总耗时: 12.441 ---
    #   [Finished in 13.2s]
    # Both demos together: [Finished in 24.1s]
Executor 死锁
- ThreadPoolExecutor 是使用线程池来异步执行调用的 Executor 子类,
- 当与某个 Future 关联的可调用对象去等待另一个 Future 的结果时, 可能会造成死锁,
- submit 提交的任务中尽量不要再使用同一个 executor 提交任务或等待它的 futures, 不然很容易出现死锁
两个方法相互等待结束的死锁
import time
from concurrent.futures import ThreadPoolExecutor


def wait_A():
    """Sleep 8 s, then block on future ``b`` -- whose task waits on ``a``."""
    time.sleep(8)
    print(b.result())
    return "bbbbbbbbbb"


def wait_B():
    """Sleep 9 s, then block on future ``a`` -- whose task waits on ``b``."""
    time.sleep(9)
    print(a.result())
    return "aaaaaaaaaa"


if __name__ == "__main__":
    # Tasks submitted to an executor should avoid waiting on other futures of
    # the same executor, otherwise a deadlock is easy to create. Here each
    # task waits for the other, so neither finishes and both worker threads
    # stay blocked; the program has to be stopped manually.
    executor = ThreadPoolExecutor(max_workers=2)
    a = executor.submit(wait_A)
    b = executor.submit(wait_B)
"""
通过 debug + 打断点, 直观看到两个线程都在互相等待对方执行完毕
debug -
ThreadPoolExecutor-0_0 正在运行
ThreadPoolExecutor-0_1 正在运行
"""
调用内置函数的死锁
def wait_future():
    """Submit ``abs(-5)`` to the same single-worker pool and wait for it.

    This will never complete because there is only one worker thread and
    it is executing this function: the inner task can never be scheduled.
    """
    f = executor.submit(abs, -5)
    print(f.result())


if __name__ == "__main__":
    # Guarded so that merely importing the module does not start a task
    # that deadlocks and keeps the interpreter from exiting.
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(wait_future)
"""
手动强制停止:
[Done] exited with code=1 in 25.838 seconds
"""
超时直接抛异常
- call_with_timeout 是自定义的超时调用函数: 在规定时间内未完成会抛出 TimeoutError 异常; 注意超时后任务本身并不会被中止, 仍会在后台线程中继续运行
def call_with_timeout(func, *args, timeout=3, **kwargs):
    """Run ``func(*args, **kwargs)`` in a worker thread and return its result.

    Raises concurrent.futures.TimeoutError if the call has not finished
    within *timeout* seconds; any exception raised by *func* propagates.

    The Executor API cannot cancel a call that is already running:
    future.cancel() only cancels calls that have not started yet. On
    timeout the worker thread therefore keeps running in the background
    (shutdown(wait=False) merely stops waiting for it). To abort the work
    outright, use something other than concurrent.futures, e.g. a
    multiprocessing.Process that can be terminated.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args, **kwargs)
        return future.result(timeout=timeout)
    finally:
        executor.shutdown(wait=False)


if __name__ == "__main__":
    call_with_timeout(wait_A)
"""
编译器打印:
Traceback (most recent call last):
File "concurrent.futures - submit.py", line 91, in <module>
call_with_timeout(wait_A)
File "concurrent.futures - submit.py", line 80, in call_with_timeout
result = future.result(timeout=timeout)
File "C:python36libconcurrentfutures\_base.py", line 434, in result
raise TimeoutError()
concurrent.futures._base.TimeoutError
[Done] exited with code=1 in 8.67 seconds
"""
map 方法
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(函数名, 可迭代对象1, 可迭代对象2, ...)  # 每个可迭代对象依次为函数提供一个位置参数, 返回按输入顺序排列的结果迭代器
- 例子
import os
import threading
from concurrent.futures import ThreadPoolExecutor


def test(n):
    """Print the name of the pool thread that handles *n*.

    All pool threads share one process, so os.getpid() would print the same
    number for every task; the thread name shows which thread ran it.
    """
    print(f"当前运行线程 - {threading.current_thread().name} | - {n} ")


if __name__ == "__main__":
    # Leaving the with-block already calls shutdown(wait=True).
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Consume the result iterator: an exception raised in test() is only
        # re-raised when its result is retrieved, otherwise it is lost.
        for _ in executor.map(test, range(10)):
            pass
"""
打印(示例输出, 线程名的分配与打印顺序取决于调度):
当前运行线程 - ThreadPoolExecutor-0_0 | - 0
当前运行线程 - ThreadPoolExecutor-0_1 | - 1
...
当前运行线程 - ThreadPoolExecutor-0_4 | - 9
"""