zoukankan      html  css  js  c++  java
  • 代理爬取

    # coding:utf-8
    # 因为网络上的代理毕竟是有限的,所以希望大家不要滥用
    
    import re
    import requests
    import time
    import pymongo
    import sys
    from bs4 import BeautifulSoup
    from multiprocessing.dummy import Pool
    
    # Module-level MongoDB handles used by the classes below.
    client = pymongo.MongoClient("localhost", 27017)
    proxy = client['proxy']
    proxy_pool = proxy['proxy_pool']
    # Unique index on 'ip_port': inserting an already-stored proxy raises
    # DuplicateKeyError.  create_index() replaces ensure_index(), which PyMongo
    # deprecated in 3.0 and removed in 4.0.
    proxy_pool.create_index('ip_port', unique=True)
    
    
    class ProxyPool:
        """Scrape proxy lists from youdaili.net and store them in ``proxy_pool``."""

        # Matches "ip:port@PROTOCOL"; groups are (whole match, ip:port, protocol).
        IP_RE = re.compile(
            r'((\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{2,5})@([a-zA-Z0-9]{4,7}))')
        # First run of digits in the pager link text, read as the page count.
        PAGE_RE = re.compile(r"\d+")

        def get_soup(self, url):
            """Fetch ``url`` and return its parsed soup, or None unless status is 200."""
            resp = requests.get(url)
            if resp.status_code != 200:
                return None
            resp.encoding = "utf-8"
            return BeautifulSoup(resp.text, "lxml")

        def get_youdaili(self):
            """Crawl every article linked from the youdaili index page.

            Proxies are collected from all pages of each article and inserted
            into ``proxy_pool``; ``ip_port`` values that are already stored are
            skipped.  Each article URL is printed once it has been processed.
            Pages that do not answer with status 200 are skipped.
            """
            index_soup = self.get_soup("http://www.youdaili.net/Daili/")
            if index_soup is None:
                return
            for link in index_soup.select("div.newslist_body > ul > li > a"):
                url = link.get('href')
                if not url:
                    continue
                soup = self.get_soup(url)
                if soup is None:
                    continue
                ips = self.IP_RE.findall(soup.text)
                # Follow-up pages are addressed as "<article>_<n>.html".
                for page_no in range(2, self._page_count(soup) + 1):
                    soup_sub = self.get_soup(url[:-5] + "_" + str(page_no) + ".html")
                    if soup_sub is not None:
                        ips += self.IP_RE.findall(soup_sub.text)
                self._store(ips)
                print(url)

        def _page_count(self, soup):
            """Return the page count read from the article's pager, or 1 if absent."""
            page_tag = soup.select("ul.pagelist > li > a")
            if not page_tag:
                return 1
            found = self.PAGE_RE.search(page_tag[0].get_text())
            return int(found.group()) if found else 1

        def _store(self, ips):
            """Insert ``IP_RE`` matches into ``proxy_pool``, ignoring duplicates."""
            for _, ip_port, protocol in ips:
                try:
                    proxy_pool.insert_one({
                        'ip_port': ip_port,
                        'protocol': protocol.lower(),
                        'update_time': int(time.time())  # time of scraping
                    })
                except pymongo.errors.DuplicateKeyError:
                    # The unique index rejects proxies that are already stored.
                    pass
    
    
    class ProxyCheck:
        """Probe the stored proxies and record their response time in ``proxy_pool``."""

        # A proxy that fails a check and was last updated longer ago than this
        # is deleted (one week, in seconds).
        MAX_AGE_SECONDS = 604800

        def __init__(self):
            # Loaded per instance so every check round sees the proxies stored
            # so far; a class-level attribute would be evaluated only once,
            # when the class is defined.
            self.ip_port_all = [(i['ip_port'], i['protocol']) for i in proxy_pool.find()]

        def remove_ip(self, ip_port):
            """Handle a failed check for ``ip_port``.

            Sets the proxy's ``speed`` to None and deletes the proxy when its
            ``update_time`` is more than a week old.  Does nothing if the proxy
            is no longer stored.
            """
            ip_data = proxy_pool.find_one({'ip_port': ip_port})
            if ip_data is None:
                return
            proxy_pool.update_one({'ip_port': ip_port}, {'$set': {'speed': None}})
            if int(time.time()) - ip_data['update_time'] > self.MAX_AGE_SECONDS:
                # delete_one() replaces Collection.remove(), removed in PyMongo 4.
                proxy_pool.delete_one({'ip_port': ip_port})

        def get_status(self, ip_port, protocol):
            """Request a test page through the proxy and store the outcome.

            On a 200 response the elapsed time is saved as ``speed`` and
            ``update_time`` is refreshed; on any error or other status code
            ``remove_ip`` is called.
            """
            url = "http://fz.58.com/"
            proxies = {"http": protocol + "://" + ip_port}
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36",
            }
            # Wall-clock timing: time.clock() measured CPU time on Unix and was
            # removed in Python 3.8.
            started = time.time()
            try:  # requests through a proxy fail often
                resp = requests.get(url, headers=headers, proxies=proxies, timeout=6)
            except Exception as ex:
                print(ex)
                return self.remove_ip(ip_port)
            elapsed = time.time() - started  # response time
            if resp.status_code == 200:
                print(ip_port)
                proxy_pool.update_one({"ip_port": ip_port},
                                      {'$set': {'speed': elapsed, 'update_time': int(time.time())}})
            else:
                self.remove_ip(ip_port)

        def check(self):
            """Check every stored 'http' proxy on a pool of 20 threads."""
            pool = Pool(20)
            for ip_port, protocol in self.ip_port_all:
                if protocol == 'http':
                    pool.apply_async(self.get_status, args=(ip_port, protocol))
            pool.close()
            pool.join()
    
    
    if __name__ == "__main__":
        # Optional first command-line argument: pause between rounds in seconds
        # (defaults to one hour).
        time_sleep = int(sys.argv[1]) if len(sys.argv) > 1 else 60 * 60
        while True:
            # One round: scrape new proxies, then check the stored ones.
            ProxyPool().get_youdaili()
            ProxyCheck().check()
            time.sleep(time_sleep)
    

    爬取代理2代码:

    # -*- coding=utf8 -*-
    """
        从网上爬取HTTPS代理
    """
    import re
    import sys
    import time
    import Queue
    import logging
    import requests
    import threading
    from pyquery import PyQuery
    import requests.packages.urllib3
    # Silence urllib3 warnings for the whole process; the requests in this
    # script are made with verify=False.
    requests.packages.urllib3.disable_warnings()
     
     
    #logging.basicConfig(
    #    level=logging.DEBUG,
    #    format="[%(asctime)s] %(levelname)s: %(message)s")
     
    class Worker(threading.Thread):
        """Daemon thread that runs queued jobs and collects their return values.

        Each job on ``workQueue`` is a ``(callable, args, kwds)`` tuple; its
        return value is put on ``resultQueue``.  The thread exits as soon as it
        finds the work queue empty, so jobs should be queued before ``start()``.
        """

        def __init__(self, workQueue, resultQueue, **kwds):
            threading.Thread.__init__(self, **kwds)
            # Attribute form; Thread.setDaemon() is deprecated since Python 3.10.
            self.daemon = True
            self.workQueue = workQueue
            self.resultQueue = resultQueue

        def run(self):
            while True:
                try:
                    func, args, kwds = self.workQueue.get(False)  # non-blocking get
                except Queue.Empty:
                    break
                # The job runs outside the try block so that a Queue.Empty raised
                # by the job itself is not mistaken for "no more work".
                self.resultQueue.put(func(*args, **kwds))
     
     
    class WorkManager:
        """Small thread pool: queue jobs, start the workers, wait, read results."""

        def __init__(self, num_of_workers=10):
            self.workQueue = Queue.Queue()  # pending (callable, args, kwds) jobs
            self.resultQueue = Queue.Queue()  # return values of finished jobs
            self.workers = []
            self._recruitThreads(num_of_workers)

        def _recruitThreads(self, num_of_workers):
            """Create (without starting) ``num_of_workers`` worker threads."""
            for _ in range(num_of_workers):
                worker = Worker(self.workQueue, self.resultQueue)
                self.workers.append(worker)

        def start(self):
            """Start every worker thread in the pool."""
            for w in self.workers:
                w.start()

        def wait_for_complete(self):
            """Join the worker threads one by one, removing them from the pool."""
            while self.workers:
                worker = self.workers.pop()
                worker.join()
                # is_alive() replaces Thread.isAlive(), which was removed in
                # Python 3.9; a worker that is still running while jobs remain
                # goes back into the pool.
                if worker.is_alive() and not self.workQueue.empty():
                    self.workers.append(worker)

        def add_job(self, callable, *args, **kwds):
            """Queue ``callable(*args, **kwds)`` for execution by a worker."""
            self.workQueue.put((callable, args, kwds))

        def get_result(self, *args, **kwds):
            """Return the next result; arguments are passed to ``Queue.get``."""
            return self.resultQueue.get(*args, **kwds)
     
    def check_proxies(ip,port):
        """
        Check whether the HTTP proxy ``ip:port`` is usable.

        Fetches v2ex.com and guokr.com through the proxy.  Returns True, and
        prints "ip port", only when both responses have status 200 and each
        body contains its expected marker string; returns False otherwise,
        including when a request raises.
        """
        proxies = {'http': 'http://' + str(ip) + ':' + str(port)}
        try:
            r0 = requests.get('http://v2ex.com', proxies=proxies, timeout=30, verify=False)
            r1 = requests.get('http://www.guokr.com', proxies=proxies, timeout=30, verify=False)
            # The marker strings guard against proxies that answer 200 with a
            # substituted page.  NOTE(review): presumably registration numbers
            # shown on the two sites -- confirm they are still present.
            # Bytes literals match ``Response.content`` on Python 2 and 3.
            alive = (r0.status_code == requests.codes.ok
                     and r1.status_code == requests.codes.ok
                     and b"09043258" in r1.content
                     and b"15015613" in r0.content)
        except Exception:
            # Best effort: any failure simply means the proxy is unusable.
            return False

        if alive:
            print("%s %s" % (ip, port))
        return alive
     
    def get_ip181_proxies():
        """
        Fetch HTTP proxies listed on http://www.ip181.com/.

        Returns a list of ``(ip, port)`` string tuples taken from the first two
        cells of each table row.  A row is skipped when its fourth cell does
        not contain 'HTTP' or when the decimal number in its fifth cell is
        greater than 5 (presumably the response time in seconds -- confirm
        against the page).  Errors are written to stderr and the proxies
        collected so far are returned.
        """
        proxy_list = []
        try:
            html_page = requests.get('http://www.ip181.com/', timeout=60, verify=False, allow_redirects=False).content.decode('gb2312')
            for tr in PyQuery(html_page)("tr"):
                element = [PyQuery(td).text() for td in PyQuery(tr)("td")]
                if len(element) < 5:
                    # Not a full data row; indexing it would abort the whole parse.
                    continue
                if 'HTTP' not in element[3]:
                    continue

                result = re.search(r'\d+\.\d+', element[4], re.UNICODE)
                if result and float(result.group()) > 5:
                    continue
                proxy_list.append((element[0], element[1]))
        except Exception as e:
            # Best effort: report the problem and keep what was parsed.
            sys.stderr.write(str(e))

        return proxy_list
     
    def get_kuaidaili_proxies():
        """
        Fetch HTTP proxies listed on http://www.kuaidaili.com/.

        Scans the four ``free/`` categories and ``proxylist/0/`` through
        ``proxylist/10/``.  Returns a list of ``(ip, port)`` string tuples.
        A page that cannot be fetched or decoded is reported on stderr and
        skipped.
        """
        base = 'http://www.kuaidaili.com/'
        urls = [base + 'free/' + m for m in ['inha', 'intr', 'outha', 'outtr']]
        urls += [base + 'proxylist/' + str(n) + '/' for n in range(0, 11)]

        # An IP table cell followed by the port cell.  The port allows up to
        # five digits so that ports above 9999 are not truncated.
        cell_re = re.compile(
            r'(?P<ip>(?:\d{1,3}\.){3}\d{1,3})</td>\n?\s*<td.*?>\s*(?P<port>\d{1,5})')

        proxy_list = []
        for url in urls:
            try:
                html_page = requests.get(url, timeout=60, verify=False, allow_redirects=False).content.decode('utf-8')
            except Exception as e:
                sys.stderr.write(str(e))
                continue
            # findall() yields one (ip, port) tuple per match.
            proxy_list.extend(cell_re.findall(html_page))

        return proxy_list
     
    def get_66ip_proxies():
        """
        Fetch HTTP proxies from the plain-text lists of proxylists.net and the
        66ip.cn API endpoints.

        Returns a list of ``(ip, port)`` string tuples.  A URL that cannot be
        fetched or decoded is reported on stderr and skipped.
        """
        # NOTE(review): the third URL contains a literal '%s' that is never
        # filled in -- confirm which 'anonymoustype' value was intended.
        urllists = [
            'http://www.proxylists.net/http_highanon.txt',
            'http://www.proxylists.net/http.txt',
            'http://www.66ip.cn/nmtq.php?getnum=1000&anonymoustype=%s&proxytype=2&api=66ip',
            'http://www.66ip.cn/mo.php?sxb=&tqsl=100&port=&export=&ktip=&sxa=&submit=%CC%E1++%C8%A1'
            ]
        # "a.b.c.d:port" with a port that has no leading zero.
        addr_re = re.compile(r'((?:\d{1,3}\.){3}\d{1,3}):([1-9]\d*)')

        proxy_list = []
        for url in urllists:
            try:
                html_page = requests.get(url, timeout=60, verify=False, allow_redirects=False).content.decode('gb2312')
            except Exception as e:
                sys.stderr.write(str(e))
                continue
            # findall() yields one (ip, port) tuple per match.
            proxy_list.extend(addr_re.findall(html_page))

        return proxy_list
     
     
    def get_proxy_sites():
        """Gather candidate proxies from every source and check them on 20 worker threads."""
        wm = WorkManager(20)
        candidates = (get_ip181_proxies()
                      + get_kuaidaili_proxies()
                      + get_66ip_proxies())

        # Queue one check per candidate before any worker is started.
        for ip, port in candidates:
            wm.add_job(check_proxies, str(ip), str(port))
        wm.start()
        wm.wait_for_complete()
     
     
    if __name__ == '__main__':
        # Script entry point: run one crawl-and-check pass and print any
        # exception instead of letting it propagate.
        try:
            get_proxy_sites()
        except Exception as err:
            print(err)
    

      

      

  • 相关阅读:
    IDEA debug时特慢 Method breakpoints may dramatically slow down debugging
    docker构建镜像
    ubuntu 挂载硬盘
    python 的 flask 、django 、tornado 、sanic
    scrapy实战之scrapyrt的使用
    scrapy框架集成http
    python3之Splash
    CentOS7安装PostgreSQL9.6(图文详细操作)
    替代Navicat的数据库操作工具DBeaver
    CentOS 7 安装 Graylog
  • 原文地址:https://www.cnblogs.com/whoami101/p/5731903.html
Copyright © 2011-2022 走看看