zoukankan      html  css  js  c++  java
  • python 爬虫proxy,BeautifulSoup+requests+mysql 爬取样例

    实现思路:

         由于反扒机制,所以需要做代理切换,去爬取,内容通过BeautifulSoup去解析,最后入mysql库

    1.在西刺免费代理网获取代理ip,并自我检测是否可用

    2.根据获取的可用代理ip去发送requests模块的请求,带上代理

    3.内容入库

    注:日志模块在上一篇随笔

    下面附上代码

    1.可用代理获取

    # -*- coding: utf-8 -*-
    import random
    import time
    import requests
    from bs4 import BeautifulSoup
    import log_config
    logger = log_config.getlogger('ip_pool', 'ip_pool.log')
    
    
    class IPProxyPool:
        # 初始化,定义一个空数组ip_list用于存储ip代理
        def __init__(self):
    
            # 代理ip获取网址
            self.proxy_url_list = ['http://www.xicidaili.com', 'http://www.xicidaili.com/nn', 'http://www.xicidaili.com/nn/2']
            self.ip_list = []
            self.headers = {'Host': 'www.xicidaili.com',
                            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
                            }
            self.user_agent_list = [
                "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
                "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
                "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6",
                "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1090.0 Safari/536.6",
                "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/19.77.34.5 Safari/537.1",
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.9 Safari/536.5",
                "Mozilla/5.0 (Windows NT 6.0) AppleWebKit/536.5 (KHTML, like Gecko) Chrome/19.0.1084.36 Safari/536.5",
                "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
                "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
                "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_0) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1063.0 Safari/536.3",
                "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
                "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1062.0 Safari/536.3",
                "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
                "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
                "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.1 Safari/536.3",
                "Mozilla/5.0 (Windows NT 6.2) AppleWebKit/536.3 (KHTML, like Gecko) Chrome/19.0.1061.0 Safari/536.3",
                "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24",
                "Mozilla/5.0 (Windows NT 6.2; WOW64) AppleWebKit/535.24 (KHTML, like Gecko) Chrome/19.0.1055.1 Safari/535.24"
            ]
    
        def get_xici_all_ip(self):
            ip_lists = []
            for proxy_url in self.proxy_url_list:
                html = requests.get(proxy_url, headers=self.headers)
                content = html.text
                soup = BeautifulSoup(content, "lxml")
                ip_body = soup.find("table", attrs={"id": "ip_list"})
                ip_page_lists = ip_body.find_all("tr", attrs={"class": "odd"})
                ip_lists = ip_lists + ip_page_lists
            return ip_lists
    
        def get_ip_list(self):
            ip_lists = self.get_xici_all_ip()
            ip_test_pool = []
            for ip in ip_lists:
                http_type = ip.find_all("td")[5].get_text()
                if http_type == 'HTTP':
                    ip_test_account = ip.find_all("td")[1].get_text()
                    ip_test_port = ip.find_all("td")[2].get_text()
                    ip_port_dict = {ip_test_account: ip_test_port}
                    ip_test_pool.append(ip_port_dict)
    
            for ipn in ip_test_pool:
                ip_addr = "http://"
                for ip, port in ipn.items():
                    ip_addr = ip_addr + ip + ':' + port
                # ip代理有效性检验
                statu = self.check_ip(ip_addr)
                if statu:
                    # 将有效ip代理存储至数组ip_list中
                    self.ip_list.append(ip_addr.strip())
    
        def check_ip(self, ip):
            return self.microbell_proxy_ip(ip)
    
        def microbell_proxy_ip(self, ip):
            try:
                test_url = 'http://www.microbell.com/elitelist.html'
                proxy = {'http': ip}
                user_agent = self.random_agent()
                headers_agent = {'User-Agent': user_agent}
                response_body = requests.get(test_url, headers=headers_agent, proxies=proxy, timeout=5)
                if response_body.status_code == 200:
                    # 即使返回了200,也可能不是我们访问的页面,而是代理给我们的页面,所以还需要做判断
                    #response_body.encoding('gbk')
                    content = response_body.text
                    soup = BeautifulSoup(content, "lxml")
                    body = soup.find("div", attrs={"class": "index_docmain"})
                    if body is None:
                        return False
                    if body.get_text() != "":
                        logger.info("ok proxy ip %s" % ip)
                        return True
                    else:
                        return False
    
                else:
                    return False
    
            except Exception as e:
                logger.exception(e.message)
                time.sleep(1)
                return False
    
        def random_agent(self):
            user_agent = random.choice(self.user_agent_list)
            return user_agent
    
    
    if __name__ == "__main__":
        IPProxyPool = IPProxyPool()
        IPProxyPool.get_ip_list()
        print IPProxyPool.ip_list
    
        # proxies = {
        #     "http": "http://118.190.95.35:9001"  # 代理ip
        # }
        #
        # headers = {
        #     'Host': 'www.4399.com',
        #     'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36'
        #
        # }
        #
        # http_url = "http://www.4399.com/"
        # try:
        #     res = requests.get(url=http_url, headers=headers, proxies=proxies, timeout=3)
        #     if res.status_code == 200:
        #         print u"访问网页成功"
        #     else:
        #         print "faile"
        # except Exception as e:
        #     print e

    2.解析界面,获取想要的内容,并入库

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import ConfigParser
    import datetime
    import sys
    import pymysql
    import requests
    from bs4 import BeautifulSoup
    import log_config
    import time
    import random
    from agent_and_proxy_ip_pool import IPProxyPool
    
    logger = log_config.getlogger('report', 'report.log')
    
    # 是否获取今天的数据 0 表示获取今天的数据,1表示获取全部数据 2表示页面没有数据
    get_today_data = 1
    if len(sys.argv) != 1:
        if sys.argv[1] == 1:
            get_today_data = 1
        else:
            print 'input error,please input 0->today ,1->all data'
            exit()
    
    
    class research_report:
        def __init__(self):
            conf = ConfigParser.ConfigParser()
            conf.read("mysql.conf")
            self.ip_proxy_pool = IPProxyPool()
            # self.ip_proxy_pool.get_ip_list()
            # self.ip_pool = self.ip_proxy_pool.ip_list
            # logger.info('You can currently use IP %s' % self.ip_pool)
    #多个可用的cookies
    self.cookies_pool = [ 'c=; ASPSESSIONIDQCQRQQCR=LEOOBOJCBAMFFDHMFBHFJKEE; __guid=188006958.3779224451650617000.1539657585525.2588; ASPSESSIONIDSATRTTDQ=MCDEIPFDLLKBNHPBBEMGBGFC; safedog-flow-item=C07B93F771; UM_distinctid=16680b1e9e411f-0674a4c85ccc2-454c092b-1fa400-16680b1e9e539d; CNZZDATA1752123=cnzz_eid%3D2075545357-1539752826-%26ntime%3D1539752826; Hm_lvt_d554f0f6d738d9e505c72769d450253d=1539757436; robih=vXuWjYMDvV6XmNxOuNmP; MBpermission=0; MBname=sunyue1993; did=67A671BFE; monitor_count=6; Hm_lpvt_d554f0f6d738d9e505c72769d450253d=1539757719' ] self.get_today = get_today_data self.user = conf.get("mysql", "user") self.mysql_password = conf.get("mysql", "password") self.database_name = conf.get("mysql", "database") self.host = conf.get("mysql", "host") self.port = conf.get("mysql", "port") self.site_url = 'http://www.microbell.com/' self.page_url = 'http://www.microbell.com/elitelist_1_0.html' self.headers = {'Host': 'www.microbell.com', 'Accept': 'application/json, text/javascript, */*; q=0.01', 'Accept-Encoding': 'gzip, deflate, sdch', 'Accept-Language': 'zh-CN,zh;q=0.8', 'Connection': 'keep-alive' } # 生成随机agent def get_random_headers(self): # self.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.79 Safari/537.36' self.headers['User-Agent'] = self.ip_proxy_pool.random_agent() self.headers['Cookie'] = random.choice(self.cookies_pool) # 获取指定数组中的随机ip def get_random_proxy(self): proxy_ip = random.choice(self.ip_pool) proxies = {'http': proxy_ip} return proxies # 获取列表页面 def get_html_content(self, page_num_url): try: self.get_random_headers() req = requests.get(page_num_url, headers=self.headers, timeout=5) req.encoding = 'gbk' text = req.text soup = BeautifulSoup(text, "lxml") # soup = body.prettify #美化 report_list = soup.find_all("div", attrs={"class": "classbaogao_sousuo_list"}) list_data = [] logger.info("%s owner %s pages" % (page_num_url, len(report_list))) if len(report_list) == 0: return 2 for report_item in report_list: url = self.site_url + report_item.table.tr.find_all("td")[1].a["href"] title = report_item.table.tr.find_all("td")[1].a["title"] item_data = {"url": url, "title": title} list_data.append(item_data) end_flag = self.get_list_page_data(list_data) return end_flag except Exception as e: logger.exception("get list %s page fail error info : %s" % (page_num_url, e)) return 2 # 获取一页数据38条的每条的详情 def get_list_page_data(self, list_data): try: # 一页数据(38)数组入库一次 page_datas = [] now_date = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.get_random_headers() #proxy_ip = self.get_random_proxy() for item_data in list_data: retry_num = 0 while retry_num < 3: try: # 休眠两秒,避免被封 t = random.uniform(3, 5) time.sleep(t) req = requests.get(item_data["url"], headers=self.headers, timeout=5) req.encoding = 'gbk' text = req.text soup = BeautifulSoup(text, "lxml") detail_div = soup.find("div", attrs={"class": "leftn2"}) tr_s = detail_div.table.find_all("tr") public_time = tr_s[0].find_all("td")[2].span.get_text() if self.get_today == 0: # 如果是爬取今天的数据,会进入这个判断,如果当期获取的文件的时间早于今天就直接退出循环 logger.info("now spider today data") today = datetime.date.today() today_time = int(time.mktime(today.timetuple())) time_array = time.strptime(public_time, "%Y-%m-%d %H:%M:%S") pub_time = int(time.mktime(time_array)) if pub_time < today_time: break abstract_br_replace = soup.find("div", attrs={"class": "p_main"}).p.span str1 = str(abstract_br_replace).replace("<br/>", r" ") abstract_object = BeautifulSoup(str1, "lxml") [s.extract() for s in abstract_object("font")] abstract = abstract_object.get_text() sec_name = tr_s[0].find_all("td")[0].span.get_text() sec_code = tr_s[0].find_all("td")[1].span.get_text() report_type = tr_s[1].find_all("td")[0].span.get_text() doc_type = tr_s[1].find_all("td")[1].span.get_text() author = tr_s[1].find_all("td")[2].span.get_text() provenance = tr_s[2].find_all("td")[0].span.get_text() pages = tr_s[2].find_all("td")[1].span.get_text() rec_rate = tr_s[2].find_all("td")[2].span.get_text() doc_size = tr_s[3].find_all("td")[0].span.get_text() promulgator = tr_s[3].find_all("td")[1].span.get_text() #doc_url_str = soup.find("div", attrs={"class": "anniu_main"}).a["onclick"] doc_url_str = "" doc_url_list = doc_url_str.split(",") doc_url = self.site_url + doc_url_list[2] title = item_data["title"] create_time = now_date update_time = now_date page_data = [title, sec_name, sec_code, public_time, report_type, doc_type, author, provenance, pages, rec_rate, doc_size, doc_url, promulgator, abstract, create_time, update_time] page_datas.append(page_data) break except Exception as e: retry_num += 1 if retry_num == 3: logger.warning("current page is not get %s" % item_data) if len(page_datas) > 0: self.set_data_mysql(page_datas) if self.get_today == 0: if len(page_datas) < 38: return 0 return 1 else: return 2 except Exception as e: logger.error("get detail page fail" % list_data, e) return 2 # 批量插入mysql def set_data_mysql(self, page_datas): # 创建连接 conn = pymysql.connect(host=self.host, port=int(self.port), user=self.user, passwd=self.mysql_password, db=self.database_name) try: # 创建游标 cursor = conn.cursor() sql = "INSERT INTO report(title,sec_name,sec_code,public_time,report_type,doc_type,author," "provenance,pages,rec_rate,doc_size,doc_url,promulgator,abstract,create_time,update_time) " "VALUES(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)" effect_row = cursor.executemany(sql, page_datas) # 提交sql,不提交不会进入mysql conn.commit() logger.info("already into dabatabase %s" % effect_row) finally: conn.close() # 登录,获取cookie,暂时没用 # def login_in(self): # data = { # 'namelogin': self.user_name, # 'pwdlogin': self.password # } # req = requests.post(self.login_url, headers=self.headers, data=data) # req.encoding = req.apparent_encoding # cookies = req.cookies.get_dict() # print cookies # http://www.microbell.com/elitelist_1_0.html 初始页面,后面的页面只有url中的"1"这个值会变动 def process(self): # 分析页面,总共不超过360页 if get_today_data == 0: for i in range(1, 20): base_url = "http://www.microbell.com/elitelist_%s_0.html" % i logger.info("当前获取页面url=%s" % base_url) end_flag = self.get_html_content(base_url) if end_flag == 0: logger.info("The page %s is already the last page" % base_url) break else: for i in reversed(range(1, 107)): base_url = "http://www.microbell.com/elitelist_%s_0.html" % i logger.info("当前获取页面url=%s" % base_url) self.get_html_content(base_url) if __name__ == "__main__": research_class = research_report() research_class.process() else: research_class = research_report() research_class.process()
  • 相关阅读:
    python数据集处理,加载成list
    *和multiply
    RuntimeWarning: overflow encountered in exp
    机器学习 回归
    argsort()
    transpose()、T
    numpy、matplotlib第三方库安装
    feedparser安装
    机器学习实战错误校正
    机器学习 基于概率论的分类方法:朴素贝叶斯
  • 原文地址:https://www.cnblogs.com/keepMoveForevery/p/9815456.html
Copyright © 2011-2022 走看看