from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread import time import os ''' 池子中创建的进程/线程创建一次就不会再创建了,这样的话节省了反复开辟进程/线程的资源 ''' # pool = ProcessPoolExecutor(5) # 默认是当前计算机cpu的个数 pool = ThreadPoolExecutor(5) # 括号内可以传参数指定线程池内的线程个数,不传默认是当前所在计算机的cpu个数乘5 def test(n): # print(n,os.getpid()) # 查看当前进程号 print(n,current_thread().name) # 查看当前线程号 time.sleep(1) return n**2 def call_back(n): print('haha',n.result()) """ 提交任务的方式 同步:提交任务之后 原地等待任务的返回结果 期间不做任何事 异步:提交任务之后 不等待任务的返回结果,直接执行下一行代码 """
# pool.submit(task,1) # 朝线程池中提交任务 异步提交
# print('主') # 异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行 if __name__ == '__main__': for i in range(20): pool.submit(test,i).add_done_callback(call_back) # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数 pool.shutdown() #关闭池子 等待池子中所有的任务执行完毕之后 才会往下运行代码
协程 #串行执行 0.8540799617767334 mport time def func1(): for i in range(1000000): i + 1 def func2(): for i in range(1000000): i + 1 start = time.time() func1() func2() stop = time.time() print(stop-start) #基于yield并发执行 import time def func1(): while True: 10000000+1 yield def func2(): g=func1() for i in range(5): #time.sleep(1) # 模拟IO,yield并不会捕捉到并自动切换 i+1 next(g) start=time.time() func2() stop=time.time() print(stop-start)
gevent模块是能够识别IO的一个工具
from gevent import monkey;monkey.patch_all() # 能够识别IO from gevent import spawn # 切换cpu import time def ha(): print('ha') time.sleep(1) print('ha') def hei(): print('hei') time.sleep(1) print('hei') start = time.time() g1 = spawn(ha) #spawn 会检测所有任务 g2 = spawn(hei) g1.join() g2.join() print(time.time()-start)
通过协程实现并发
客户端 import socket from threading import Thread,current_thread def test(): client = socket.socket() client.connect(('127.0.0.1',8080)) n = 0 while True: data = '%s %s'%(current_thread().name,n) client.send(data.encode('utf-8')) res = client.recv(1024) print(res.decode('utf-8')) n += 1 for i in range(400): t = Thread(target=test) t.start() 服务端 from gevent import monkey;monkey.patch_all() import socket from gevent import spawn server = socket.socket() server.bind(('127.0.0.1',8080)) server.listen(5) def task(conn): while True: try: res = conn.recv(1024) if len(res) == 0: break print(res.decode('utf-8')) conn.send(res.upper()) except ConnectionResetError as e: print(e) break conn.close() def server1(): while True: conn, addr = server.accept() spawn(task,conn) if __name__ == '__main__': g = spawn(server1) g.join()