一、概述
1、非并发
- 程序由单个步骤序列构成
- 包含独立子任务的程序执行性能低
2、并发
- 异步、高效
- 分解子任务、简化流程与逻辑
3、进程process
- 一个程序的执行实例
- 每个进程有自己的地址空间、内存、数据栈及辅助数据
4、线程
- 同一进程内,可被并行激活的控制流
- 共享相同上下文(空间地址、数据结构)
- 特点
便于信息共享和通信
线程访问顺序差异会导致结果不一致(条件 race condition)
5、Python GIL 全局解释器锁
- Global Interpreter Lock
- Python 代码由虚拟机(解释器主循环)控制
- 主循环同时只能有一个控制线程执行
二、多线程
单线程示例:
import time
def func(n):
print('函数开始执行于:{}'.format(time.ctime()))
time.sleep(n)
print(f'函数执行结束于:{time.ctime()}')
def main():
print(f'【函数执行开始于:{time.ctime()}】')
func(4)#在这里是一个func函数运行完再运行下一个函数func,从输出可以看出
func(2)
print(f'【函数执行结束于:{time.ctime()}】')
if __name__ == '__main__':
main()
【函数执行开始于:Wed Sep 11 21:02:15 2019】
函数开始执行于:Wed Sep 11 21:02:15 2019
函数执行结束于:Wed Sep 11 21:02:19 2019
函数开始执行于:Wed Sep 11 21:02:19 2019
函数执行结束于:Wed Sep 11 21:02:21 2019
【函数执行结束于:Wed Sep 11 21:02:21 2019】
1、_thread
特点
- 没有控制进程结束机制
- 只有一个同步原语(锁)
- 功能少于threading模块
.start_new_thread(function, args, **kwargs=None) 开始线程
import time
import _thread
def func(n):
print('函数开始执行于:{}'.format(time.ctime()))
time.sleep(n)
print(f'函数执行结束于:{time.ctime()}')
def main():
print(f'【函数执行开始于:{time.ctime()}】')
_thread.start_new_thread(func, (4,)) #这是是开始运行func(4)之后,不等他结束,就开始运行下一个func(2)
_thread.start_new_thread(func, (2,))
time.sleep(4)
print(f'【函数执行结束于:{time.ctime()}】')
if __name__ == '__main__':
main()
【函数执行开始于:Sun Sep 15 17:23:17 2019】
函数开始执行于:Sun Sep 15 17:23:17 2019
函数开始执行于:Sun Sep 15 17:23:17 2019
函数执行结束于:Sun Sep 15 17:23:19 2019
函数执行结束于:Sun Sep 15 17:23:21 2019
【函数执行结束于:Sun Sep 15 17:23:23 2019】
2、threading 模块
.Thread线程类
- 构造
- .Thread(target=目标函数, args=(参数,))
- 自定义Thread派生类,重写run方法逻辑
- .start()启动线程
- .join()要求主线程等待
- .name 线程名字
import time
import threading #threading.Thread()实现多线程
def func(n):
print('{} 函数开始执行于:{}'.format(threading.current_thread().name, time.ctime()))
time.sleep(n)
print(f'{threading.current_thread().name} 函数执行结束于:{time.ctime()}')
def main():
print(f'【函数执行开始于:{time.ctime()}】')
threads = []
t1 = threading.Thread(target=func, args=(4,))
threads.append(t1)
t2 = threading.Thread(target=func, args=(2,))
threads.append(t2)
for t in threads:
t.start()
for t in threads:
t.join()
print(f'【函数执行结束于:{time.ctime()}】')
if __name__ == '__main__':
main()
【函数执行开始于:Sun Sep 15 17:45:59 2019】
Thread-22 函数开始执行于:Sun Sep 15 17:45:59 2019
Thread-23 函数开始执行于:Sun Sep 15 17:45:59 2019
Thread-23 函数执行结束于:Sun Sep 15 17:46:01 2019
Thread-22 函数执行结束于:Sun Sep 15 17:46:03 2019
【函数执行结束于:Sun Sep 15 17:46:03 2019】
.current_thread() 获取当前线程
import time
import threading #threading.Thread()实现多线程
def func(n):
print('{} 函数开始执行于:{}'.format(threading.current_thread().name, time.ctime()))
time.sleep(n)
print(f'{threading.current_thread().name} 函数执行结束于:{time.ctime()}')
class MyThread(threading.Thread): #通过定义一个类来执行
def __init__(self, func, args):
threading.Thread.__init__(self)
self.func = func
self.args = args
def run(self):
self.func(*self.args)
def main():
print(f'【函数执行开始于:{time.ctime()}】')
threads = []
t1 = MyThread(func, (4,))
threads.append(t1)
t2 = MyThread(func, (2,))
threads.append(t2)
for t in threads:
t.start()
for t in threads:
t.join()
print(f'【函数执行结束于:{time.ctime()}】')
if __name__ == '__main__':
main()
【函数执行开始于:Sun Sep 15 17:41:15 2019】
Thread-20 函数开始执行于:Sun Sep 15 17:41:15 2019
Thread-21 函数开始执行于:Sun Sep 15 17:41:15 2019
Thread-21 函数执行结束于:Sun Sep 15 17:41:17 2019
Thread-20 函数执行结束于:Sun Sep 15 17:41:19 2019
【函数执行结束于:Sun Sep 15 17:41:19 2019】
threading.Lock 同步原语:锁
- .acquire() 获得
- .release() 释放
- 支持上下文操作 with lock:
import threading
import time
import random
eggs = []
lock = threading.Lock() #创建锁
def put_egg(n, lst):
#lock.acquire() #关闭锁
with lock:
for i in range(1, n+1):
time.sleep(random.randint(0, 2))
lst.append(i)
#lock.release() #释放锁
def main():
threads = []
for i in range(3):
t = threading.Thread(target=put_egg, args=(5, eggs))
threads.append(t)
for i in threads:
i.start()
for i in threads:
i.join()
print(eggs)
if __name__ == '__main__':
main()
[1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
三、队列:queue 模块
Queue FIFO
- 构造实例 .Queue(maxsize=0)
- 放入数据项 .put(item, block=True, timeout=None)
- 获取数据项 .get(block=True, timeout=None)
- 声明当前队列任务处理完毕 .task_done()
- 队列所有项处理完毕前阻塞 .join()
import threading
import queue
import time
import random
def producer(data_queue):
for i in range(5):
time.sleep(0.5)
item = random.randint(1, 100)
data_queue.put(item)
print(f'{threading.current_thread().name} 在队列中放入数据项:{item}')
def consumer(data_queue):
while True:
try:
item = data_queue.get(timeout=3)
print(f'{threading.current_thread().name} 从队列中移除了:{item}')
except queue.Empty:
break
else:
data_queue.task_done()
def main():
q = queue.Queue()
threads = []
p = threading.Thread(target=producer, args=(q,))
p.start()
for i in range(2):
c = threading.Thread(target=consumer, args=(q,))
threads.append(c)
for t in threads:
t.start()
for t in threads:
t.join()
q.join()
if __name__ == '__main__':
main()
Thread-44 在队列中放入数据项:11Thread-45 从队列中移除了:11
Thread-44 在队列中放入数据项:62Thread-46 从队列中移除了:62
Thread-44 在队列中放入数据项:17Thread-45 从队列中移除了:17
Thread-44 在队列中放入数据项:50Thread-46 从队列中移除了:50
Thread-44 在队列中放入数据项:65Thread-45 从队列中移除了:65
LifoQueue LIFO
PriorityQueue 优先队列
四、多进程--multiprocessing模块
- 充分运用多核、多CPU的计算能力,适用于计算密集型任务
import time
import multiprocessing
def func(n):
print(f'{threading.current_process().name} 执行开始于:{time.ctime()}')
time.sleep(n)
print(f'{threading.current_process().name} 执行结束于:{time.ctime()}')
def main():
print(f'主函数运行于:{time.ctime()}')
processes = []
p1 = multiprocessing.Process(target=func, args=(4,))
processes.append(p1)
p2 = multiprocessing.Process(target=func, args=(2,))
processes.append(p2)
for p in processes:
p.start()
for p in processes:
p.join()
print(f'主函数结束于:{time.ctime()}')
if __name__ == '__main__':
main()
主函数运行于:Sun Sep 15 18:46:37 2019
主函数结束于:Sun Sep 15 18:46:37 2019
五、concurrent.futures 模块
- ThreadPoolExecutor 快速实现多线程
- ProcessPoolExecutor
import time
import concurrent.futures
numbers = list(range(1, 11))
def count(n):
for i in range(1000000):
i += 1
return i * n
def worker(x):
result = count(x)
print(f'数字:{x} 的计算结果是:{result}')
# 顺序执行
def sequential_execution():
start_time = time.clock()
for i in numbers:
worker(i)
print(f'顺序执行花费时间:{time.clock()-start_time} 秒')
# 线程池执行
def threading_execution():
start_time = time.clock()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for i in numbers:
executor.submit(worker, i)
print(f'线程池执行花费时间:{time.clock()-start_time} 秒')
# 进程池执行
def process_execution():
start_time = time.clock()
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
for i in numbers:
executor.submit(worker, i)
print(f'进程池执行花费时间:{time.clock()-start_time} 秒')
if __name__ == '__main__':
sequential_execution()
threading_execution()
process_execution()
顺序执行花费时间:0.5342738079998526 秒
线程池执行花费时间:1.0919637639999564 秒
进程池执行花费时间:0.16994214700025623 秒