1.线程池:
实例化线程池 ThreadPoolExcutor 起 5*cpu_count
异步提交任务 submit/map
阻塞直到任务结束 shutdown
获取子线程的返回值 result
回调函数 add_done_callback
起线程池推荐使用concurrent futrues
工作中提高效率,往往需要:
多进程 cpu_count
多线程 cpu_cpunt * 5
起单个线程
import time from concurrent.futures import ThreadPoolExecutor def func(i): print('thread',i) time.sleep(1) print('thread %s end',i) tp = ThreadPoolExecutor(5) tp.submit(func,1) tp.shutdown() print('主线程')
起多个线程
import time from concurrent.futures import ThreadPoolExecutor from threading import current_thread def func(i): print('thread',i) time.sleep(1) print('thread %s end',i,current_thread()) tp = ThreadPoolExecutor(5) #5个线程 for i in range(20): #20个任务 tp.submit(func,i) tp.shutdown() print('主线程')
获取线程池返回值
import time from concurrent.futures import ThreadPoolExecutor from threading import current_thread def func(i): print('thread',i) time.sleep(1) print('thread %s end',i,current_thread()) return i*'*' tp = ThreadPoolExecutor(5) #5个线程 ret_1 = [] for i in range(20): #20个任务 ret = tp.submit(func,i) # print(ret)#<Future at 0x1c984d42b00 state=pending>类似回执 # print(ret.result()) 这样取值,异步变同步 ret_1.append(ret) for ret in ret_1: print(ret.result()) tp.shutdown() print('主线程')
map简化操作
import time from concurrent.futures import ThreadPoolExecutor from threading import current_thread def func(i): print('thread',i) time.sleep(1) print('thread %s end',i,current_thread()) return i*'*' tp = ThreadPoolExecutor(5) #5个线程 res = tp.map(func,range(10))#map没有办法添加回调函数
for i in res: print(i)
线程池回调函数:与进程不同,线程回调函数由子线程实现
import time from concurrent.futures import ThreadPoolExecutor from threading import current_thread def func(i): print('thread',i,current_thread()) time.sleep(1) print('thread %s end' % i) return i*'*' def call_back(arg): print(arg.result()) tp = ThreadPoolExecutor(5) for i in range(20): ret = tp.submit(func,i).add_done_callback(call_back) print('主线程')
进程池回调函数
import time from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from threading import current_thread def func(i): print('thread',i,current_thread()) time.sleep(1) print('thread %s end' % i) return i*'*' def call_back(arg): print(arg.result()) if __name__ == '__main__': tp = ProcessPoolExecutor(5) for i in range(20): ret = tp.submit(func,i).add_done_callback(call_back) print('主线程')
2.协程 :纤程,比线程更小的单位,cpu没有办法看到
一条线程在多个任务之间来回切换
切换这个动作是耗费时间的
对于cpu,操作系统来说,协程是不存在的
它们只能看到线程
yield
def consumer(): p = producer() for num in p: print(num) def producer(): for i in range(1000): yield i consumer()
协程的优势:
更好的利用协程
1
一个线程的执行明确的切分开
两个任务,帮助你记住那个任务执行到那个位置上了,并且实现安全的切换
当一个任务不得不陷入阻塞的时候,在这个任务阻塞的时候,切换到另一个任务继续执行
你的程序只要还有任务需要执行,你的当前线程永远不会阻塞
2
利用协程在多个任务陷入阻塞的时候进行切换来保证一个线程在处理多个任务的时候总是忙碌
能够更加充分的利用cpu,抢占跟多的时间片
3
无论是进程或是线程,都是由操作系统来进行切换的,开启过多的进程或线程,会给操作系统的调度带来压力
如果开启的是协程,,协程在程序之间的切换操作系统感知不到,无论开启多少个协程,对于操作系统来说都是开启一个线程
系统的调度不会感到有压力
协程的本质就是一条线程,不会产生数据安全问题
3,协程模块:
greenlet gevent的底层,协程,切换的模块
gevent 直接用的,gevent能够提供更全面的功能
from greenlet import greenlet def eat(): print('eatting1') g2.switch() #切换 print('eatting2') def play(): print('playing1') print('playing2') # g1.switch() g1 = greenlet(eat) g2 = greenlet(play) g1.switch()
gevent 切换
def eat(): print('eatting1') time.sleep(1) print('eatting2') def play(): print('palying1') time.sleep(1) print('palying2') g1 = gevent.spawn(eat) #自动的检测阻塞事件,遇见阻塞了就会进行切换,不然不执行 g2 = gevent.spawn(play) #有些阻塞gevent不认识 g1.join() #阻塞知道g1结束, g2.join()
from gevent import monkey
monkey.patch_all()
记录一下模块,使gevent都认识
from gevent import monkey #记录一下模块,使gevent都认识 monkey.patch_all() import time import gevent def eat(): print('eatting1') time.sleep(1) print('eatting2') def play(): print('palying1') time.sleep(1) print('palying2') g1 = gevent.spawn(eat) #自动的检测阻塞事件,遇见阻塞了就会进行切换,不然不执行 g2 = gevent.spawn(play) #有些阻塞gevent不认识 g1.join() #阻塞知道g1结束, g2.join()
spawn(函数名) 产生了一个协程任务,在遇到IO操作的时候帮助我们在多任务之间进行切换
join() 阻塞,直到某个任务被执行完毕
join_all
value属性 获取返回值
from gevent import monkey;monkey.patch_all() import time import gevent def eat(): print('eat 1') time.sleep(1) print('eat 2') return 'eating end' def play(): print('play 1') time.sleep(1) print('play 2') return 'play finished' g1 = gevent.spawn(eat) g2 = gevent.spawn(play) gevent.joinall([g1,g2]) print(g1.value) print(g2.value)
爬虫
from gevent import monkey;monkey.patch_all() import gevent import time import requests url_lst = ['http://www.baidu.com', 'http://www.4399.com', 'http://www.7k7k.com', 'http://www.sogou.com', 'http://www.sohu.com', 'http://www.sina.com', 'http://www.jd.com'] def get_url(url): response = requests.get(url) if response.status_code == 200: print(url,len(response.text)) start = time.time() g_lst = [] for url in url_lst: ret = gevent.spawn(get_url,url) g_lst.append(ret) gevent.joinall(g_lst) print(time.time() - start)
协程来实现一个并发的socketserver
# from gevent import monkey;monkey.patch_all() # import sockt from gevent import socket #相当于monkey.... import gevent sk = socket.socket() sk.bind(('192.168.16.33',8088)) sk.listen() def server_talk(conn): while True: conn.send(b'byebye') print(conn.recv(1024)) while True: conn,addr = sk.accept() gevent.spawn(server_talk,conn) # 主进程accept 基本上都是阻塞
import socket from threading import Thread def client(): sk = socket.socket() sk.connect(('192.168.16.33', 8088)) while True: print(sk.recv(1024)) sk.send(b'hello') for i in range(300): Thread(target=client).start()