zoukankan      html  css  js  c++  java
  • 爬虫连接mongodb、多线程多进程的使用

    一、连接mongodb

    1、            设置数据库 client=pymongo.MongoClient(‘localhost’)

    2、            db=client[‘lagou’]设置连接的数据库名称

    POSITION_NAME=’’ 、PAGE_SUM 、PAGE_SIZE 等为你设置的变量名称。
    3、DATA_NAME=’dataposition’   # # 指定数据库的名字
    4、设置保存在mongo数据库中的数据:
    def save_to_mongo(data):
        if db[DATA_NAME].update({'positionId': data['positionId']}, {'$set': data}, True):
            print('Saved to Mongo', data['positionId'])
        else:
            print('Saved to Mongo Failed', data['positionId'])
    这是以positionId为唯一标识,如果数据库里面已经存在有positionId,说明数据已经爬过了,不再更新。
     
    二、多进程设置和使用:
    1、导入多进程:from multiprocessing import Pool
    导入时间  import time
    2、start_time = time.time()
    pool = Pool()  # pool()参数:进程个数:默认的是电脑cpu的核的个数,如果要指定进程个数,这个进程个数要小于等于cpu的核数
    # 第一个参数是一个函数体,不需要加括号,也不需指定参数。。
    #  第二个参数是一个列表,列表中的每个参数都会传给那个函数体
    pool.map(to_mongo_pool,[i for i in range(PAGE_SUM)])
    # close它只是把进程池关闭
    pool.close()
    # join起到一个阻塞的作用,主进程要等待子进程运行完,才能接着往下运行
    pool.join()
    end_time = time.time()
    print("总耗费时间%.2f秒" % (end_time - start_time))
     

    to_mongo_pool:这个函数要设计好,就一个参数就够了,然后把它的参数放在列表里面,通过map高阶函数一次传给to_mongo_pool

    多线程的使用:

    多线程要配合队列使用:

    # coding=utf-8
    import requests
    from lxml import etree 
    import threading 导入线程
    from queue import Queue  导入队列



    # https://docs.python.org/3/library/queue.html#module-queue
    # 队列使用方法简介
    # q.qsize() 返回队列的大小
    # q.empty() 如果队列为空,返回True,反之False
    # q.full() 如果队列满了,返回True,反之False
    # q.full 与 maxsize 大小对应
    # q.get([block[, timeout]]) 获取队列,timeout等待时间
    # q.get_nowait() 相当q.get(False)
    # q.put(item) 写入队列,timeout等待时间
    # q.put_nowait(item) 相当q.put(item, False)
    # q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    # q.join() 实际上意味着等到队列为空,再执行别的操作


    class Lianjia:
        def __init__(self):
            self.url_temp = url = "https://gz.lianjia.com/ershoufang/pg{}/"
           
    self.headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
            self.url_queue = Queue()
            self.html_queue = Queue()
            self.content_queue = Queue()

        def get_url_list(self):
            # return [self.url_temp.format(i) for i in range(1,14)]
           
    for i in range(1, 14):
                # 把13个索引页面的Url放进url_queue队列里
               
    self.url_queue.put(self.url_temp.format(i))

       定义运行函数

    def run(self):  # 实现主要逻辑
        thread_list = []
        # 1.url_list
        # threading.Thread不需要传参数,参数都是从队列里面取得
        t_url = threading.Thread(target=self.get_url_list)
        thread_list.append(t_url)
        # 2.遍历,发送请求,获取响应
        for i in range(20):  # 添加20个线程
            t_parse = threading.Thread(target=self.parse_url)
            thread_list.append(t_parse)
        # 3.提取数据
        for i in range(2):  # 添加2个线程
            t_html = threading.Thread(target=self.get_content_list)
            thread_list.append(t_html)
        # 4.保存
        t_save = threading.Thread(target=self.save_content_list)
        thread_list.append(t_save)
        for t in thread_list:
            t.setDaemon(True)  # 把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)
            t.start()

        for q in [self.url_queue, self.html_queue, self.content_queue]:
            q.join()  # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程

        print("主线程结束")

     代码如下:

    # coding=utf-8
    import requests
    from lxml import etree
    import threading
    from queue import Queue
    
    
    # https://docs.python.org/3/library/queue.html#module-queue
    # 队列使用方法简介
    # q.qsize() 返回队列的大小
    # q.empty() 如果队列为空,返回True,反之False
    # q.full() 如果队列满了,返回True,反之False
    # q.full 与 maxsize 大小对应
    # q.get([block[, timeout]]) 获取队列,timeout等待时间
    # q.get_nowait() 相当q.get(False)
    # q.put(item) 写入队列,timeout等待时间
    # q.put_nowait(item) 相当q.put(item, False)
    # q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
    # q.join() 实际上意味着等到队列为空,再执行别的操作
    
    
    class Lianjia:
        def __init__(self):
            self.url_temp = url = "https://gz.lianjia.com/ershoufang/pg{}/"
            self.headers = {
                "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.84 Safari/537.36"}
            self.url_queue = Queue()
            self.html_queue = Queue()
            self.content_queue = Queue()
    
        def get_url_list(self):
            # return [self.url_temp.format(i) for i in range(1,14)]
            for i in range(1, 14):
                # 把13个索引页面的Url放进url_queue队列里
                self.url_queue.put(self.url_temp.format(i))
    
        def parse_url(self):
            while True:
                # get方法和task_done搭配使用
                # 在put是队列+1,get和task_done一起使用时队列才会-1
                url = self.url_queue.get()
                print(url)
                response = requests.get(url, headers=self.headers)
                # 然后把索引页的响应页面放进html_queue队列里
                self.html_queue.put(response.content.decode())
                self.url_queue.task_done()
    
        def get_content_list(self):  # 提取数据
            while True:
                # 先从索引页响应页面html_queue队列里面取出索引页面
                html_str = self.html_queue.get()
    
                html = etree.HTML(html_str)
                div_list = html.xpath('//li[@class="clear LOGCLICKDATA"]')  # 分组
                content_list = []
                for div in div_list:
                    item = {}
                    item['title'] = div.xpath('.//div[@class="title"]/a/text()')
                    item['href'] = div.xpath('.//div[@class="title"]/a/@href')
                    item['totalPrice'] = div.xpath('.//div[@class="totalPrice"]/span/text()')
                    item['houseInfo'] = div.xpath('.//div[@class="houseInfo"]/text()')
                    content_list.append(item)
                # 把content_list放进content_queue里面
                self.content_queue.put(content_list)
                self.html_queue.task_done()
    
        def save_content_list(self):  # 保存
            while True:
                content_list = self.content_queue.get()
                for i in content_list:
                    print(i)
                    pass
                self.content_queue.task_done()
    
        def run(self):  # 实现主要逻辑
            thread_list = []
            # 1.url_list
            # threading.Thread不需要传参数,参数都是从队列里面取得
            t_url = threading.Thread(target=self.get_url_list)
            thread_list.append(t_url)
            # 2.遍历,发送请求,获取响应
            for i in range(20):  # 添加20个线程
                t_parse = threading.Thread(target=self.parse_url)
                thread_list.append(t_parse)
            # 3.提取数据
            for i in range(2):  # 添加2个线程
                t_html = threading.Thread(target=self.get_content_list)
                thread_list.append(t_html)
            # 4.保存
            t_save = threading.Thread(target=self.save_content_list)
            thread_list.append(t_save)
            for t in thread_list:
                t.setDaemon(True)  # 把子线程设置为守护线程,该线程不重要,主线程结束,子线程结束(子线程是while true不会自己结束)
                t.start()
    
            for q in [self.url_queue, self.html_queue, self.content_queue]:
                q.join()  # 让主线程等待阻塞,等待队列的任务完成(即队列为空时 )之后再进行主线程
    
            print("主线程结束")
    
    
    if __name__ == '__main__':
        qiubai =Lianjia()
        qiubai.run()
    
    # 所没有tast_done方法,程序最终会卡着不动,无法终止
  • 相关阅读:
    mac添加环境变量
    Flex 中文字体终极解决方案
    C# Label背景透明
    C# 字节数组和十六进制字符串之间转换的另类写法
    C# params 动态参数
    HttpFlexSession注册失败的怪问题
    sun.misc.BASE64Encoder找不到jar包的解决方法
    Eclipse jee 3.7常用插件安装手记
    GitHub安装缓慢甚至下载失败的解决办法
    subclipse解决JavaHL不可用的问题
  • 原文地址:https://www.cnblogs.com/wwthuanyu/p/9999613.html
Copyright © 2011-2022 走看看