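Background
Sometimes I want to batch-download pictures or videos that I have bookmarked, without doing it by hand. On macOS, you-get handles the downloading and xargs provides the batching. What is still missing is a small tool that automatically collects the link addresses of images, videos, and other resources.
In "A Practical Python Tool for Batch-Downloading Website Images (Part 2)", I wrote a small tool that crawls and downloads image resources. This article adapts that tool so that resource links can be collected conveniently.
Design
Resource link rules
To collect resource links, we first have to define rules for them. The tags that commonly carry resource links are a, img, and video. The id and class attributes can then be used to pin down exactly the resources we want.
The rule parameter should be as user-friendly as possible. A rule set can be written as img=jpg,png;class=resLink;id=xyz (class=* matches every a tag). Internally it is converted into the more flexible JSON form [{"img":["jpg","png"]}, {"class":["resLink"]}, {"id":["xyz"]}]. The rules are combined with OR: a resource address is collected if it matches any one of them.
Many users may have no idea how to write such a rule, so a default is provided: when no rule is given, the tool collects img resources ending in png, jpg, or jpeg, plus the href links of a tags.
The conversion of the rule parameter is implemented in the parseRulesParam method of res.py.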
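The conversion described above can be sketched as follows. This is a minimal standalone illustration, not the code of res.py itself: the names parse_rules and DEFAULT_RULES are hypothetical, and the stricter error handling (raising ValueError for a rule without "=") is an assumption of this sketch.

```python
# Hypothetical default: jpg/png/jpeg images plus every <a> tag.
DEFAULT_RULES = [{"img": ["jpg", "png", "jpeg"]}, {"class": ["*"]}]


def parse_rules(rule_str):
    """Turn 'img=jpg,png;class=resLink' into a list of single-key rule dicts."""
    if not rule_str:
        # No rule given: fall back to a copy of the defaults.
        return [dict(rule) for rule in DEFAULT_RULES]
    rules = []
    for part in rule_str.split(";"):
        part = part.strip()
        if not part:
            continue  # tolerate a trailing ';'
        key, sep, value = part.partition("=")
        if not sep or not key.strip() or not value.strip():
            raise ValueError("invalid rule: %r" % part)
        values = [v.strip() for v in value.split(",") if v.strip()]
        rules.append({key.strip(): values})
    return rules


print(parse_rules("img=jpg,png;class=resLink;id=xyz"))
```

Each rule becomes its own dictionary, so "match any rule" is simply a loop over the list.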
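Basic components
We need basic components that fetch page content and locate page elements. To improve speed, a concurrency component is needed as well. All basic components live in the common package.
- The requests library can fetch page content; see the getHTMLContentFromUrl method in net.py. Some pages, however, are loaded dynamically, and the generated content can only be captured once loading has finished. In that case, selenium + chromedriver can be used to obtain the page content; see the getHTMLContentAsync method in net.py. A strategy layer wraps these two methods; see the getHTMLContent method in net.py.
- BeautifulSoup locates the resource link elements; see the findWantedLinks method in res.py.
- An IoTaskThreadPool fetches page content concurrently and can also be used to download resources concurrently; see the IoTaskThreadPool class in multitasks.py.
- A decorator catches exceptions; see the catchExc wrapper in common.py.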
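As a rough illustration of how the fetching strategy and the thread pool fit together, here is a small sketch that uses only the standard library. The fetchers are stand-ins that return canned strings instead of calling requests or selenium, and the names fetch_with_fallback, fetch_all, fake_dynamic, and fake_static are hypothetical.

```python
from multiprocessing.dummy import Pool as ThreadPool  # thread-based pool


def fetch_with_fallback(url, dynamic_fetch, static_fetch):
    """Try the dynamic (browser) fetcher first; fall back to the static one."""
    try:
        html = dynamic_fetch(url)
    except Exception:
        html = None
    if html:
        return html
    return static_fetch(url)


def fetch_all(urls, fetch_one, pool_size=4):
    """Fetch every url concurrently; results keep the order of urls."""
    if not urls:
        return []
    pool = ThreadPool(pool_size)
    try:
        return pool.map(fetch_one, urls)
    finally:
        pool.close()
        pool.join()


# Stand-in fetchers so the sketch runs without network access.
def fake_dynamic(url):
    return "" if "static" in url else "<div>dynamic:%s</div>" % url


def fake_static(url):
    return "<p>static:%s</p>" % url


pages = fetch_all(
    ["http://example.com/static", "http://example.com/app"],
    lambda u: fetch_with_fallback(u, fake_dynamic, fake_static),
)
print(pages)
```

Threads suit this job because fetching pages is I/O-bound: most of the time is spent waiting on the network rather than computing.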
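Tip
When writing a base library, passing configuration items as function parameters can be awkward or make the code less tidy. In that case, the functions can be wrapped in a class and the configuration passed in when the class is instantiated. See the HTMLGrasper class in net.py.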
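A minimal sketch of this tip, using a hypothetical SuffixFilter class (it is not part of the project, and the conf keys suffixes and ignoreCase are assumptions). The configuration is read once in the constructor, so the bound method keeps a one-argument signature and can be handed straight to filter or to a pool's map.

```python
class SuffixFilter(object):
    """Keeps links whose suffix is in a configured list."""

    def __init__(self, conf):
        # Configuration is read once here instead of being passed to every call.
        self.suffixes = tuple(conf.get("suffixes", ("jpg", "png")))
        self.ignore_case = conf.get("ignoreCase", True)

    def accept(self, link):
        target = link.lower() if self.ignore_case else link
        return target.endswith(self.suffixes)


image_filter = SuffixFilter({"suffixes": ["jpg", "png"]})
links = ["a.JPG", "b.html", "c.png"]
print(list(filter(image_filter.accept, links)))
```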
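Usage
Prerequisites
A Python 3 environment is required, along with the bs4, requests, and selenium packages and the pip3 and chromedriver tools. Installation guides are easy to find online.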
brew install python3
sudo easy_install pip
pip3 install requests bs4 selenium -i https://pypi.doubanio.com/simple
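Installation issues:
- After downloading chromedriver.zip from the chromedriver download page and unzipping it, copy the driver executable to /usr/local/bin/; this avoids permission-related errors.
Command usage
First run the following command to collect the resource links and write them to the result file reslinks.txt.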
python3 tools/res.py -u https://space.bilibili.com/183260251/favlist -r 'class=*'
Then run the following command to deduplicate the links and download the resources.
grep 'pattern' reslinks.txt | sort | uniq | xargs -I {} you-get {}
The two commands can also be combined into a single pipeline.
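For readers who prefer to do the filtering step in Python rather than in the shell, the grep | sort | uniq part of the pipeline can be approximated as below. This is only a sketch: select_links is a hypothetical helper, re.search is not identical to grep's regular expression dialect, and Python sorts by code point while sort follows the current locale.

```python
import re


def select_links(lines, pattern):
    """Keep lines matching pattern, drop duplicates, return them sorted."""
    regex = re.compile(pattern)
    unique = {line.strip() for line in lines if regex.search(line)}
    unique.discard("")  # ignore blank lines
    return sorted(unique)


sample = ["b/video/2\n", "a/video/1\n", "b/video/2\n", "c/img/3\n"]
print(select_links(sample, "video"))
```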
Bilibili videos
python3 tools/res.py -u 'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create'
python3 tools/res.py -u 'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create' -r 'class=*' | grep 'video' | sort | uniq | xargs -I {} you-get {}
Heiguang photo galleries
python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167521.html'
python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167521.html' -r 'img=jpg!c' | sort | uniq | xargs -I {} you-get {}
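Source code
Package structure
For the full code, download the project: Pystudy on GitHub. If you modify anything in the common package, switch to the pystudy directory and run sh install.sh to install the modified package before running the res.py script again.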
pystudy
|-- common
|   |-- __init__.py
|   |-- common.py
|   |-- multitasks.py
|   |-- net.py
|-- tools
|   |-- res.py
|-- install.sh
|-- setup.py
|-- __init__.py
res.py
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import re
import sys
import json
import argparse
from bs4 import BeautifulSoup
from common.net import *
from common.multitasks import *
SaveResLinksFile = '/Users/qinshu/joy/reslinks.txt'
serverDomain = ''
def parseArgs():
    description = '''This program is used to batch download resources from specified urls.
       eg. python3 res.py -u http://xxx.html -r 'img=jpg,png;class=resLink;id=xyz'
       will search resource links from network urls http://xxx.html by specified rules
       img = jpg or png OR class = resLink OR id = xyz [ multiple rules ]
       python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167480.html' -r 'img=jpg!c'
       for <img src="xxx.jpg!c"/>
    '''
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-u','--url', nargs='+', help='At least one html url is required', required=True)
    parser.add_argument('-r','--rulepath', nargs=1, help='rules to search resources. if not given, search a hrefs or img resources in given urls', required=False)
    args = parser.parse_args()
    init_urls = args.url
    rulepath = args.rulepath
    return (init_urls, rulepath)

def getAbsLink(serverDomain, link):
    # resolve the href of an <a> tag to an absolute url; '' if it has no href
    try:
        href = link.attrs['href']
        if href.startswith('//'):
            return 'https:' + href
        if href.startswith('/'):
            return serverDomain + href
        else:
            return href
    except Exception:
        return ''

def getTrueResLink(reslink):
    # resolve the src of an <img>/<video> tag to an absolute url; '' if it has no src
    global serverDomain
    try:
        href = reslink.attrs['src']
        if href.startswith('//'):
            return 'http:' + href
        if href.startswith('/'):
            href = serverDomain + href
            return href
        # strip a trailing size suffix such as xxx.jpg@100w
        pos = href.find('jpg@')
        if pos == -1:
            return href
        return href[0: pos+3]
    except Exception:
        return ''

def batchGetResTrueLink(resLinks):
    hrefs = map(getTrueResLink, resLinks)
    return filter(lambda x: x != '', hrefs)

resTags = set(['img', 'video'])

def findWantedLinks(htmlcontent, rule):
    '''
       find html links or res links from html by rule.
       sub rules such as:
          (1) a link with id=[value1,value2,...]
          (2) a link with class=[value1,value2,...]
          (3) res with src=xxx.jpg|png|mp4|...
       a rule is map containing sub rule such as:
          { 'id': [id1, id2, ..., idn] } or
          { 'class': [c1, c2, ..., cn] } or
          { 'img': ['jpg', 'png', ... ]} or
          { 'video': ['mp4', ...]}
    '''
    #print("html===\n"+htmlcontent+"\n===End")
    #print("rule===\n"+str(rule)+"\n===End")
    soup = BeautifulSoup(htmlcontent, "lxml")
    alinks = []
    reslinks = []
    for (key, values) in rule.items():
        if key == 'id':
            for id in values:
                links = soup.find_all('a', id=id)
                # <a> tags carry href, not src, so resolve them with getAbsLink
                links = map(lambda link: getAbsLink(serverDomain, link), links)
                links = filter(lambda x: x != '', links)
                alinks.extend(links)
        elif key == 'class':
            for cls in values:
                if cls == '*':
                    # '*' means every <a> tag regardless of class
                    links = soup.find_all('a')
                else:
                    links = soup.find_all('a', class_=cls)
                links = map(lambda link: getAbsLink(serverDomain, link), links)
                links = filter(lambda x: validate(x), links)
                alinks.extend(links)
        elif key in resTags:
            for resSuffix in values:
                reslinks.extend(soup.find_all(key, src=re.compile(resSuffix)))
    allLinks = []
    allLinks.extend(alinks)
    allLinks.extend(batchGetResTrueLink(reslinks))
    return allLinks

def validate(link):
    # accept known resource suffixes; reject empty, html and javascript links
    validSuffix = ['png', 'jpg', 'jpeg', 'mp4']
    for suf in validSuffix:
        if link.endswith(suf):
            return True
    if link == '':
        return False
    if link.endswith('html'):
        return False
    if 'javascript' in link:
        return False
    return True

def batchGetLinksByRule(htmlcontentList, rules):
    '''
       find all res links from html content list by rules
    '''
    links = []
    for htmlcontent in htmlcontentList:
        for rule in rules:
            links.extend(findWantedLinks(htmlcontent, rule))
    return links

def batchGetLinks(urls, rules):
    conf = {"async":1, "targetIdWhenAsync": "page-fav", "sleepWhenAsync": 10}
    grasper = HTMLGrasper(conf)
    htmlcontentList = grasper.batchGrapHtmlContents(urls)
    allLinks = batchGetLinksByRule(htmlcontentList, rules)
    with open(SaveResLinksFile, 'w') as f:
        for link in allLinks:
            print(link)
            f.write(link + "\n")

def parseRulesParam(rulesParam):
    '''
       parse rules params to rules json
       eg. img=jpg,png;class=resLink;id=xyz to
           [{"img":["jpg","png"]}, {"class":["resLink"]}, {"id":["xyz"]}]
    '''
    defaultRules = [{'img': ['jpg','png','jpeg']}, {"class": ["*"]}]
    if rulesParam:
        try:
            rules = []
            rulesStrArr = rulesParam[0].split(";")
            for ruleStr in rulesStrArr:
                ruleArr = ruleStr.split("=")
                key = ruleArr[0]
                value = ruleArr[1].split(",")
                rules.append({key: value})
            return rules
        except (ValueError, IndexError) as e:
            # a rule without '=' raises IndexError; report the raw parameter
            print('Param Error: invalid rulepath %s %s' % (rulesParam, e))
            sys.exit(1)
    return defaultRules

def parseServerDomain(url):
    # 'http://host/path' -> 'http://host'
    parts = url.split('/', 3)
    return parts[0] + '//' + parts[2]

def testBatchGetLinks():
    urls = ['http://dp.pconline.com.cn/list/all_t145.html']
    rules = [{"img":["jpg"], "video":["mp4"]}]
    batchGetLinks(urls, rules)

if __name__ == '__main__':
    #testBatchGetLinks()
    (init_urls, rulesParam) = parseArgs()
    print('init urls: %s' % "\n".join(init_urls))
    rulepath = parseRulesParam(rulesParam)
    serverDomain = parseServerDomain(init_urls[0])
    print('rulepath: %s\n serverDomain:%s' % (rulepath, serverDomain))
    batchGetLinks(init_urls, rulepath)
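The link handling in res.py resolves relative links by string concatenation with the server domain. As an alternative sketch (not what the script does), the standard library's urllib.parse can resolve links against the page URL, which also covers path-relative links such as pic.jpg. The helper names server_domain and absolute_link are hypothetical.

```python
from urllib.parse import urljoin, urlsplit


def server_domain(url):
    """'http://host/path/page.html' -> 'http://host'"""
    parts = urlsplit(url)
    return parts.scheme + "://" + parts.netloc


def absolute_link(page_url, href):
    """Resolve href (absolute, //host/..., /path or relative) against page_url."""
    return urljoin(page_url, href)


page = "https://space.bilibili.com/183260251/favlist"
print(server_domain(page))
for href in ["//www.bilibili.com/video/av1", "/video/av1", "pic.jpg"]:
    print(absolute_link(page, href))
```

Note that urljoin gives protocol-relative links (//host/...) the scheme of the page they were found on, instead of a hard-coded http or https.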
common.py
import os

def createDir(dirName):
    if not os.path.exists(dirName):
        os.makedirs(dirName)

def catchExc(func):
    def _deco(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            # format args/kwargs directly: str(*args) and str(**kwargs) fail for most inputs
            print("error catch exception for %s (%s, %s): %s" % (func.__name__, args, kwargs, e))
            return None
    return _deco
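A small usage sketch of this kind of exception-catching decorator follows. It is a variation rather than the project's code: catch_exc and parse_port are hypothetical names, and functools.wraps is added so that the wrapped function keeps its own name and docstring.

```python
import functools


def catch_exc(func):
    """Return a wrapper that logs any exception and returns None instead."""
    @functools.wraps(func)  # keep func.__name__ and func.__doc__ on the wrapper
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print("caught exception in %s(%r, %r): %s" % (func.__name__, args, kwargs, e))
            return None
    return wrapper


@catch_exc
def parse_port(text):
    return int(text)


print(parse_port("8080"))  # normal result is passed through
print(parse_port("abc"))   # the ValueError is logged and None is returned
```

Callers have to treat None as "failed", which is convenient in a batch job where one bad page should not stop the rest.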
multitasks.py
from multiprocessing import (cpu_count, Pool)
from multiprocessing.dummy import Pool as ThreadPool

ncpus = cpu_count()

def divideNParts(total, N):
    '''
       divide [0, total) into N parts:
        return [(0, total/N), (total/N, 2*total/N), ..., ((N-1)*total/N, total)]
    '''
    # integer division, so that the bounds stay usable as indexes in Python 3
    each = total // N
    parts = []
    for index in range(N):
        begin = index * each
        if index == N - 1:
            end = total
        else:
            end = begin + each
        parts.append((begin, end))
    return parts

class IoTaskThreadPool(object):
    '''
       thread pool for io operations
    '''
    def __init__(self, poolsize):
        self.ioPool = ThreadPool(poolsize)

    def exec(self, ioFunc, ioParams):
        if not ioParams or len(ioParams) == 0:
            return []
        return self.ioPool.map(ioFunc, ioParams)

    def execAsync(self, ioFunc, ioParams):
        if not ioParams or len(ioParams) == 0:
            return []
        # return the AsyncResult so that callers can wait for or collect the results
        return self.ioPool.map_async(ioFunc, ioParams)

    def close(self):
        self.ioPool.close()

    def join(self):
        self.ioPool.join()
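To make the range-splitting helper concrete, here is a standalone sketch with a worked example. divide_n_parts is a hypothetical re-implementation for illustration; it uses integer division so the bounds can be used directly as slice indexes.

```python
def divide_n_parts(total, n):
    """Split [0, total) into n consecutive (begin, end) ranges."""
    each = total // n  # integer size of every part except possibly the last
    parts = []
    for index in range(n):
        begin = index * each
        # the last part absorbs the remainder
        end = total if index == n - 1 else begin + each
        parts.append((begin, end))
    return parts


links = ["link%d" % i for i in range(10)]
parts = divide_n_parts(len(links), 3)
print(parts)
chunks = [links[begin:end] for (begin, end) in parts]
print([len(chunk) for chunk in chunks])
```

With 10 items and 3 parts the ranges are (0, 3), (3, 6), and (6, 10): the remainder goes to the last part.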
net.py
import requests
import time
from bs4 import BeautifulSoup
from common.common import catchExc
from common.multitasks import IoTaskThreadPool
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

delayForHttpReq = 0.5 # 500ms

class HTMLGrasper(object):

    def __init__(self, conf):
        '''
        Configuration items for fetching HTML page content:
          async: whether to load the page asynchronously (stored as self._async).
                 Set async = 1 when the page content is generated dynamically.
          targetIdWhenAsync: used when async = 1.
                 A dynamically loaded page carries a lot of noise, so an element ID
                 is given to extract exactly the part that is wanted.
          sleepWhenAsync: used when async = 1.
                 Seconds to wait for the dynamically loaded page to finish loading.
        '''
        self._async = conf.get('async', 0)
        self.targetIdWhenAsync = conf.get('targetIdWhenAsync', '')
        self.sleepWhenAsync = conf.get('sleepWhenAsync', 10)

    def batchGrapHtmlContents(self, urls):
        '''
           batch get the html contents of urls
        '''
        grapHtmlPool = IoTaskThreadPool(20)
        return grapHtmlPool.exec(self.getHTMLContent, urls)

    def getHTMLContent(self, url):
        # strategy: try the browser first when async is on, else fall back to requests
        if self._async == 1:
            htmlContent = self.getHTMLContentAsync(url)
            if htmlContent is not None and htmlContent != '':
                html = '<html><head></head><body>' + htmlContent + '</body></html>'
                return html
        return self.getHTMLContentFromUrl(url)

    def getHTMLContentAsync(self, url):
        '''
           get html content from dynamically loaded html url
        '''
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        # `options=` replaces the deprecated `chrome_options=` keyword
        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.get(url)
            time.sleep(self.sleepWhenAsync)
            try:
                # 'id' and 'xpath' are the locator strategies behind
                # find_element_by_id / find_element_by_xpath
                elem = driver.find_element('id', self.targetIdWhenAsync)
            except Exception:
                elem = driver.find_element('xpath', '/html/body')
            return elem.get_attribute('innerHTML')
        finally:
            # always shut the browser down so headless Chrome processes do not pile up
            driver.quit()

    def getHTMLContentFromUrl(self, url):
        '''
           get html content from html url
        '''
        r = requests.get(url)
        status = r.status_code
        if status != 200:
            return ''
        return r.text
setup.py
from distutils.core import setup

setup(
    name = "pystudy",
    version = "1.0",
    description = "Python Study",
    author = "shuqin",
    author_email = "shuqin_1984@163.com",
    url = "https://github.com/shuqin/pystudy",
    license = "LGPL",
    packages = ['common']
)
install.sh
python3 setup.py build
python3 setup.py sdist
python3 setup.py install