并发下载
一.queue的实现
from queue import Queue
queue_object=Queue()
for i in range(4):
queue_object.put(i)
while not queue_object.empty():
print(queue_object.get())
0
1
2
3
from queue import LifoQueue
lifo_queue=LifoQueue()
for i in range(4):
lifo_queue.put(i)
while not lifo_queue.empty():
print(lifo_queue.get())
3
2
1
0
from queue import PriorityQueue
class Job(object):
def __init__(self,level,description):
self.level=level
self.description=description
return
def __lt__(self,other):
return self.level<other.level
priority_queue=PriorityQueue()
priority_queue.put(Job(5,"中级别工作"))
priority_queue.put(Job(10,"低级别工作"))
priority_queue.put(Job(1,"重要工作"))
while not priority_queue.empty():
next_job=priority_queue.get()
print("开始工作:",next_job.description)
开始工作: 重要工作
开始工作: 中级别工作
开始工作: 低级别工作
二.三种技术采集和解析数据对比
1.单线程实现
步骤:构建网址—>访问网页并获取源代码—>解析源代码—>转成JSON格式—>存储在本地文件
from lxml import etree
import requests
import json
# 访问网页的请求头
headers={
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36",
"Accept-Language":"zh-CN,zh;q=0.8"
}
# 存储解析后数据的本地文件
local_file=open("duanzi.json","a")
# 解析 html字符串,获取需要信息
def parse_html(html):
text=etree.HTML(html)
# 返回所有段子的结点位置
# contains模糊查询,第一个参数是要匹配的标签,第二个参数是标签名的部分内容
node_list=text.xpath('//li[contains(@id,"qiushi_tag")]')
for node in node_list:
# 获取用户名
username=node.xpath('.//span[@class="recmd-name"]/text()')
#图片链接
image=node.xpath('.//img/@src')[0]
# 段子内容
content=node.xpath('.//a[@class="recmd-content"]')[0].text
#点赞
like=node.xpath('.//div[@class="recmd-num"]/span')[0].text
# 评论
try:
comments=node.xpath('.//div[@class="recmd-num"]/span')[3].text
except IndexError:
comments=0
items={
"username":username,
"image":image,
"content":content,
"like":like,
"comments":comments
}
local_file.write(json.dumps(items,ensure_ascii=False)+"
")
def main():
# 获取1-10页的网页源代码解析
for page in range(1,11):
url="https://www.qiushibaike.com/8hr/page/"+str(page)+"/"
# 爬取网页源代码
html=requests.get(url,headers=headers).text
parse_html(html)
main()
2.多线程实现
从单线程爬虫的流程可以看出,全部过程只是用了一个线程,先爬取一个网页,对网页内容进行解析,然后存储,完成整套操作后再开始爬取下一个网页,每个网页依次进行,效率非常慢
爬虫的流程简要步骤如下:
- 使用一个队列pageQueue保存要访问的网页页码
- 同时启动多个采集线程,每个线程都从网页页码队列pageQueue中取出一个要访问的页码,构建网址,访问网址并爬取数据.操作完一个网页后再从网页页码队列中取出下一个页码,依次进行,直到所有的页码都已访问完毕.所有的采集线程保存在列表threadCrawls中
- 使用一个队列dataQueue来保存所有的网页源代码,每个线程获取到的数据都放入该队列中
- 同时启动多个解析线程,每个线程都从网页源代码队列dataQueue中取出一个网页源代码,并进行解析,获取想要的数据,并转化为JSON格式.解析完成后再取出下一个网页源代码,依次进行,直到所有的源代码都已被取出.将所有的解析线程存储在列表threadParses中
- 将解析得到的JSON数据存储在本地文件duanzi.json中
from IPython.display import Image
Image(filename="./data/thread.png",width=500)
创建一个ThreadCrawl类,继承自threading.Thread类,用于采集网页信息
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author LQ6H
import requests
import threading
from lxml import etree
import json
from queue import Queue
# 采集网页页码队列是否为空的信号
CRAWL_EXIT=False
class ThreadCrawl(threading.Thread):
def __init__(self,threadName,pageQueue,dataQueue):
threading.Thread.__init__(self)
# 线程名
self.threadName=threadName
# 页码队列
self.pageQueue=pageQueue
# 数据队列
self.dataQueue=dataQueue
self.headers="{'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'}"
def run(self):
print("启动:"+self.threadName)
while not CRAWL_EXIT:
try:
# 从dataQueue中取出1一个页码数字,先进先出
# 可选参数block,默认值是True
# 如果队列为空,block为True,会进入阻塞状态,直到队列有新的数据
# 如果队列为空,block为False,会弹出一个Queue.empty()异常
page=self.pageQueue.get(False)
# 构建网页的URL地址
url="https://www.qiushibaike.com/8hr/page/"+str(page)+"/"
content=requests.get(url,headers=self.headers).text
# 将爬取到的网页源代码放入dataQueue队列中
self.dataQueue.put(content)
except:
pass
print("结束:"+self.threadName)
首先定义一个全局变量CRAWL_EXIT,用于标识pageQueue队列是否为空.当pageQueue不玩空时,线程继续爬取下一个页码;当pageQueue为空时,表明所有的网页都已被爬取完毕,线程就可以退出
队列pageQueue是线程安全的,使用队列来调度线程,保证了每个线程采集的网页地址不重复
创建一个ThreadParse类,继承自threading.Thread,用于解析网页信息
PARSE_EXIT=False
class ThreadParse(threading.Thread):
def __init__(self,threadName,dataQueue,localFile,lock):
super(ThreadParse,self).__init__()
# 线程名
self.threadName=threadName
# 数据队列
self.dataQueue=dataQueue
# 保存解析后数据的文件名
self.localFile=localFile
# 互斥锁
self.lock=lock
def run(self):
print("启动:"+self.threadName)
while not PARSE_EXIT:
try:
html=self.dataQueue.get(False)
self.parse(html)
except:
pass
print("结束:"+self.threadName)
def parse(self,html):
text = etree.HTML(html)
node_list = text.xpath('//li[contains(@id,"qiushi_tag")]')
for node in node_list:
try:
# 获取用户名
username = node.xpath('.//span[@class="recmd-name"]/text()')
# 图片链接
image = node.xpath('.//img/@src')[0]
# 段子内容
content = node.xpath('.//a[@class="recmd-content"]')[0].text
# 点赞
like = node.xpath('.//div[@class="recmd-num"]/span')[0].text
# 评论
try:
comments = node.xpath('.//div[@class="recmd-num"]/span')[3].text
except IndexError:
comments = 0
items = {
"username": username,
"image": image,
"content": content,
"like": like,
"comments": comments
}
# with后面有两个必须执行的操作:__enter__和__exit__,打开和关闭
# 不管里面的操作如何,都会直接打开和关闭功能
# 打开锁,向文件添加内容,释放锁
with self.lock:
# 写入解析后的数据
self.localFile.write(json.dumps(items,ensure_ascii=False)+"
")
except:
pass
在多线程开发中,为了维护资源的完整性,在访问共享资源时使用共享锁lock.线程获得了锁之后,才可以访问文件localFile,并往里写入数据,写入完毕后,将锁释放,其他线程就可以访问这个文件.同一时刻,只允许一个协程访问该文件
def main():
页码队列,存储10个页码,先进先出
pageQueue=Queue(10)
for i in range(1,11):
pageQueue.put(i)
# 采集结果(网页的HTML源代码)的数据队列,参数为空表示不限制
dataQueue=Queue()
# 以追加的方式打开本地文件
localFile=open("duanzi.json","wb+")
# 互斥锁
lock=threading.Lock()
# 3个采集线程的名字
crawlList=["采集线程1号","采集线程2号","采集线程3号"]
# 创建,启动和存储3个采集线程
threadCrawls=[]
for threadName in crawlList:
thread=ThreadCrawl(threadName,pageQueue,dataQueue)
thread.start()
threadCrawls.append(thread)
# 3个解析线程的名字
parseList=["解析线程1号","解析线程2号","解析线程3号"]
# 创建,启动和存储3个解析线程
threadParses=[]
for threadName in parseList:
thread=ThreadParse(threadName,dataQueue,localFile,lock)
thread.start()
threadParses.append(thread)
while not pageQueue.empty():
pass
# 如果pageQueue为空,采集线程退出循环
global CRAWL_EXIT
CRAWL_EXIT=True
print("pageQueue为空
")
for thread in threadCrawls:
# 阻塞子线程
thread.join()
while not dataQueue.empty():
pass
print("dataQueue为空")
global PARSE_EXIT
PARSE_EXIT=True
for thread in threadParses:
thread.join()
with lock:
# 关闭文件,在关闭之前,内容都在内存里
localFile.close()
if name==“main”:
main()
3.协程实现
在上面实现的多线程爬虫中,分别开启了3个采集线程爬取网页和3个解析线程来解析网页,提供了程序执行的效率.但是,线程是交由CPU调度的,每个时间片段中只能有一个线程执行.而协程是在一个线程内部执行,一旦遇到了网络I/O阻塞,它就会立刻切换到另一个协程中执行,通过不断的轮询,降低了爬取网页的时间.对于爬虫而言,协程和多线程在效率上没有很大的不同
使用协程来实现爬虫,具体步骤如下:
- 定义一个负责爬虫的类,所有的爬虫工作完全交由该类负责
- 使用一个队列data_queue保存所有的数据
- 创建多个协程任务,每个协程都会使用页码构建完整的网址,访问网址爬取和提取有用的数据,并保存到数据队列中,直到所有网页中的数据提取出来
- 将data_queue队列中的数据全部提取出来,保存到本地文件duanzi.txt中
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# Author LQ6H
import requests
from queue import Queue
import time
from lxml import etree
import gevent
class Spider(object):
def __init__(self):
self.headers={
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36"
}
self.base_url="https://www.qiushibaike.com/8hr/page/"
# 创建保存数据的队列
self.data_queue=Queue()
# 统计数量
self.count=0
def send_request(self,url):
print("[INFO]:正在爬取"+url)
html=requests.get(url,headers=self.headers).content
# 每次请求间隔1s
time.sleep(1)
self.parse_page(html)
def parse_page(self,html):
html_ogj=etree.HTML(html)
node_list=html_ogj.xpath('//li[contains(@id,"qiushi_tag")]')
for node in node_list:
try:
# 获取用户名
username = node.xpath('.//span[@class="recmd-name"]/text()')
# 图片链接
image = node.xpath('.//img/@src')[0]
# 段子内容
content = node.xpath('.//a[@class="recmd-content"]')[0].text
# 点赞
like = node.xpath('.//div[@class="recmd-num"]/span')[0].text
# 评论
try:
comments = node.xpath('.//div[@class="recmd-num"]/span')[3].text
except IndexError:
comments = 0
items = {
"username": username,
"image": image,
"content": content,
"like": like,
"comments": comments
}
self.count+=1
self.data_queue.put(items)
except:
pass
def start_work(self):
job_list=[]
for page in range(1,11):
# 构建一个协程任务对象
url=self.base_url+str(page)+"/"
job=gevent.spawn(self.send_request,url)
# 保存所有的协程任务
job_list.append(job)
# joinall()接收一个列表,将列表中的所有协程任务添加到任务队列里执行
gevent.joinall(job_list)
local_file=open("duanzi.json","wb+")
while not self.data_queue.empty():
content=self.data_queue.get()
result=str(content).encode("utf-8")
local_file.write(result+b"
")
local_file.close()
print(self.count)
if __name__=="__main__":
spider=Spider()
spider.start_work()
4.性能分析
使用单线程,多线程和协程实现数据的爬取后,通过计算这3种方式下的耗时情况,比较三种爬虫的效率
首先导入time模块,然后计算main()函数执行之后与之前的时间差,或者计算调用start_work()方法之前与调用之后的时间差
计算main()函数执行前后时间差:
if __name__=="__main__":
startTime=time.time()
main()
print(time.time()-startTime)
计算start_work()方法调用前后时间差:
if __name__=="__main__":
spider=Spider()
start=time.time()
spider.start_work()
print("[INFO]:Using time%f secend"%(time.time()-start))