1.多进程
import multiprocessing as mp
from multiprocessing import Process
import time
def func(message):
time.sleep(1)
print('PID-{}: '.format(mp.current_process().pid), message)
if __name__ == '__main__':
for item in ['A', 'B', 'C', 'D']:
p = Process(target=func, args=(item,))
p.start() # 启动进程
p.join() # 阻塞等待
# PID-1684: A
# PID-4112: B
# PID-15832: C
# PID-5188: D
2.进程池
#1 Pool进程池
from multiprocessing import Pool
import multiprocessing as mp
import time
def func(message):
time.sleep(1)
print('PID-{}: '.format(mp.current_process().pid), message)
if __name__ == '__main__':
p = Pool(2)
for i in range(5):
p.apply_async(func, args=(i,))
p.close() #调用后不能添加新的Process
p.join() #阻塞等待
# PID-4368: 0
# PID-15972: 1
# PID-4368: 2
# PID-15972: 3
# PID-4368: 4
#2 ProcessPoolExecutor进程池
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def square(n):
time.sleep(1)
return n * n
if __name__ == '__main__':
with ProcessPoolExecutor(max_workers=3) as executor:
tasks = [executor.submit(square, num) for num in range(10)]
for future in as_completed(tasks):
print(future.result())
# 0 1 4 9 16 25 36 49 64 81
3.进程间通信
from multiprocessing import Process, Queue
import os
# 往队列写数据
def write(q):
for value in ['A', 'B', 'C', 'D']:
print('Process %s put %s to queue.' % (os.getpid(), value))
q.put(value)
# 从队列取数据
def read(q):
while True:
value = q.get(True)
print('Process %s get %s from queue.' % (os.getpid(), value))
if __name__ == '__main__':
q = Queue() # 父进程创建Queue
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start()
pr.start()
pw.join() # 等待pw结束
pr.terminate() # 终止pr进程
# Process 16304 put A to queue.
# Process 16304 put B to queue.
# Process 16304 put C to queue.
# Process 14676 get A from queue.
# Process 16304 put D to queue.
# Process 14676 get B from queue.
# Process 14676 get C from queue.
# Process 14676 get D from queue.
4.多线程
import threading
lock = threading.Lock()
num = 0
def func(n):
lock.acquire()
global num
num += 1
print('{}: '.format(n), num)
lock.release()
t1 = threading.Thread(target=func, args=('Tom',), name='Thread1', daemon=True)
t2 = threading.Thread(target=func, args=('Bob',), name='Thread2', daemon=True)
t1.start() # 启动线程
t2.start()
t1.join() # 阻塞等待线程
t2.join()
# Tom: 1
# Bob: 2
5.线程池
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def square(n):
time.sleep(1)
return n * n
if __name__ == '__main__':
with ThreadPoolExecutor(max_workers=3) as executor:
tasks = [executor.submit(square, num) for num in range(10)]
for future in as_completed(tasks):
print(future.result())
# 0 1 4 16 9 25 36 49 64 81
6.线程间通信
import threading
import queue
import random
import time
# 从队列取数据
def myqueue(queue):
while not queue.empty():
item = queue.get()
if item is None:
break
print("{} removed {} from the queue".format(threading.current_thread(), item))
queue.task_done()
time.sleep(2)
# 往队列放数据
q = queue.Queue()
for i in range(5):
q.put(i)
# 开启线程
threads = []
for i in range(4):
thread = threading.Thread(target=myqueue, args=(q,))
thread.start()
threads.append(thread)
# 等待线程
for thread in threads:
thread.join()
# <Thread(Thread-6, started 15372)> removed 0 from the queue
# <Thread(Thread-7, started 15556)> removed 1 from the queue
# <Thread(Thread-8, started 14688)> removed 2 from the queue
# <Thread(Thread-9, started 12408)> removed 3 from the queue
# <Thread(Thread-6, started 15372)> removed 4 from the queue
7.异步编程
import aiohttp
import asyncio
urls = ['https://www.baidu.com'] * 100
htmls = []
# 信号量,用于控制协程数量
sem = asyncio.Semaphore(10)
# 使用async/await关键字编写异步代码
async def getHtml(url):
async with(sem):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
html = await resp.text()
htmls.append(html)
def main():
# 获取事件循环
loop = asyncio.get_event_loop()
# 把所有任务放到一个列表中
tasks = [getHtml(url) for url in urls]
# 激活协程
loop.run_until_complete(asyncio.wait(tasks))
# 关闭事件循环
loop.close()
if __name__ == '__main__':
main()