zoukankan      html  css  js  c++  java
  • 实现多线程爬取数据并保存到mongodb

    多线程爬取二手房网页并将数据保存到mongodb的代码:

    import pymongo
    import threading
    import time
    
    from lxml import etree
    import requests
    from queue import Queue
    
    index_url='https://m.lianjia.com/gz/ershoufang/pg{}/'
    detail_url='https://m.lianjia.com{}'
    
    # 设置爬取主页的页数
    INDEX_PAGE_NUM=200
    
    # 定义一个类
    
    # 0定义主页url队列、主页html队列、详情页url队列、html队列、内容队列
    # 1获取首页url并解析详情页url
    # 2获取详情页的内容
    # 3保存内容
    # 4设置多线程调用方法
    
    # 设置mongodb
    client = pymongo.MongoClient('localhost')
    # 设置数据库名
    db = client['ershoufang']
    # 指定集合名
    index = 'index_info'
    detail = 'detail_info'
    
    
    class lianJia():
        def __init__(self):
            self.index_url_queue=Queue()
    
            self.html_index_queue=Queue()
    
            self.index_content_queue=Queue()
            self.detail_content_queue = Queue()
    
        #     获取主页的url和html内容并解析出index页内容和详情页url
        def get_index(self):
            for i in range(INDEX_PAGE_NUM):
                # print(index_url.format(i+1))
                url=index_url.format(i+1)
                self.index_url_queue.put(url)
                # index=requests.get(index_url.format(i+1)).content.decode()
                # self.html_index_queue.put(index)
        # 获取主页html
        def get_index_html(self):
            while True:
                url=self.index_url_queue.get()
                index = requests.get(url).content.decode()
                self.html_index_queue.put(index)
                self.index_url_queue.task_done()
        def parse_index(self):
            while True:
                # 获取队列里得内容
                html1=self.html_index_queue.get()
                xml=etree.HTML(html1)
                pingjie_list=xml.xpath('''//ul[@class='lists']/li[position()>1]''')
                # 将 pingjie_list拼接在xpath前,少写xpath语句
                index_content_list=[]
                for pj in pingjie_list:
                    index_infor={}
                    # #判空炒作,如果为空则显示none if len(index_infor['title']) > 0 else None
                    index_infor['title']=pj.xpath('''./div/div[@class='item_list']/div[1]/text()''')
    
                    index_infor['title']=index_infor['title'][0] if len(index_infor['title']) > 0 else None
                    index_infor['detail_url'] = pj.xpath('''./a/@href''')[0]
                    index_infor['index_detail']=pj.xpath('''./div/div[2]/div[2]/text()''')
                    index_infor['index_detail']=index_infor['index_detail'][0] if len(index_infor['index_detail'])>0 else None
                    index_infor['total_price']=pj.xpath('''./div/div[2]/div[position()>2]/span[1]/em/text()''')
                    index_infor['total_price']= index_infor['total_price'][0] if len( index_infor['total_price'])>0 else None
                    index_infor['average_price']=pj.xpath('''./div/div[@class='item_list']/div[3]/span[2]/text()''')
                    index_infor['average_price']=index_infor['average_price'][0]if len(index_infor['average_price'])>0 else None
                    index_content_list.append(index_infor)
                    #  队列保存时不能在循环里 否之回保存很多个队列
                    # self.index_content_queue.put(index_content_list)
                    # 把content_list放进content_queue里面
    
                self.index_content_queue.put(index_content_list)
                # print(index_content_list)
    
                # 每从队列中获取一个数,队列则减少一个数,所以此代码必须写
                self.html_index_queue.task_done()
    
    
        # 获取详情页内容
        def get_detail(self):
            pass
    
        # 保存内容
        def save_content(self):
            while True:
                index_conten_list=self.index_content_queue.get()
    
                for i in index_conten_list:
                    # print(i['title'])
                    if i['title']==None or i['total_price']==None or i['average_price']==None:
                        print('该数据为空,不进行保存')
    
                    else:
                        db['index_info'].insert(i)
                        # db['detailDta'].insert(detail_datas)
                        print('保存数据成功')
                self.index_content_queue.task_done()
    
    
    
        # 主线程:分配各种子线程去执行class里得每一个函数
        # 使用队列的方式得设置多线程进行调用函数,才能让程序执行速度更快
        def run(self):
            # 设置线程列表
            thread_list=[]
            # start_time=time.time()
            # 1.url_list
            # threading.Thread不需要传参数,参数都是从队列里面取得
            # for i in range(20):
            t_index_u=threading.Thread(target=self.get_index)
            thread_list.append(t_index_u)
    
            # 2.遍历,发送请求,获取响应
            for i in range(20):
                t_index_html=threading.Thread(target=self.get_index_html)
                thread_list.append(t_index_html)
    
            # 3.提取数据
            for i in range(2):
                t_parse_index=threading.Thread(target=self.parse_index)
                thread_list.append(t_parse_index)
    
            # 4.保存数据
            t_save=threading.Thread(target=self.save_content)
            thread_list.append(t_save)
            #     循环开启各子线程
            for t in thread_list:
                # 表示主线程结束,子线程(设置为true无限循环)也跟着结束(用主线程控制子线程)
                t.setDaemon(True)
                # 启动线程
                t.start()
            for q in [self.index_url_queue,self.html_index_queue,self.index_content_queue]:
                # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程
                q.join()
                # end_time=time.time()
                # print('总耗时%.2f秒'%(end_time-start_time))
    
    if __name__=='__main__':
        sk = time.clock()
        func=lianJia()
        func.run()
        ek = time.clock()
        print('程序总耗时:',ek-sk)

     多线程爬取糗事百科:

    # coding=utf-8
    import requests
    from lxml import etree
    import threading
    from queue import Queue
    
    # https://docs.python.org/3/library/queue.html#module-queue
    # 队列使用方法简介
    # q.qsize() 返回队列的大小
    # q.empty() 如果队列为空,返回True,反之False
    # q.full() 如果队列满了,返回True,反之False
    # q.full 与 maxsize 大小对应
    # q.get([block[, timeout]]) 获取队列,timeout等待时间
    # q.get_nowait() 相当q.get(False)
    # q.put(item) 写入队列,timeout等待时间
    # q.put_nowait(item) 相当q.put(item, False)
    # q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    # q.join() 实际上意味着等到队列为空,再执行别的操作
    
    
    class QiubaiSpdier:
        def __init__(self):
            self.url_temp = "https://www.qiushibaike.com/8hr/page/{}/"
            self.headers = {"User-Agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
            self.url_queue = Queue()
            self.html_queue  = Queue()
            self.content_queue = Queue()
        def get_url_list(self):
            # return [self.url_temp.format(i) for i in range(1,14)]
            for i in range(1,14):
                # 把13个索引页面的Url放进url_queue队列里
                self.url_queue.put(self.url_temp.format(i))
    
        def parse_url(self):
            while True:
                # get方法和task_done搭配使用
                # 在put是队列+1,get和task_done一起使用时队列才会-1
                url = self.url_queue.get()
                print(url)
                response = requests.get(url,headers=self.headers)
                # 然后把索引页的响应页面放进html_queue队列里
                self.html_queue.put(response.content.decode())
                self.url_queue.task_done()
    
        def get_content_list(self): #提取数据
            while True:
                # 先从索引页响应页面html_queue队列里面取出索引页面
                html_str = self.html_queue.get()
    
                html = etree.HTML(html_str)
                div_list = html.xpath("//div[@id='content-left']/div")  #分组
                content_list = []
                for div in div_list:
                    item= {}
                    item["content"] = div.xpath(".//div[@class='content']/span/text()")
                    item["content"] = [i.replace("
    ","") for i in item["content"]]
                    item["author_gender"] = div.xpath(".//div[contains(@class,'articleGender')]/@class")
                    item["author_gender"] = item["author_gender"][0].split(" ")[-1].replace("Icon","") if len(item["author_gender"])>0 else None
                    item["auhtor_age"] = div.xpath(".//div[contains(@class,'articleGender')]/text()")
                    item["auhtor_age"] = item["auhtor_age"][0] if len(item["auhtor_age"])>0 else None
                    item["content_img"] = div.xpath(".//div[@class='thumb']/a/img/@src")
                    item["content_img"] = "https:"+item["content_img"][0] if len(item["content_img"])>0 else None
                    item["author_img"] = div.xpath(".//div[@class='author clearfix']//img/@src")
                    item["author_img"] = "https:"+item["author_img"][0] if len(item["author_img"])>0 else None
                    item["stats_vote"] = div.xpath(".//span[@class='stats-vote']/i/text()")
                    item["stats_vote"] = item["stats_vote"][0] if len(item["stats_vote"])>0 else None
                    content_list.append(item)
                # 把content_list放进content_queue里面
                self.content_queue.put(content_list)
                self.html_queue.task_done()
    
        def save_content_list(self): #保存
            while True:
                content_list = self.content_queue.get()
                for i in content_list:
                    print(i)
                    pass
                self.content_queue.task_done()
    
        def run(self): #实现主要逻辑
            thread_list = []
            #1.url_list
            # threading.Thread不需要传参数,参数都是从队列里面取得
            t_url = threading.Thread(target=self.get_url_list)
            thread_list.append(t_url)
            #2.遍历,发送请求,获取响应
            for i in range(20): # 添加20个线程
                t_parse = threading.Thread(target=self.parse_url)
                thread_list.append(t_parse)
            #3.提取数据
            for i in range(2): # 添加2个线程
                t_html = threading.Thread(target=self.get_content_list)
                thread_list.append(t_html)
            #4.保存
            t_save = threading.Thread(target=self.save_content_list)
            thread_list.append(t_save)
            for t in thread_list:
                t.setDaemon(True) #把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)
                t.start()
    
            for q in [self.url_queue,self.html_queue,self.content_queue]:
                q.join() #让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程
    
            print("主线程结束")
    
    
    
    if __name__ == '__main__':
        qiubai = QiubaiSpdier()
        qiubai.run()
      
    # 所没有tast_done方法,程序最终会卡着不动,无法终止
    
    # 线程的设计注意:耗时的操作要分配一些线程
  • 相关阅读:
    COJ 1002 WZJ的数据结构(二)(splay模板)
    生成网络流图
    最小费用最大流MCMF zkw费用流
    COJ 2003 选根 (树的重心)
    最小费用最大流MCMF 最小增广
    PDO 基础知识
    使 用 Jquery 全选+下拉+单选+事件+挂事件
    搜 房 网 站 设 计 练 习
    百分比进度条
    在PHP系统里连接MySQL 数据访问,+ + + + + 数据删除
  • 原文地址:https://www.cnblogs.com/Dark-fire-liehuo/p/9998573.html
Copyright © 2011-2022 走看看