  • Implementing a multi-threaded crawler with queues

    Disclaimer: nothing ill-intended here; this is shared for learning only. If it infringes on any rights, it will be removed immediately.

    Notes: This crawls jokes from Qiushibaike using a queue plus multiple threads. The key calls are Queue.task_done() and Queue.join(), which keep the threads coordinated and allow an orderly shutdown.
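
    A minimal sketch of that pattern, independent of the crawler below (the names q and worker are illustrative): every get() must be matched by exactly one task_done(), and join() blocks until the count of unfinished tasks drops to zero.

    from queue import Queue
    from threading import Thread

    q = Queue()

    def worker():
        while True:
            item = q.get()         # blocks until an item is available
            print("processing", item)
            q.task_done()          # one task_done() per completed get()

    t = Thread(target=worker)
    t.daemon = True                # daemon thread: killed when the main thread exits
    t.start()

    for i in range(5):
        q.put(i)

    q.join()                       # returns once all 5 items have been task_done()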

    =============================== New version ===============================

    import requests
    from lxml import etree
    from queue import Queue
    from threading import Thread
    from multiprocessing.dummy import Pool
    import json
    import time
    
    class Qsbk(object):
        """
        Main class implementing the crawler
        """
        def __init__(self):
            self.headers = {
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36",
            }
            self.start_url = "https://www.qiushibaike.com/text"
            # Instantiate three queues to hold the data at each stage;
            # queue.Queue is thread-safe
            self.url_queue = Queue()
            self.html_queue = Queue()
            self.content_queue = Queue()
    
        def get_total_url(self):
            """Build the urls of all pages and put them on url_queue"""
            # Build the url list: the start url, then pages 2 through 13
            url_list = list()
            url_list.append(self.start_url)
            second_url = "https://www.qiushibaike.com/text/page/{}/"
            for i in range(2, 14):
                url = second_url.format(str(i))
                url_list.append(url)

            # Put the urls on the queue
            for url in url_list:
                self.url_queue.put(url)
    
        def get_response_html(self):
            """Fetch each url and put the parsed html on html_queue"""
            # Queue.not_empty is an internal Condition object and always truthy,
            # so it cannot serve as a loop guard; loop forever instead and rely
            # on the daemon flag plus queue.join() in run() for shutdown
            while True:
                # Take one url from the queue (blocks while the queue is empty)
                url = self.url_queue.get()
                print("parse url:", url)

                # Fetch the response
                response = requests.get(url, headers=self.headers, timeout=1)
                # Decode the response body into a string
                response_str = response.content.decode()
                # Parse the string into an etree.HTML element
                html = etree.HTML(response_str)

                # Put the lxml element on html_queue
                self.html_queue.put(html)

                """
                Indicate that a previously enqueued task is complete.
                Used by queue consumer threads: for each get() that fetches
                a task, a subsequent call to task_done() tells the queue that
                processing of the task is finished. If a join() is currently
                blocking, it resumes once all items have been processed.
                """
                # task_done() decrements the queue's unfinished-task count
                self.url_queue.task_done()
    
        def get_content(self):
            """
            Build content_list from each page.
            Fields extracted:
            user avatar link, user name, joke text, joke detail page url,
            vote count and comment count
            user_image_href
            user_name
            content
            content_detail_href
            stats_vote
            stats_comments
            user_gender
            user_age
            """
            # Same note as above: not_empty is always truthy, so loop forever
            while True:
                # content_list holds the info of every joke div on the current page
                content_list = list()
                # Take one page's lxml-parsed html from html_queue
                html = self.html_queue.get()
                # All divs on the page that contain a joke
                div_all = html.xpath("//div[@class='col1 old-style-col1']/div")
                # Extract the content of each joke div
                for div in div_all:
                    # A dict holding the content of a single div
                    content_dict = dict()
    
                    user_image_href = div.xpath(".//div[@class='author clearfix']//img/@src")
                    user_image_href = "https:"+user_image_href[0].split("?")[0].strip() if user_image_href else None
                    
                    user_name = div.xpath(".//div[@class='author clearfix']//img/@alt")
                    user_name = user_name[0].strip() if user_name else None
                    
                    content = div.xpath(".//a[@class='contentHerf']//span/text()")
                    content = content[0].strip() if content else None
                    
                    content_detail_href = div.xpath(".//a[@class='contentHerf']/@href")
                    content_detail_href ="https://www.qiushibaike.com"+content_detail_href[0].strip() if content_detail_href else None
                    
                    stats_vote = div.xpath(".//div[@class='stats']/span[@class='stats-vote']//i/text()")
                    stats_vote = stats_vote[0] if stats_vote else None
                    
                    stats_comment = div.xpath(".//div[@class='stats']/span[@class='stats-comments']//i/text()")
                    stats_comment = stats_comment[0] if stats_comment else None
                    
                    user_gender = div.xpath(".//div[@class='author clearfix']/div/@class")
                    user_gender = user_gender[0].split(" ")[-1].replace("Icon","").strip() if user_gender else None
    
                    user_age = div.xpath(".//div[@class='author clearfix']/div/text()")
                    user_age = user_age[0] if user_age else None
    
                    content_dict["user_image_href"] = user_image_href
                    content_dict["user_name"] = user_name
                    content_dict["user_gender"] = user_gender
                    content_dict["user_age"] = user_age
                    content_dict["content"] = content
                    content_dict["content_detail_href"] = content_detail_href
                    content_dict["stats_vote"] = stats_vote
                    content_dict["stats_comment"] = stats_comment
    
                    content_list.append(content_dict)
                
                # Put this page's content_list on content_queue
                self.content_queue.put(content_list)
                # task_done() decrements the queue's unfinished-task count
                self.html_queue.task_done()
                   
        def save_content_to_file(self):
            """
            Save the content as json
            """
            # Same note as above: not_empty is always truthy, so loop forever
            while True:
                content_list = self.content_queue.get()
                with open("qiushibaike_duanzi.json", 'a', encoding='utf8') as f:
                    # Each page is appended as one json array, so the file ends
                    # up holding several concatenated arrays
                    f.write(json.dumps(content_list, ensure_ascii=False, indent=2))
                    print("page written")
                # task_done() decrements the queue's unfinished-task count
                self.content_queue.task_done()
        
        def run(self):
            """
            Main logic
            """
            start_ = time.time()

            # Thread list
            thread_list = list()
            # Thread that builds the urls
            url_thread = Thread(target=self.get_total_url)
            thread_list.append(url_thread)
            # Thread that fetches and parses the responses
            html_thread = Thread(target=self.get_response_html)
            thread_list.append(html_thread)
            # Thread that extracts the content
            content_thread = Thread(target=self.get_content)
            thread_list.append(content_thread)
            # Thread that saves the content
            savefile_thread = Thread(target=self.save_content_to_file)
            thread_list.append(savefile_thread)

            # Option 1
            # for t in thread_list:
            #     # Mark each thread as a daemon thread: the main thread exits
            #     # without waiting for the worker threads to finish
            #     t.daemon = True
            #     t.start()
            # # Make the main thread wait; it exits only once all queues are empty
            # self.url_queue.join()
            # self.html_queue.join()
            # self.content_queue.join()


            # Option 2
            def process_thread(t):
                # On daemon threads: https://www.cnblogs.com/nuochengze/p/12882349.html
                t.daemon = True
                t.start()

            pool = Pool(10)
            pool.map(process_thread, thread_list)

            # Make the main thread wait; it exits only once all queues are empty
            self.url_queue.join()
            self.html_queue.join()
            self.content_queue.join()

            ends_ = time.time()

            print("elapsed:", ends_ - start_)
    
            
    if __name__=='__main__':
        obj = Qsbk()
        obj.run()
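
    A note on the while True loops in both versions: queue.Queue has no public "is not empty" flag. Its not_empty attribute is an internal threading.Condition (a CPython implementation detail) and is always truthy, so a guard like while self.url_queue.not_empty: could never end the loop on its own; shutdown comes from the daemon flag plus the join() calls instead. A quick standalone check, separate from the crawler:

    from queue import Queue

    q = Queue()
    print(bool(q.not_empty))   # True even though the queue is empty:
                               # not_empty is a Condition, always truthy
    print(q.empty())           # True: the actual emptiness test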

    =============================== Old version ===============================

    import requests
    from lxml import etree
    import json
    from queue import Queue
    import threading
    
    class Qsbk(object):
        def __init__(self):
            self.headers = {
                "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.138 Safari/537.36",
                "Referer": "https://www.qiushibaike.com/"
            }
            # Instantiate three queues to hold the data at each stage
            self.url_queue = Queue()
            self.html_queue = Queue()
            self.content_queue = Queue()
    
        def get_total_url(self):
            """
            Build the urls of pages 1 through 12
            and put them on url_queue
            """
            url_temp = "https://www.qiushibaike.com/text/page/{}/"
            for i in range(1, 13):
                # Put each generated url on url_queue
                self.url_queue.put(url_temp.format(i))
    
        def parse_url(self):
            """
            Send the request, get the response, and parse the html with etree
            """
            # Queue.not_empty is always truthy (see the note above), so loop
            # forever; the daemon flag plus join() handles shutdown
            while True:
                # Take one url from the queue (blocks while the queue is empty)
                url = self.url_queue.get()
                print("parsing url:", url)
                # Send the request
                response = requests.get(url, headers=self.headers, timeout=10)
                # Get the html string
                html = response.content.decode()
                # Parse it into an element-typed html
                html = etree.HTML(html)
                # Put the resulting element object on html_queue
                self.html_queue.put(html)
                # Queue.task_done(): after finishing a piece of work, signal
                # the queue that the task is complete
                self.url_queue.task_done()
    
        def get_content(self):
            """
            Parse the page and extract the desired fields
            """
            # Same note as above: not_empty is always truthy, so loop forever
            while True:
                items = list()
                html = self.html_queue.get()
                total_div = html.xpath("//div[@class='col1 old-style-col1']/div")
                for i in total_div:
    
                    author_img = i.xpath(".//a[@rel='nofollow']/img/@src")
                    author_img = "https"+author_img[0] if len(author_img)>0 else None
    
                    author_name = i.xpath(".//a[@rel='nofollow']/img/@alt")
                    author_name = author_name[0] if len(author_name)>0 else None
    
                    author_href = i.xpath("./a/@href")
                    author_href = "https://www.qiushibaike.com/"+author_href[0] if len(author_href)>0 else None
    
                    author_gender = i.xpath("./div[1]/div/@class")
                    author_gender = author_gender[0].split(" ")[-1].replace("Icon","").strip() if len(author_gender)>0 else None
    
                    author_age = i.xpath("./div[1]/div/text()")
                    author_age = author_age[0] if len(author_age)>0 else None
    
                    content = i.xpath("./a/div/span/text()")
                    content = content[0].strip() if len(content)>0 else None
    
                    content_vote = i.xpath("./div[@class='stats']/span[@class='stats-vote']/i/text()")
                    content_vote = content_vote[0] if len(content_vote)>0 else None
    
                    content_comment_numbers = i.xpath("./div[@class='stats']/span[@class='stats-comments']/a/i/text()")
                    content_comment_numbers = content_comment_numbers[0] if len(content_comment_numbers)>0 else None
    
                    item = {
                        "author_name":author_name,
                        "author_age" :author_age,
                        "author_gender":author_gender,
                        "author_img":author_img,
                        "author_href":author_href,
                        "content":content,
                        "content_vote":content_vote,
                        "content_comment_numbers":content_comment_numbers,
                    }
                    items.append(item)
                self.content_queue.put(items)
                # task_done() decrements the queue's unfinished-task count
                self.html_queue.task_done()
    
        def save_items(self):
            """
            Save the items
            """
            # Same note as above: not_empty is always truthy, so loop forever
            while True:
                items = self.content_queue.get()
                with open("quishibaike.txt", 'a', encoding='utf-8') as f:
                    for i in items:
                        json.dump(i, f, ensure_ascii=False, indent=2)
                self.content_queue.task_done()
    
        def run(self):
            # Thread that builds the url list
            thread_list = list()
            thread_url = threading.Thread(target=self.get_total_url)
            thread_list.append(thread_url)

            # Threads that send the network requests
            for i in range(10):
                thread_parse = threading.Thread(target=self.parse_url)
                thread_list.append(thread_parse)

            # Thread that extracts the data
            thread_get_content = threading.Thread(target=self.get_content)
            thread_list.append(thread_get_content)

            # Thread that saves the data
            thread_save = threading.Thread(target=self.save_items)
            thread_list.append(thread_save)


            for t in thread_list:
                # Mark each thread as a daemon thread, so it is killed as soon
                # as the main thread exits
                t.daemon = True
                t.start()

            # Make the main thread wait; it exits only once all queues are empty
            self.url_queue.join()
            self.html_queue.join()
            self.content_queue.join()
    
    
    if __name__=="__main__":
        obj = Qsbk()
        obj.run()
  • Original article: https://www.cnblogs.com/nuochengze/p/12861358.html