zoukankan      html  css  js  c++  java
  • 多任务爬虫

    一、多任务简介

    1、为什么要使用多任务爬虫?
    • 在大量的url需要请求时,单线程/单进程去爬取,速度太慢,此时cpu不工作,浪费cpu资源。
    • 爬取与写入文件分离,可以规避io操作,增加爬取速度,充分利用cpu。
    2、多任务分类
    • 进程:进程是操作资源分配的最小单位,一个运行的程序,至少包括一个进程,进程之间数据不能共享。(利用多核)
    • 线程:线程是cpu调度的最小单位,一个进程中至少含有一个线程,线程中数据是共享的,如果多个线程操作同一个对象时,需要考虑数据安全问题。(爬虫中最常用)
    • 协程:协程位于线程内部,如果一个线程中运行的代码,遇到IO操作时,切换到线程其他代码执行(最大程度的规避IO操作)
    2、如何提高程序的运行速度
    1、提高CPU的利用率

    假如我们的程序有只有一个线程,CPU就只处理这一个线程。如果在程序中遇到IO操作。此时CPU就不工作了。休息的这段时间,就浪费了CPU的资源。

    若我们的程序是多线程的,CPU会在这多个任务之间切换,如果其中一个线程阻塞了,CPU不会休息,会处理其他线程。

    2、增加CPU数量

    一个CPU同一时间只能护理一个任务,若我们增加CPU数量,那么多个CPU处理多个任务,也会提升程序的运行速度,例如使用多进程。

    二、python中的threading模块(开启多线程)

    cpython解释器下的 python中没有真正的多线程(因为多个线程不能同时在多核上执行,只能在一个CPU上进行多个线程的切换轮流执行,在视觉效果上看起来同时在执行),造成这个情况的原因是因为GIL(全局性解释器锁),在一个进程中,多个线程是数据共享的,如果不设置全局解释性锁,多个线程可能在同一时间对同一个变量进行操作,造成变量的引用计数不正确,影响其进行垃圾回收,所以需要加全局性解释器锁。

    2.1、多线程开启方法
    from threading import Thread
    1、使用函数
    t = Thread(
    					target=线程执行的任务(方法)名字,
    					args = 执行方法的参数,是一个元组
    				)---创建线程
    t.start()---启动线程
    
    2、使用类
    class Mythread(Thread)
    	def __init__(self,参数)
    		self.参数=参数
    		super(Mythread,self).__init__()
    	
    	def run(self):
    		将需要多任务执行的代码,添加到此处
    
    if __name__ == '__main__':
        my =  Mythread(参数)
        my.start()
    
    2.2、线程中常用的几个方法
    from threading import Thread, current_thread, enumerate, active_count
    import time
    import random
    
    
    class MyThread(Thread):
        def run(self):
            time.sleep(random.random())
            msg = "I'm" + self.name + "@" + str(i)  #self.name 当前线程名
            print(msg)
            print(current_thread().ident)  #当前线程的id号
            print(current_thread().is_alive()) #当前线程是否存活
    
    
    if __name__ == '__main__':
        t_list=[]
        for i in range(5):
            t = MyThread()
            t.start()
            t_list.append(t)
        while active_count() > 1:  #active_count() 当前存活线程数,包括主线程
            print(enumerate()) #enumerate() 当前存活线程列表,包括主线程
         for i  in t_list:
            i.join() #join方法,会使异步执行的多线程,变为同步执行,主线程会等i线程执行完,才会往下执行。
    
    2.3、守护线程

    守护线程,当一个子线程设置为守护线程时,该子线程会等待其他非守护子线程和主线程执行完成后,结束线程。

    from threading import Thread, current_thread
    import time
    
    
    def bar():
        while True:
            time.sleep(1)
            print(current_thread().name)
    
    
    def foo():
        print(f'{current_thread().name}开始了...')
        time.sleep(2)
        print(f'{current_thread().name}结束了...')
    
    
    if __name__ == '__main__':
        t1 = Thread(target=bar)
        t1.daemon = True #将t1设置为守护线程,
        t1.start()
        t2 = Thread(target=foo)
        t2.start()
    
    #执行结果
    Thread-2开始了...
    Thread-1
    Thread-1
    Thread-2结束了...
    
    2.4、锁

    在使用多线程爬虫的时候,有时候多个线程会对同一个文件进行读写。造成数据不安全,下面是一个Tencent招聘的例子,在写入excel文件中的时候,由于多个线程对同一个文件进行写入操作,造成数据不安全。

    import requests
    from jsonpath import jsonpath
    from excle_wirte import ExcelUtils
    from threading import Thread
    import os
    from multiprocessing import Lock
    import threading
    
    def get_content(url):
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36',
            'referer': 'https://careers.tencent.com/search.html'
        }
        print(url)
        res = requests.get(url, headers=headers).json()
        jp = jsonpath(res, '$.*.Posts.*')
        return jp
    
    
    def write_excel(filename, item_list, sheetname):
        if not os.path.exists(filename):
            ExcelUtils.write_to_excel(filename, item_list, sheetname)
        else:
            ExcelUtils.append_to_excel(filename, item_list)
    
    
    def main(i, lock):
        base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1585401795646&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=20&language=zh-cn&area=cn'
        content = get_content(base_url.format(i))
        with lock:   #加锁
            write_excel('tencent.xls', content, 'hr')
    
    
    if __name__ == '__main__':
        lock = Lock()  #创建锁
        for i in range(1, 11):
            t = Thread(target=main, args=(i, lock))
            t.start()
    
    2.5、生产者与消费者模型

    生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。

    例子:Tencent招聘生产者与消费者版本,我这里是用函数写的,当然也可以用类来写,会更加方便。
    import requests
    from jsonpath import jsonpath
    from excle_wirte import ExcelUtils
    from threading import Thread
    import os
    from multiprocessing import Lock
    from queue import Queue
    
    flag = False
    
    
    def ger_url_list(num, url_queue):
        base_url = 'https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1585401795646&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex={}&pageSize=20&language=zh-cn&area=cn'
        for i in range(1, num + 1):
            url_queue.put(base_url.format(i))
    
    
    def producer(url_queue, content_queue):
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36',
            'referer': 'https://careers.tencent.com/search.html'
        }
        while True:
            try:
                url = url_queue.get_nowait()
                res = requests.get(url, headers=headers).json()
                jp = jsonpath(res, '$.*.Posts.*')
                content_queue.put(jp)
            except Exception as e:
                break
    
    
    def consumer(content_queue, lock, filename, sheetname):
        while True:
            if content_queue.empty() and flag:
                break
            try:
                item_list = content_queue.get_nowait()
                with lock:
                    if not os.path.exists(filename):
                        ExcelUtils.write_to_excel(filename, item_list, sheetname)
                    else:
                        ExcelUtils.append_to_excel(filename, item_list)
            except Exception as e:
                pass
    
    
    if __name__ == '__main__':
        p_t_list = []
        url_queue = Queue()   #存放url的队列
        content_queue = Queue()  #网页内容队列
        ger_url_list(10, url_queue)  #往url队列添加url
        lock = Lock() #创建锁对象
        for i in range(4): # 开启四个线程来抓取网页内容
            p_t = Thread(target=producer, args=(url_queue, content_queue))
            p_t.start()
            p_t_list.append(p_t)
        for i in range(4): #四个线程来解析内容和写入文件
            t = Thread(target=consumer, args=(content_queue, lock, 'tencent.xls', 'hr'))
            t.start()
        for i in p_t_list:
            i.join()
        flag=True #判断标志,用来判断生产者是否生产完毕。
    
    
    2.6、多进程

    多进程一般用于处理计算密集型任务,在爬虫方面用的较少,因为多进程开启数量依赖于CPU核心数,且多进程开启操作系统需要为每个进程分配资源,效率不高。这里只简单说明python中使用的库和使用方法,注意进程间不能之间进行数据交换,需要依赖于IPC(Inter-Process Communication)进程间通信,提供了各种进程间通信的方法进行数据交换),常用方法为 队列和管道和Socket。当然还有第三方工具,例如RabbitMQredis

    from multiprocessing import Process
    1、使用函数
    t = Thread(
    					target=进程执行的任务(方法)名字,
    					args = 执行方法的参数,是一个元组
    				)---创建进程
    t.start()---启动进程
    
    2、使用类
    class MyProcess(Process)
    	def __init__(self,参数)
    		self.参数=参数
    		super(Mythread,self).__init__()
    	
    	def run(self):
    		将需要多任务执行的代码,添加到此处
    
    if __name__ == '__main__':
        my =  MyProcess(参数)
        my.start()
    

    multiprocessing这个库中有很多于多进程相关对象

    from multiprocessing import Queue, Pipe, Pool,等
    Queue:队列 
    Pipe:管道
    Pool:池(有另外的模块,统一了进程池,线程池的接口,使用更加方便)
    

    三、池

    3.1、什么是池

    池,包括线程池与进程池,一个池内,可以含有指定的线程数,或者是进程数,多个任务,从中拿取线程/进程执行任务,执行完成后,下一个任务再从池中拿取线程/进程。直到所有任务都执行完毕。

    3.2、为什么使用池
    • 可以比较好的控制开启线程/线程的数量,在提升效率的同时又控制住资源开销。
    • 可以指定回调函数,很方便的处理返回数据
    3.2、池的简单使用,以进程池为例,线程池一样的操作。
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    
    
    def fun(i):
        return i ** 2
    
    
    def pr(con):
        p = con.result()
        print(p)
    
    
    if __name__ == '__main__':
        p_pool = ProcessPoolExecutor(max_workers=4)  #创建一个含有四个进程的池
        for i in range(10): #10个任务
            p = p_pool.submit(fun, i)  #任务提交
            p.add_done_callback(pr)  #指定回调函数
        p_pool.shutdown()#关闭池
    #执行结果
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    
    3.3、池map方法使用,适合于简单参数
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    
    
    def fun(i):
        return i ** 2
       
    if __name__ == '__main__':
        p_pool = ProcessPoolExecutor(max_workers=4)
        p = p_pool.map(fun, range(10))
        print(list(p)) #map方法返回的是一个生成器,可通过强转或者循环取值。
    
    #执行结果
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    
  • 相关阅读:
    HDU 2073 无限的路
    HDU 2080 夹角有多大II
    if
    HDU 2094 产生冠军
    HDU 2076 夹角有多大(题目已修改,注意读题)
    HDU 2086 A1 = ?
    HDU 2069 Coin Change
    HDU 2095 find your present (2)
    android常用开发工具的用法
    android安装前期遇到的问题
  • 原文地址:https://www.cnblogs.com/hjnzs/p/12602531.html
Copyright © 2011-2022 走看看