浅谈 Python 多线程、进程、协程上手体验
前言:浅谈 Python 很多人都认为 Python 的多线程是垃圾(GIL 说这锅甩不掉啊~);本章节主要给你体验下 Python 的两个库
- Threading
- Multiprocessing
- Gevent - 第三方包
有一种包叫Grequests = Gevent + requests 的结合,协程爬虫
一.线程
Threading
Threading 模块建立在 _thread 模块之上。_thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。
Demo:
import threading
from time import sleep
def A():
'''打印 “A” 五次'''
for x in range(5):
print("AAA")
sleep(1.0)
def B():
for x in range(5):
print("BBB")
sleep(1.0)
def main():
t1 = threading.Thread(target=A)
t2 = threading.Thread(target=B)
t1.start()
t2.start()
if __name__ == '__main__':
main()
打印结果:
AAA
BBB
BBB
AAA
BBB
AAA
BBB
AAA
BBB
AAA
[Finished in 5.3s]
如果我们按照常规的方式执行 A()、B()方法,将耗时更多,结果如下:
AAA
AAA
AAA
AAA
AAA
BBB
BBB
BBB
BBB
BBB
[Finished in 10.3s]
对比下时间就知道多线程的重要性,简单来说就是花费更少时间做事得到最高的回报。
Threading 常用方法:
t.start() : 激活线程,
t.getName() : 获取线程的名称
t.setName() : 设置线程的名称
t.name : 获取或设置线程的名称
t.is_alive() : 判断线程是否为激活状态
t.isAlive() :判断线程是否为激活状态
t.setDaemon() : 父线程打印内容后便结束了,不管子线程是否执行完毕了
t.isDaemon() : 判断是否为守护线程
t.ident : 获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
t.join() : 逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
t.run() : 线程被cpu调度后自动执行线程对象的run方法
不一样的 _Thread
说明:Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。
Demo:
import _thread
import time
def print_time( threadName, delay):
count = 0
while count < 5:
time.sleep(delay)
count += 1
print ("%s: %s" % ( threadName, time.ctime(time.time()) ))
try:
_thread.start_new_thread( print_time, ("Thread-1", 2, ) )
_thread.start_new_thread( print_time, ("Thread-2", 4, ) )
except:
print ("Error: 无法启动线程")
while True:
pass
打印结果:
Thread-1: Wed Mar 20 15:36:32 2019
Thread-1: Wed Mar 20 15:36:34 2019
Thread-2: Wed Mar 20 15:36:34 2019
Thread-1: Wed Mar 20 15:36:36 2019
Thread-2: Wed Mar 20 15:36:38 2019
Thread-1: Wed Mar 20 15:36:38 2019
[Cancelled]
_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能还是比较有限的。
- _Thread 模块已被废弃,Threading 身为它的接班人
二.进程
Multiprocessing
Multiprocessing 模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块 threading 的编程接口类似。
- 启动进程数量应该是根据CPU个数来确定的,最好是2 * CPU数 + 1
from multiprocessing import Process
import time
def _proces(name):
print("Process " + name)
time.sleep(1.0)
if __name__ == "__main__":
p1 = Process(target=_proces, args=('A',))
p1.start()
p1.join()
p2 = Process(target=_proces, args=('B',))
p2.start()
p2.join()
输出打印结果如下:
Process A
Process B
[Finished in 3.2s]
Multiprocessing 常用方法:
p.start():启动进程,并调用该子进程中的p.run()
p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
p.is_alive():如果p仍然运行,返回True
p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=(1,2,'hexin',)
kwargs表示调用对象的字典,kwargs={'name':'hexin','age':18}
name为子进程的名称
三.协程
Gevent
Gevent 又称微线程,在单线程上执行多个任务,用函数切换,开销极小。不通过操作系统调度,没有进程、线程的切换开销。genvent,monkey.patchall
Demo:
from gevent import monkey
monkey.patch_all()
import gevent
import urllib.request
def run(url):
print('run --> %s' % url)
try:
response = urllib.request.urlopen(url)
data = response.read()
print('%d bytes received from %s.' % (len(data), url))
except Exception as e:
print(e)
if __name__ == '__main__':
urls = ['https://www.baidu.com','https://www.jd.com','https://www.cnblogs.com/']
lis = [gevent.spawn(run, url) for url in urls]
gevent.joinall(lis)
输出打印结果如下:
run --> https://www.baidu.com
run --> https://www.jd.com
run --> https://www.cnblogs.com/
227 bytes received from https://www.baidu.com.
111917 bytes received from https://www.jd.com.
47872 bytes received from https://www.cnblogs.com/.
[Finished in 0.7s]
四.实例
多线程Demo:
from threading import Thread
from queue import Queue
from lxml import etree
import requests
class douban(Thread):
def __init__(self, url, q):
# 重写写父类的__init__方法
super(douban, self).__init__()
self.url = url
self.q = q
self.headers={}
def run(self):
self.parse_page()
def send_request(self,url):
'''
用来发送请求的方法
:return: 返回网页源码
'''
# 请求出错时,重复请求3次,
i = 0
while i <= 3:
try:
html = requests.get(url=url,headers=self.headers).content
except Exception as e:
print(u'[DEBUG] %s%s'% (e,url))
i += 1
else:
return html
def parse_page(self):
'''
解析网站源码,并采用 xpath 提取 电影名称和平分放到队列中
:return:
'''
response = self.send_request(self.url)
html = etree.HTML(response)
# 获取到一页的电影数据
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 电影名称
title = move.xpath('.//a/span/text()')[0]
# 电影主题
theme = str(move.xpath('.//div[@class="bd"]//span[@class="inq"]/text()'))
# 评分
score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
# 将每一部电影的名称跟评分加入到队列
self.q.put(title + " |" + score +" " + theme)
def main():
# 创建一个队列用来保存进程获取到的数据
q = Queue()
base_url = 'https://movie.douban.com/top250?start='
# 构造所有 url
url_list = [base_url+str(num) for num in range(0,50+1,25)]
# 保存线程
Thread_list = []
# 创建并启动线程
for url in url_list:
p = douban(url,q)
p.start()
Thread_list.append(p)
# 让主线程等待子线程执行完成
for i in Thread_list:
i.join()
while not q.empty():
print(q.get())
if __name__=="__main__":
main()
输出打印:
肖申克的救赎 |9.6 ['希望让人自由。']
霸王别姬 |9.6 ['风华绝代。']
这个杀手不太冷 |9.4 ['怪蜀黍和小萝莉不得不说的故事。']
阿甘正传 |9.4 ['一部美国近现代史。']
美丽人生 |9.5 ['最美的谎言。']
泰坦尼克号 |9.3 ['失去的才是永恒的。 ']
千与千寻 |9.3 ['最好的宫崎骏,最好的久石让。 ']
辛德勒的名单 |9.5 ['拯救一个人,就是拯救整个世界。']
..........(省略)
..........(省略)
[Finished in 0.9s]
多进程Demo:
from multiprocessing import Process, Queue
import time
from lxml import etree
import requests
class douban(Process):
def __init__(self, url, q):
# 重写写父类的__init__方法
super(douban, self).__init__()
self.url = url
self.q = q
self.headers = {}
def run(self):
self.parse_page()
def send_request(self,url):
'''
用来发送请求的方法
:return: 返回网页源码
'''
# 请求出错时,重复请求3次,
i = 0
while i <= 3:
try:
return requests.get(url=url,headers=self.headers).content
except Exception as e:
print(u'[DEBUG] %s%s'% (e,url))
i += 1
def parse_page(self):
'''
解析网站源码,并采用xpath提取 电影名称和平分放到队列中
:return:
'''
response = self.send_request(self.url)
html = etree.HTML(response)
# 获取到一页的电影数据
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 电影名称
title = move.xpath('.//a/span/text()')[0]
# 电影主题
theme = str(move.xpath('.//div[@class="bd"]//span[@class="inq"]/text()'))
# 评分
score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
# 将每一部电影的名称跟评分加入到队列
self.q.put(title + " |" + score +" " + theme)
def main():
# 创建一个队列用来保存进程获取到的数据
q = Queue()
base_url = 'https://movie.douban.com/top250?start='
# 构造所有url
url_list = [base_url+str(num) for num in range(0,50+1,25)]
# 保存进程
Process_list = []
# 创建并启动进程
for url in url_list:
p = douban(url,q)
p.start()
Process_list.append(p)
# 让主进程等待子进程执行完成
for i in Process_list:
i.join()
while not q.empty():
print(q.get())
if __name__=="__main__":
main()
输出打印:
肖申克的救赎 |9.6 ['希望让人自由。']
霸王别姬 |9.6 ['风华绝代。']
这个杀手不太冷 |9.4 ['怪蜀黍和小萝莉不得不说的故事。']
阿甘正传 |9.4 ['一部美国近现代史。']
美丽人生 |9.5 ['最美的谎言。']
泰坦尼克号 |9.3 ['失去的才是永恒的。 ']
千与千寻 |9.3 ['最好的宫崎骏,最好的久石让。 ']
辛德勒的名单 |9.5 ['拯救一个人,就是拯救整个世界。']
盗梦空间 |9.3 ['诺兰给了我们一场无法盗取的梦。']
忠犬八公的故事 |9.3 ['永远都不能忘记你所爱的人。']
机器人总动员 |9.3 ['小瓦力,大人生。']
..........(省略)
..........(省略)
[Finished in 2.4s]
协程Demo:
from queue import Queue
import time
from lxml import etree
import requests
import gevent
from gevent import monkey
monkey.patch_all()
class douban(object):
def __init__(self):
# 创建一个队列用来保存进程获取到的数据
self.q = Queue()
self.headers = {}
def run(self,url):
self.parse_page(url)
def send_request(self,url):
'''
用来发送请求的方法
:return: 返回网页源码
'''
# 请求出错时,重复请求3次,
i = 0
while i <= 3:
try:
html = requests.get(url=url,headers=self.headers).content
except Exception as e:
print(u'[DEBUG] %s%s'% (e,url))
i += 1
else:
return html
def parse_page(self,url):
'''
解析网站源码,并采用xpath提取 电影名称和平分放到队列中
:return:
'''
response = self.send_request(url)
html = etree.HTML(response)
# 获取到一页的电影数据
node_list = html.xpath("//div[@class='info']")
for move in node_list:
# 电影名称
title = move.xpath('.//a/span/text()')[0]
# 电影主题
theme = str(move.xpath('.//div[@class="bd"]//span[@class="inq"]/text()'))
# 评分
score = move.xpath('.//div[@class="bd"]//span[@class="rating_num"]/text()')[0]
# 将每一部电影的名称跟评分加入到队列
self.q.put(title + " |" + score +" " + theme)
def main(self):
base_url = 'https://movie.douban.com/top250?start='
# 构造所有url
url_list = [base_url+str(num) for num in range(0,225+1,25)]
# 创建协程并执行
job_list = [gevent.spawn(self.run,url) for url in url_list]
# 让线程等待所有任务完成,再继续执行。
gevent.joinall(job_list)
while not self.q.empty():
print(self.q.get())
if __name__=="__main__":
douban = douban()
douban.main()
输出打印:
肖申克的救赎 |9.6 ['希望让人自由。']
霸王别姬 |9.6 ['风华绝代。']
这个杀手不太冷 |9.4 ['怪蜀黍和小萝莉不得不说的故事。']
阿甘正传 |9.4 ['一部美国近现代史。']
美丽人生 |9.5 ['最美的谎言。']
泰坦尼克号 |9.3 ['失去的才是永恒的。 ']
千与千寻 |9.3 ['最好的宫崎骏,最好的久石让。 ']
辛德勒的名单 |9.5 ['拯救一个人,就是拯救整个世界。']
盗梦空间 |9.3 ['诺兰给了我们一场无法盗取的梦。']
忠犬八公的故事 |9.3 ['永远都不能忘记你所爱的人。']
机器人总动员 |9.3 ['小瓦力,大人生。']
三傻大闹宝莱坞 |9.2 ['英俊版憨豆,高情商版谢耳朵。']
..........(省略)
..........(省略)
[Finished in 1.0s]
总结
通过以上测试方法得出个结论:
-
多进程:密集CPU任务,需要充分使用服务器多核CPU资源,计算大量的并发请求时候,推荐 multiprocessing (多进程)
缺陷:多个进程之间通信成本高,切换开销大。 -
多线程:密集I/O任务(网络I/O,磁盘I/O,数据库I/O)爬虫比较合适多线程。推荐 threading.Thread、multiprocessing.dummy (多线程)
缺陷:同一个时间切片只能运行一个线程,不能做到高并行,但是可以做到高并发。
想要追求更有效率,多进程加异步速度会很快
下次更新各种细节与 Python 高级用法