  • A Small Python Tool for Grabbing Links to Images, Videos, and Other Resources

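    Background

    Sometimes I want to batch-download images or videos I have bookmarked, without doing it by hand. On macOS, you-get handles the downloading and xargs supplies the batching. What is missing is a small tool that automatically grabs the link addresses of images, videos, and similar resources.

    An earlier post, "批量下载网站图片的Python实用小工具(下)" (A practical Python tool for batch-downloading website images, part 2), built a small tool for grabbing and downloading image resources. This post adapts that tool so that resource links can be grabbed conveniently.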


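    Design

    Resource link rules

    Before links can be grabbed, the rules that identify them have to be defined. The tags that commonly carry resource links are a, img, and video. The id and class attributes can narrow the match down to exactly the resources wanted.

    The rule parameter should be as easy to type as possible. A rule set can be written as img=jpg,png;class=resLink;id=xyz. Internally it is converted into the more flexible JSON form [{"img":["jpg","png"]}, {"class":["resLink"]}, {"id":["xyz"]}]. The rules are combined with OR: a resource address is grabbed if it matches any one of them. The special value class=* selects every a tag.

    Many users will have no idea how to write such rules, so a default is provided: when no rule is given, the tool grabs img tags whose src matches png or jpg, plus the href links of a tags.

    For the conversion of the rule parameter, see the parseRulesParam method in res.py.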
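    The conversion described above can be sketched in a few lines. This is a minimal illustration, not the parseRulesParam listed later in this post: the name parse_rules and the DEFAULT_RULES constant are made up for the example, and the default simply mirrors the "img = png or jpg, plus a hrefs" behaviour described in the text.

```python
# Hypothetical default, mirroring the behaviour described in the text.
DEFAULT_RULES = [{"img": ["jpg", "png"]}, {"class": ["*"]}]


def parse_rules(param=None):
    """Turn 'img=jpg,png;class=resLink;id=xyz' into a list of one-key rule dicts."""
    if not param:
        # No rule given: fall back to a copy of the defaults.
        return [dict(rule) for rule in DEFAULT_RULES]
    rules = []
    for part in param.split(";"):
        key, sep, raw = part.strip().partition("=")
        values = [v.strip() for v in raw.split(",") if v.strip()]
        if not sep or not key.strip() or not values:
            raise ValueError("invalid rule %r, expected key=value1,value2" % part)
        rules.append({key.strip(): values})
    return rules


print(parse_rules("img=jpg,png;class=resLink;id=xyz"))
```

    Keeping each rule as its own one-key dict makes the OR semantics explicit: a caller can loop over the list and collect whatever any single rule matches.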


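    Base components

    The tool needs base components for fetching page content and locating page elements, plus a concurrency component for speed. All of them live in the common package.

    • The requests library fetches page content; see the getHTMLContentFromUrl method in net.py. Some pages load their content dynamically, though, and the generated content can only be captured once loading has finished. For those pages, selenium + chromedriver can be used; see the getHTMLContentAsync method in net.py. A thin strategy wrapper sits on top of the two methods; see the getHTMLContent method in net.py.

    • BeautifulSoup locates the resource link elements; see the findWantedLinks method in res.py.

    • An IoTaskThreadPool fetches page content concurrently and can also be used to download resources concurrently; see the IoTaskThreadPool class in multitasks.py.

    • A decorator catches exceptions; see the catchExc wrapper in common.py.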
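    The last two components combine naturally: a decorator turns a failing task into a None result, and a thread pool maps the decorated task over many URLs. The sketch below uses only the standard library. The names catch_exc, fake_fetch, and fetch_all are invented for illustration, and fake_fetch is a stand-in that performs no network I/O.

```python
import functools
from multiprocessing.dummy import Pool as ThreadPool  # thread-backed Pool from the standard library


def catch_exc(func):
    """Return None instead of raising, so one bad task does not abort the batch."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print("error in %s(%r, %r): %s" % (func.__name__, args, kwargs, e))
            return None
    return wrapper


@catch_exc
def fake_fetch(url):
    # Hypothetical stand-in for a real page fetch; no network access happens here.
    if not url.startswith("http"):
        raise ValueError("unsupported url: %s" % url)
    return "<html>%s</html>" % url


def fetch_all(urls, poolsize=4):
    pool = ThreadPool(poolsize)
    try:
        return pool.map(fake_fetch, urls)  # results keep the order of `urls`
    finally:
        pool.close()
        pool.join()


results = fetch_all(["http://a.example", "ftp://b.example", "http://c.example"])
print(results)
```

    Threads suit this workload because the tasks spend most of their time waiting on the network rather than computing.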


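    A small trick

    When a base library needs configuration options, passing them as function parameters gets awkward or clutters the code. In that case, wrap the functions in a class and pass the configuration to the constructor. See the HTMLGrasper class in net.py.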
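    A minimal sketch of this trick, under stated assumptions: PageFetcher, its "retries" option, and the injected fetch callable are all hypothetical and only stand in for the idea. The configuration is read once in the constructor, so the methods keep short signatures.

```python
class PageFetcher:
    """Hypothetical example: configuration is read once, methods stay small."""

    def __init__(self, conf=None, fetch=None):
        conf = conf or {}
        self.retries = conf.get("retries", 2)  # extra attempts after the first one
        # The fetch function is injectable so the class can be tried without a network.
        self._fetch = fetch or (lambda url: "<html>%s</html>" % url)

    def get(self, url):
        last_error = None
        for _ in range(self.retries + 1):
            try:
                return self._fetch(url)
            except IOError as e:
                last_error = e
        raise last_error

    def get_all(self, urls):
        return [self.get(url) for url in urls]


calls = []

def flaky(url):
    # Fails on the first call only, to exercise the retry path.
    calls.append(url)
    if len(calls) < 2:
        raise IOError("temporary failure")
    return "ok:" + url

fetcher = PageFetcher({"retries": 2}, fetch=flaky)
print(fetcher.get("http://x.example"))
```

    Compared with threading a retries argument through every call, the class keeps call sites unchanged when another option is added later.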


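    Usage

    Prerequisites

    You need a Python 3 environment, the bs4, requests, and selenium packages, and the pip3 and chromedriver tools. Because res.py asks BeautifulSoup for the lxml parser, the lxml package is needed as well. Installation guides are easy to find online.

    brew install python3
    sudo easy_install pip
    pip3 install requests bs4 selenium lxml   -i  https://pypi.doubanio.com/simple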
    

    Installation notes:

    • Download chromedriver.zip from the chromedriver download page, unzip it, and copy the driver executable into /usr/local/bin/. That avoids permission-related errors.

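    Command usage

    First run the following command to collect the resource links and write them to the result file reslinks.txt (the full path is set by SaveResLinksFile in res.py).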

    
    python3 tools/res.py -u https://space.bilibili.com/183260251/favlist -r 'class=*'
    
    

    Then run the following command to deduplicate the links and download the resources.

    
    grep 'pattern' reslinks.txt | sort | uniq | xargs -I {} you-get {}
    
    

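    The two commands can also be combined into a single pipeline, since res.py prints every link to standard output as well.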
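    For readers who prefer to stay in Python, the filter-and-deduplicate step can be approximated as below. This is a rough sketch, not an exact replacement: select_links is a made-up name, Python's re syntax differs from grep's in some details, and sorted() orders by code point whereas sort may follow the locale.

```python
import re


def select_links(lines, pattern):
    """Rough equivalent of: grep PATTERN | sort | uniq."""
    regex = re.compile(pattern)
    # A set removes duplicates; sorted() then gives a stable, ordered list.
    return sorted({line.strip() for line in lines if line.strip() and regex.search(line)})


sample = [
    "https://example.com/video/2\n",
    "https://example.com/video/1\n",
    "https://example.com/video/2\n",
    "https://example.com/img/x.jpg\n",
]
for link in select_links(sample, "video"):
    print(link)
```

    In practice the lines would come from open('reslinks.txt'), and each selected link could then be handed to you-get.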


    Bilibili videos

    python3 tools/res.py -u 'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create'
    python3 tools/res.py -u 'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create' -r 'class=*' | grep 'video' | sort | uniq | xargs -I {} you-get {} 
    

    Heiguang photo galleries

    python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167521.html'
    python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167521.html' -r 'img=jpg!c' | sort | uniq | xargs -I {} you-get {}
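    The rule img=jpg!c works because, in the res.py listing below, each rule value is passed to re.compile and used as a filter on the src attribute. To the best of my understanding BeautifulSoup applies such a pattern as a search anywhere in the value, so a short value like jpg can match more than intended. The sketch below illustrates the difference with plain re; the sample URLs and both helper names are hypothetical.

```python
import re

srcs = [
    "http://img.example/a.jpg!c",
    "http://img.example/b.jpg",
    "http://img.example/jpgs/c.png",
]


def matching(srcs, suffix):
    """Loose match: the rule value may appear anywhere in src."""
    pattern = re.compile(suffix)
    return [s for s in srcs if pattern.search(s)]


def matching_suffix(srcs, suffix):
    """Stricter alternative: src must end with '.<suffix>'."""
    pattern = re.compile(r"\." + re.escape(suffix) + r"$")
    return [s for s in srcs if pattern.search(s)]


print(matching(srcs, "jpg!c"))
print(matching(srcs, "jpg"))
print(matching_suffix(srcs, "jpg"))
```

    The loose form is what makes jpg!c convenient to type; the anchored form is an option when a page mixes thumbnails and full-size images under similar names.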
    

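    Source code

    Package layout

    The full project can be downloaded from Pystudy on GitHub. After changing anything in the common package, switch to the pystudy directory and run sh install.sh to install the modified package, then run the res.py script again.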

    pystudy
       |-- common
                |-- __init__.py
                |-- common.py
                |-- multitasks.py
                |-- net.py
       |-- tools
               |-- res.py
       |-- install.sh
       |-- setup.py
       |-- __init__.py
    

    res.py

    #!/usr/bin/python3
    #_*_encoding:utf-8_*_
    
    import re
    import sys
    import json
    
    import argparse
    from bs4 import BeautifulSoup
    from common.net import *
    from common.multitasks import *
    
    SaveResLinksFile = '/Users/qinshu/joy/reslinks.txt'
    serverDomain = ''
    
    def parseArgs():
        description = '''This program is used to batch download resources from specified urls.
                         eg. python3 res.py -u http://xxx.html -r 'img=jpg,png;class=resLink;id=xyz'
                         will search resource links from network urls http://xxx.html  by specified rules
                         img = jpg or png OR class = resLink OR id = xyz [ multiple rules ]
    
                         python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167480.html' -r 'img=jpg!c'
                         for <img src="xxx.jpg!c"/> 
                      '''
        parser = argparse.ArgumentParser(description=description)
        parser.add_argument('-u','--url', nargs='+', help='At least one html url is required', required=True)
        parser.add_argument('-r','--rulepath', nargs=1, help='rules to search resources. if not given, search a hrefs or img resources in given urls', required=False)
        args = parser.parse_args()
        init_urls = args.url
        rulepath = args.rulepath
        return (init_urls, rulepath)
    
    def getAbsLink(serverDomain, link):
    
        try:
            href = link.attrs['href']
            if href.startswith('//'):
                return 'https:' + href
            if href.startswith('/'):
                return serverDomain + href
            else:
                return href
        except:
            return ''
    
    def getTrueResLink(reslink):
        global serverDomain
        try:
            href = reslink.attrs['src']
            if href.startswith('//'):
                return 'http:' + href 
            if href.startswith('/'):
                href = serverDomain + href
                return href
            pos = href.find('jpg@')
            if pos == -1:
                return href
            return href[0: pos+3]
        except:
            return ''
    
    def batchGetResTrueLink(resLinks):
        hrefs = map(getTrueResLink, resLinks)
        return filter(lambda x: x != '', hrefs)
    
    resTags = set(['img', 'video'])
    
    def findWantedLinks(htmlcontent, rule):
        '''
           find html links or res links from html by rule.
           sub rules such as:
              (1) a link with id=[value1,value2,...]
              (2) a link with class=[value1,value2,...]
              (3) res with src=xxx.jpg|png|mp4|...
           a rule is map containing sub rule such as:
              { 'id': [id1, id2, ..., idn] } or
              { 'class': [c1, c2, ..., cn] } or
              { 'img': ['jpg', 'png', ... ]} or
              { 'video': ['mp4', ...]}
    
        '''
    
        #print("html===\n"+htmlcontent+"\n===End")
        #print("rule===\n"+str(rule)+"\n===End")
    
        soup = BeautifulSoup(htmlcontent, "lxml")
        alinks = []
        reslinks = []
    
        for (key, values) in rule.items():
            if key == 'id':
                for id in values:
                    links = soup.find_all('a', id=id)
                    # a tags carry href rather than src, so resolve them with getAbsLink
                    links = map(lambda link: getAbsLink(serverDomain, link), links)
                    links = filter(lambda x: x != '', links)
                    alinks.extend(links)
            elif key == 'class':
                for cls in values:
                    if cls == '*':
                        links = soup.find_all('a')
                    else:
                        links = soup.find_all('a', class_=cls)
                    links = map(lambda link: getAbsLink(serverDomain, link), links)
                    links = filter(lambda x: validate(x), links)
                    alinks.extend(links)
            elif key in resTags:
                for resSuffix in values:
                    reslinks.extend(soup.find_all(key, src=re.compile(resSuffix)))
    
        allLinks = []
        allLinks.extend(alinks)
        allLinks.extend(batchGetResTrueLink(reslinks))
        return allLinks
    
    def validate(link):
    
        validSuffix = ['png', 'jpg', 'jpeg', 'mp4']
    
        for suf in validSuffix:
            if link.endswith(suf):
                return True
        if link == '':
            return False
        if link.endswith('html'):
            return False
        if 'javascript' in link:
            return False    
        return True    
    
    def batchGetLinksByRule(htmlcontentList, rules):
        '''
           find all res links from html content list by rules
        '''
    
        links = []
        for htmlcontent in htmlcontentList:
            for rule in rules:
                links.extend(findWantedLinks(htmlcontent, rule))
        return links
    
    def batchGetLinks(urls, rules):
        conf = {"async":1, "targetIdWhenAsync": "page-fav", "sleepWhenAsync": 10}
        grasper = HTMLGrasper(conf)
        htmlcontentList = grasper.batchGrapHtmlContents(urls)
        allLinks = batchGetLinksByRule(htmlcontentList, rules)
        with open(SaveResLinksFile, 'w') as f:
            for link in allLinks:
                print(link)
                f.write(link + "\n")
    
    def parseRulesParam(rulesParam):
        '''
           parse rules params to rules json
           eg. img=jpg,png;class=resLink;id=xyz to
               [{"img":["jpg","png"]}, {"class":["resLink"]}, {"id":["xyz"]}]
        '''
        defaultRules = [{'img': ['jpg','png','jpeg']},{"class":["*"]}]
        if rulesParam:
            try:
                rules = []
                # argparse nargs=1 wraps the rule string in a one-element list
                rulesStrArr = rulesParam[0].split(";")
                for ruleStr in rulesStrArr:
                    ruleArr = ruleStr.split("=")
                    key = ruleArr[0]
                    value = ruleArr[1].split(",")
                    rules.append({key: value})
                return rules
            except (ValueError, IndexError) as e:
                # IndexError is raised when a rule has no '='
                print('Param Error: invalid rulepath %s %s' % (rulesParam, e))
                sys.exit(1)
        return defaultRules
    
    def parseServerDomain(url):
        parts = url.split('/', 3)
        return parts[0] + '//' + parts[2]
    
    def testBatchGetLinks():
        urls = ['http://dp.pconline.com.cn/list/all_t145.html']
        rules = [{"img":["jpg"], "video":["mp4"]}]
    
        batchGetLinks(urls, rules)
    
    if __name__ == '__main__':
    
        #testBatchGetLinks()
    
        (init_urls, rulesParam) = parseArgs()
        print('init urls: %s' % "\n".join(init_urls))
    
        rulepath = parseRulesParam(rulesParam)
        serverDomain = parseServerDomain(init_urls[0])
        print('rulepath: %s\n serverDomain:%s' % (rulepath, serverDomain))
    
        batchGetLinks(init_urls, rulepath)
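
    The getAbsLink and parseServerDomain helpers above resolve links by hand. As an alternative, and not what the original script does, the standard library's urllib.parse can do the same job and also handles page-relative links. The function names server_domain and abs_link below are hypothetical.

```python
from urllib.parse import urljoin, urlsplit


def server_domain(url):
    """Return scheme://host for a page URL."""
    parts = urlsplit(url)
    return "%s://%s" % (parts.scheme, parts.netloc)


def abs_link(page_url, href):
    """Resolve href against the page it was found on."""
    return urljoin(page_url, href)


page = "http://tu.heiguang.com/works/show_167521.html"
print(server_domain(page))
print(abs_link(page, "/img/a.jpg"))               # root-relative
print(abs_link(page, "//cdn.example.com/a.jpg"))  # scheme-relative
print(abs_link(page, "show_1.html"))              # page-relative
```

    One difference worth noting: urljoin gives a scheme-relative link the scheme of the page itself, whereas the listing hard-codes https: in getAbsLink and http: in getTrueResLink.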
    
    

    common.py

    import os
    
    def createDir(dirName):
        if not os.path.exists(dirName):
            os.makedirs(dirName)
    
    def catchExc(func):
        def _deco(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                # format args/kwargs directly: str(*args) and str(**kwargs) would raise themselves
                print("error catch exception for %s (%s, %s): %s" % (func.__name__, args, kwargs, e))
                return None
        return _deco
    

    multitasks.py

    from multiprocessing import (cpu_count, Pool)
    from multiprocessing.dummy import Pool as ThreadPool
    
    ncpus = cpu_count()
    
    def divideNParts(total, N):
        '''
           divide [0, total) into N parts, with each = total // N:
            return [(0, each), (each, 2*each), ..., ((N-1)*each, total)]
        '''
    
        # integer division, so the bounds stay usable as indices in Python 3
        each = total // N
        parts = []
        for index in range(N):
            begin = index * each
            if index == N - 1:
                end = total
            else:
                end = begin + each
            parts.append((begin, end))
        return parts
    
    class IoTaskThreadPool(object):
        '''
           thread pool for io operations
        '''
        def __init__(self, poolsize):
            self.ioPool = ThreadPool(poolsize)
    
        def exec(self, ioFunc, ioParams):
            if not ioParams or len(ioParams) == 0:
                return []
            return self.ioPool.map(ioFunc, ioParams)
    
        def execAsync(self, ioFunc, ioParams):
            if not ioParams or len(ioParams) == 0:
                return []
            # return the AsyncResult so callers can wait on it or collect results
            return self.ioPool.map_async(ioFunc, ioParams)
    
        def close(self):
            self.ioPool.close()
    
        def join(self):
            self.ioPool.join()
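
    divideNParts is not called anywhere in the listings shown here. One plausible use, assumed for illustration, is to cut a list of URLs into contiguous chunks so that each worker handles one chunk. The sketch restates the function under the hypothetical name divide_n_parts, with integer division, so that it runs on its own.

```python
def divide_n_parts(total, n):
    """Split [0, total) into n contiguous (begin, end) ranges; the last takes the remainder."""
    each = total // n
    parts = []
    for index in range(n):
        begin = index * each
        end = total if index == n - 1 else begin + each
        parts.append((begin, end))
    return parts


urls = ["url%d" % i for i in range(10)]
chunks = [urls[begin:end] for (begin, end) in divide_n_parts(len(urls), 3)]
print([len(chunk) for chunk in chunks])  # [3, 3, 4]
```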
    

    net.py

    import requests
    import time
    from bs4 import BeautifulSoup
    from common.common import catchExc
    from common.multitasks import IoTaskThreadPool
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    from selenium.webdriver.common.by import By
    
    delayForHttpReq = 0.5 # 500ms
    
    class HTMLGrasper(object):
    
        def __init__(self, conf):
            '''
            Configuration for fetching HTML page content
              async: whether to load the page with a browser. Set async = 1 when
                 the page content is generated dynamically (stored as self._async)
              targetIdWhenAsync: used when async = 1.
                 A dynamically loaded page carries a lot of noise, so give the ID
                 of the element that holds the wanted content
              sleepWhenAsync: used when async = 1.
                 Seconds to wait for the dynamically loaded page
            '''
            self._async = conf.get('async', 0)
            self.targetIdWhenAsync = conf.get('targetIdWhenAsync', '')
            self.sleepWhenAsync = conf.get('sleepWhenAsync', 10)
    
        def batchGrapHtmlContents(self, urls):
            '''
               batch get the html contents of urls
            '''
            grapHtmlPool = IoTaskThreadPool(20)
            return grapHtmlPool.exec(self.getHTMLContent, urls)
    
        def getHTMLContent(self, url):
            if self._async == 1:
                htmlContent = self.getHTMLContentAsync(url)
    
                if htmlContent is not None and htmlContent != '':
                    html = '<html><head></head><body>' + htmlContent + '</body></html>'
                    return html
    
            return self.getHTMLContentFromUrl(url)
    
        def getHTMLContentAsync(self, url):
            '''
               get html content from dynamically loaded html url
            '''
    
            chrome_options = Options()
            chrome_options.add_argument('--headless')
            chrome_options.add_argument('--disable-gpu')
            # 'options=' replaces the deprecated 'chrome_options=' keyword
            driver = webdriver.Chrome(options=chrome_options)
            try:
                driver.get(url)
                time.sleep(self.sleepWhenAsync)
    
                try:
                    # find_element(By.ID, ...) replaces find_element_by_id, removed in newer Selenium
                    elem = driver.find_element(By.ID, self.targetIdWhenAsync)
                except Exception:
                    # fall back to the whole body when the target id is missing
                    elem = driver.find_element(By.XPATH, '/html/body')
    
                return elem.get_attribute('innerHTML')
            finally:
                # always release the browser process
                driver.quit()
    
        def getHTMLContentFromUrl(self, url):
            '''
               get html content from html url
            '''
            r = requests.get(url)
            status = r.status_code
            if status != 200:
                return ''
            return r.text
    

    setup.py

    # distutils was removed in Python 3.12; setuptools provides a compatible setup()
    from setuptools import setup
    
    setup(
           name = "pystudy",
           version = "1.0",
           description = "Python Study",
           author = "shuqin",
           author_email = "shuqin_1984@163.com",
           url = "https://github.com/shuqin/pystudy",
           license = "LGPL",
           packages = ['common']
           )
    

    install.sh

    python3 setup.py build
    python3 setup.py sdist
    python3 setup.py install
    

  • Original article: https://www.cnblogs.com/lovesqcc/p/13766522.html