1.线程池与进程池
1.concurrent.futures模块的基础是Executor。Executor是一个抽象类,不能被直接使用,但是它提供的两个子类ThreadPoolExecutor和ProcessPoolExecutor却非常有用,顾名思义,两者分别用来创建线程池和进程池。我们可以将相应的tasks直接放入线程池/进程池,不需要自己维护Queue,也不用操心死锁的问题,线程池/进程池会自动帮我们调度。
代码实现:
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ThreadPoolExecutor
import os,time,random
def task(n, delay=2):
    """Simulate a slow job and return the square of ``n``.

    Prints the PID of the process executing the call, sleeps for ``delay``
    seconds, then returns ``n ** 2``.

    Args:
        n: Number to square.
        delay: Seconds to sleep before returning. Defaults to 2, the value
            that was previously hard-coded.

    Returns:
        ``n ** 2``.
    """
    print('%s is running' %os.getpid())
    time.sleep(delay)
    return n**2
a).进程池(from concurrent.futures import ProcessPoolExecutor)
# The __main__ guard matters for process pools: worker processes may import
# this module, and without the guard they would re-run the pool setup.
if __name__ == '__main__':
    # Leaving the with-block calls shutdown(wait=True), so every submitted
    # task has finished before the results are printed.
    with ProcessPoolExecutor() as p:
        start=time.time()
        # submit() returns a Future immediately; keep them in submission order.
        futures=[p.submit(task,i) for i in range(10)]
    print('='*30)
    # Print the computed values (as the thread-pool example does) rather than
    # the Future objects themselves.
    print([obj.result() for obj in futures])
    print(time.time()-start)
b).线程池(from concurrent.futures import ThreadPoolExecutor)
if __name__ == '__main__':
    # Leaving the with-block calls shutdown(wait=True), so every submitted
    # task has finished before the results are printed.
    with ThreadPoolExecutor() as p:
        start=time.time()
        # submit() returns a Future immediately; keep them in submission order.
        futures=[p.submit(task,i) for i in range(10)]
    print('='*30)
    print([obj.result() for obj in futures])
    print(time.time()-start)
2.回调函数:.add_done_callback()
代码:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import requests
import os
import time
from threading import currentThread
def get_page(url):
    """Download ``url`` and return its address together with the body text.

    Logs the current thread name and PID before fetching, so the output
    shows which worker handled the request.

    Args:
        url: Address to fetch with ``requests.get``.

    Returns:
        A dict with the keys ``'url'`` (the requested address) and
        ``'text'`` (the response body as text).
    """
    # currentThread() and getName() are deprecated aliases of
    # current_thread() and the .name attribute.
    from threading import current_thread

    print('%s:<%s> is getting [%s]' %(current_thread().name,os.getpid(),url))
    response=requests.get(url)
    time.sleep(2)  # artificial delay so the concurrency is visible
    return {'url':url,'text':response.text}
def parse_page(res):
    """Done-callback: append one summary line for a finished download.

    Args:
        res: A completed Future whose result is a dict with the keys
            ``'url'`` and ``'text'`` (the shape returned by ``get_page``).

    Side effects:
        Appends ``url:<url> size:<len(text)>`` plus a newline to ``db.txt``
        in the current working directory.
    """
    # currentThread() and getName() are deprecated aliases of
    # current_thread() and the .name attribute.
    from threading import current_thread

    res=res.result()
    print('%s:<%s> parse [%s]' %(current_thread().name,os.getpid(),res['url']))
    with open('db.txt','a',encoding='utf-8') as f:
        # The line terminator must be the "\n" escape; a raw line break
        # inside a single-quoted literal is a syntax error.
        parse_res='url:%s size:%s\n' %(res['url'],len(res['text']))
        f.write(parse_res)
if __name__ == '__main__':
    urls = ['https://www.baidu.com'] * 6

    # a) process pool (active).
    # b) thread pool: replace ProcessPoolExecutor() below with
    #    ThreadPoolExecutor().
    # Leaving the with-block calls shutdown(wait=True), so all downloads and
    # their callbacks are done before the final print.
    with ProcessPoolExecutor() as p:
        for url in urls:
            # Same idea as multiprocessing.Pool's
            # pool_obj.apply_async(get_page, args=(url,), callback=parse_page):
            # parse_page receives the finished Future.
            p.submit(get_page, url).add_done_callback(parse_page)
    print('主',os.getpid())
3.map功能:
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time,random
def task(n, delay=2):
    """Simulate a slow job and return the square of ``n``.

    Prints the PID of the process executing the call, sleeps for ``delay``
    seconds, then returns ``n ** 2``.

    Args:
        n: Number to square.
        delay: Seconds to sleep before returning. Defaults to 2, the value
            that was previously hard-coded.

    Returns:
        ``n ** 2``.
    """
    print('%s is running' %os.getpid())
    time.sleep(delay)
    return n**2
if __name__ == '__main__':
    # Leaving the with-block calls shutdown(wait=True), so every task has
    # finished before the results are consumed.
    with ProcessPoolExecutor() as p:
        # map() submits task(0) .. task(9) and returns an iterator that
        # yields the results in input order.
        obj=p.map(task,range(10))
    print('='*30)
    print(list(obj))
2.协程
a).greenlet模块:
from greenlet import greenlet
import time
def eat(name):
    """Print two "eat" messages, switching to greenlet ``g2`` after each.

    Args:
        name: Label interpolated into the printed messages.
    """
    print('%s eat 1' %name)
    # greenlet switches only at explicit switch() calls; a blocking call
    # placed here (the notes tried time.sleep(10)) is not interrupted.
    g2.switch('egon')  # first entry into g2: the argument becomes play()'s name
    print('%s eat 2' %name)
    g2.switch()  # resume g2 where it last switched away
def play(name):
    """Print two "play" messages, switching to greenlet ``g1`` in between.

    Args:
        name: Label interpolated into the printed messages.
    """
    print('%s play 1' %name)
    g1.switch()  # hand control back to g1; execution resumes here on the next switch into g2
    print('%s play 2' %name)
# Wrap each function in its own greenlet; nothing runs until switch() is called.
g1,g2=greenlet(eat),greenlet(play)
# Arguments can be passed on the first switch() into a greenlet; later
# switches into it need none.
g1.switch('egon')
b).gevent模块
from gevent import monkey;monkey.patch_all()
import gevent
import time,threading
def eat(name, delay=2):
    """Print two "eat" messages separated by a pause and return ``'eat'``.

    Args:
        name: Label interpolated into the printed messages.
        delay: Seconds passed to ``time.sleep`` between the two messages.
            Defaults to 2, the value that was previously hard-coded.

    Returns:
        The string ``'eat'``.
    """
    print('%s eat 1' %name)
    time.sleep(delay)
    print('%s eat 2' %name)
    return 'eat'
def play(name, delay=1):
    """Print two "play" messages separated by a pause and return ``'play'``.

    Args:
        name: Label interpolated into the printed messages.
        delay: Seconds passed to ``time.sleep`` between the two messages.
            Defaults to 1, the value that was previously hard-coded.

    Returns:
        The string ``'play'``.
    """
    print('%s play 1' %name)
    time.sleep(delay)
    print('%s play 2' %name)
    return 'play'
start=time.time()
# Schedule both functions as gevent greenlets with the same argument.
g1,g2=[gevent.spawn(fn,'egon') for fn in (eat,play)]
# Wait for both at once (equivalent to calling g1.join() and g2.join()).
gevent.joinall([g1,g2])
elapsed=time.time()-start
print('主',(elapsed))
# .value holds each function's return value once its greenlet has finished.
print(g1.value)
print(g2.value)
c).爬虫实例
from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time
def get_page(url):
    """Fetch ``url`` and report the size of the response body.

    Prints a ``GET`` line before the request. If the response status is 200,
    prints the length of the response text; other statuses print nothing
    further.

    Args:
        url: Address to fetch with ``requests.get``.
    """
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))
start_time=time.time()
# Serial baseline for comparison: call get_page(url) directly for each of the
# three URLs instead of spawning greenlets.
g1,g2,g3=[
    gevent.spawn(get_page, url)
    for url in (
        'https://www.python.org/',
        'https://www.yahoo.com/',
        'https://github.com/',
    )
]
# Block until all three downloads have finished.
gevent.joinall([g1,g2,g3])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
d).客户端&服务端实例
客户端:
from multiprocessing import Process
from socket import *
def client(server_ip,server_port):
    """Connect to a TCP server and exchange messages until it disconnects.

    Repeatedly sends ``hello`` (UTF-8 encoded), reads up to 1024 bytes and
    prints the decoded reply.

    Args:
        server_ip: Address of the server to connect to.
        server_port: TCP port of the server.
    """
    # The socket gets its own name (reusing ``client`` shadowed this
    # function) and the with-block closes it on every exit path.
    with socket(AF_INET,SOCK_STREAM) as conn:
        conn.connect((server_ip,server_port))
        while True:
            conn.sendall('hello'.encode('utf-8'))
            msg=conn.recv(1024)
            if not msg:
                # recv() returns b'' once the peer has closed the connection.
                break
            print(msg.decode('utf-8'))
if __name__ == '__main__':
    # Start 500 client processes, all connecting to 127.0.0.1:8088.
    for _ in range(500):
        p=Process(target=client,args=('127.0.0.1',8088))
        p.start()
服务端:(注意:下面的代码与上面的客户端代码完全相同,并不是服务端的实现;服务端示例代码缺失,有待补充。)
from multiprocessing import Process
from socket import *
# NOTE(review): this snippet sits under the "server" heading, but it defines
# a client (it connects and sends); the server-side code is not present here.
def client(server_ip,server_port):
    """Connect to a TCP server and exchange messages until it disconnects.

    Repeatedly sends ``hello`` (UTF-8 encoded), reads up to 1024 bytes and
    prints the decoded reply.

    Args:
        server_ip: Address of the server to connect to.
        server_port: TCP port of the server.
    """
    # The socket gets its own name (reusing ``client`` shadowed this
    # function) and the with-block closes it on every exit path.
    with socket(AF_INET,SOCK_STREAM) as conn:
        conn.connect((server_ip,server_port))
        while True:
            conn.sendall('hello'.encode('utf-8'))
            msg=conn.recv(1024)
            if not msg:
                # recv() returns b'' once the peer has closed the connection.
                break
            print(msg.decode('utf-8'))
if __name__ == '__main__':
    # Start 500 client processes, all connecting to 127.0.0.1:8088.
    for _ in range(500):
        p=Process(target=client,args=('127.0.0.1',8088))
        p.start()