zoukankan      html  css  js  c++  java
  • 动态调整线程数的python爬虫代码分享

      这几天在忙一个爬虫程序,一直在改进他,从一开始的单线程,好几秒一张图片(网络不好),,,到现在每秒钟十几张图片,,, 四个小时586万条数据,,,简直不要太爽 先上图

      

      最终写出来的程序,线程数已经可以动态调整了,贼暴力。。。峰值能稳定在50个线程,具体思路可以继续看

      这里终于用到了操作系统的知识,就是生产者和消费者的模型。。。(参考源码忘记记录了,抱歉

      先简单说一下目标网站的情况,目标网站是一个图片网站,有一个列表页,点进列表页之后,可以看到很多图片,这只爬虫的目的是收集这些图片链接(有了链接当然也能下载了...

      简单分析之后发现,在列表页,会向后台请求一个json格式的数据文件,然后js动态的把里面的id组合成一个链接,最终组成如下样式的链接

        

    http://www.xxxxxx.com/photo/json?page=1977

      显而易见,page参数就是指定页数的,那么,这里就可以先生成一个列表,用for循环把所有列表页的url加进去,接下来只需要遍历这个链接列表就好了。

      

    #首先构造产品队列
        for i in range(1,11613):
            url_list.append("http://www.xxxxxx.com/photo/json?page="+str(i));
        print('产生链接完成');

      接下来,就是启动生产者线程,通过列表里的url,获取到每一个详情页的id,进而拼接出详情页的url,接下来把生产的详情页url添加到一个任务队列里面就好了,这就是生产者的工作。

      

    #生产者
    def producer(url_list,in_queue):
        print('进入生产者线程');
        global flag;
        for url in url_list:
            html = open_page(url);  #获取总页json 得到每一个页的id 进而得到每个页的url
            if html == '0':
                continue;
            else:
                idurl_list = get_idurl(html);   #得到第n页的所有详情页url
                if len(idurl_list)==0:  #如果取不到url 直接进行下一页
                    continue;
                for idurl in idurl_list:
                    in_queue.put(idurl);
                    #print('生产完成一个');
        flag=1;
        print('产品生产完成');

      接着,需要在等待几秒钟,让生产者先生产一些产品。

      然后创建一个管理消费者的线程,能够创建新的消费者线程

        #线程管理线程
        consumer_thread = Thread(target=manger_thread,args=(in_queue,));
        consumer_thread.daemon = True;
        consumer_thread.start();

      线程里面的代码是这样的

      

    def manger_thread(in_queue):
        global thread_num;
        while True:
            if in_queue.qsize()>3000 and thread_num<80: #设置最大线程80
                consumer_thread = Thread(target=consumer,args=(in_queue,));
                consumer_thread.daemon = True;
                consumer_thread.start();
                thread_num+=1;
    

       简单解释一下,有一个全局变量,thread_num 这个就是用来调整进程数的依据,始终为消费者数目。

      接着,创建一个死循环,不停的判断任务队列中的产品数量,超过3000个,并且现在线程数小于80个,那就创建一个消费者线程。

      消费者代码:

      

    #消费者
    def consumer(in_queue):
        global count;
        global flag;
        global thread_num;
        print('进入消费者线程,队列长度: '+str(in_queue.qsize()));
        while True:
            if in_queue.qsize()<3000 and thread_num>10:   #队列中数量小于5000 并且线程数大于10 就取消一个线程
                thread_num-=1;
                return;
            html = open_page(in_queue.get());  #取得一个详情页链接开始取得源码
            if html == '0': #获取源码失败
                in_queue.task_done();   #虽然打开网页失败了 但是似乎还是得确认完成
                continue;
            image_url = get_url(html);  #得到详情页图片url列表
            save_url(image_url);    #保存链接
            #print('队列长度: '+str(in_queue.qsize()));
            count+=1;
            os.system('title '+'已爬组数:'+str(count)+'_队列长度:'+str(in_queue.qsize())+'_线程数:'+str(thread_num));
            in_queue.task_done();

      首先声明的几个全局变量是用来显示各种参数的

      这里依旧是一个死循环,循环中判断 任务队列中产品数量小于3000并且线程数大于10的话,那就退出这个线程。 通过线程管理线程以及这里的调整,队列长度稳定在3000

      然后打开网页源码,解析图片链接即可。

      值得一提的是,直接获取那个网页的源码,并不能得到图片的链接,需要对连接中字符串进行替换,,,具体怎么替换,需要查看js代码,然后用python源码实现一遍就好。

      下面放出所有的源码(要注意,代码中所有url全部都是修改了的,所以代码不能直接运行,,,如果想让他运行起来,可以私信我,或者留言给我

      

    #encoding:utf-8 
    import bs4;
    import urllib.request;
    import urllib.error; # abc
    from urllib.request import urlretrieve
    import time;
    import os;
    import json;
    from queue import Queue;
    import threading;
    from retrying import retry;
    from threading import Thread;
    
    count = 0;  #记录组数
    thread_num = 0; #线程数
    flag = 0; #生产者完成标志
    
    #打开网页 直接返回源码
    @retry(wait_fixed=1000,stop_max_attempt_number=50)  #异常重试
    def open_page(url):
        print('打开网页: '+url);
        header = {};
        header['User-Agent'] = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.81 Safari/537.36";
        req = urllib.request.Request(url,headers=header);
        response = urllib.request.urlopen(req,timeout=5);
        #times=str(time.time());
        #print('读取内容'+times);
        temp = response.read().decode('utf-8');
        #print('读取结束'+times);
        return temp;
    
    #保存url
    def save_url(url_list):  #考虑到追加字符串比较频繁 所以组合成一个大的字符串一起写入可以降低磁盘I/O (大概?
        #print('保存url: '+str(len(url_list)));
        file_handle = open('list.txt','a+');    #不存在文件就创建并且追加
        url_lists=[url+'
    ' for url in url_list];   #添加回车
        file_handle.writelines(url_lists);
        file_handle.close();
    
    #从详细页中得到照片的url 返回列表
    def get_url(html):
        image_url=[];
        try:
            if len(html)==0:   #传参是空的
                return [];
            soup = bs4.BeautifulSoup(html,'html.parser');   #解析html代码
            for img in soup.find_all('img'):    #这个循环用来得到
                if 'type3' in img['data-avaurl']:
                    str2 = img['data-avaurl'].replace('type3','https://xxxxxx9.com');
                if 'type4' in img['data-avaurl']:
                    str2 = img['data-avaurl'].replace('type4','https://xxxxxx4.com');
                if 'type5' in img['data-avaurl']:
                    str2 = img['data-avaurl'].replace('type5','https://xxxxxx1.com');
                image_url.append(str2);
        except Exception as e:  #返回空列表
            print('发生错误: '+e);
            return [];
        return image_url;
    
    #得到每个id 对应的详情页url 返回列表
    def get_idurl(html):
        idurl_list=[];
        if len(html)==0:    #传参为空 直接返回
            return [];
        for item in json.loads(html)['data']['items']:
            idurl_list.append("http://www.xxxxxxx.com/photo/show?id="+str(item['id'])); #获取到每一页的url
        return idurl_list;
    
    #生产者
    def producer(url_list,in_queue):
        print('进入生产者线程');
        global flag;
        for url in url_list:
            html = open_page(url);  #获取总页json 得到每一个页的id 进而得到每个页的url
            if html == '0':
                continue;
            else:
                idurl_list = get_idurl(html);   #得到第n页的所有详情页url
                if len(idurl_list)==0:  #如果取不到url 直接进行下一页
                    continue;
                for idurl in idurl_list:
                    in_queue.put(idurl);
                    #print('生产完成一个');
        flag=1;
        print('产品生产完成');
    
    #消费者
    def consumer(in_queue):
        global count;
        global flag;
        global thread_num;
        print('进入消费者线程,队列长度: '+str(in_queue.qsize()));
        while True:
            if in_queue.qsize()<3000 and thread_num>10:   #队列中数量小于5000 并且线程数大于10 就取消一个线程
                thread_num-=1;
                return;
            html = open_page(in_queue.get());  #取得一个详情页链接开始取得源码
            if html == '0': #获取源码失败
                in_queue.task_done();   #虽然打开网页失败了 但是似乎还是得确认完成
                continue;
            image_url = get_url(html);  #得到详情页图片url列表
            save_url(image_url);    #保存链接
            #print('队列长度: '+str(in_queue.qsize()));
            count+=1;
            os.system('title '+'已爬组数:'+str(count)+'_队列长度:'+str(in_queue.qsize())+'_线程数:'+str(thread_num));
            in_queue.task_done();
    
    def manger_thread(in_queue):
        global thread_num;
        while True:
            if in_queue.qsize()>3000 and thread_num<80: #设置最大线程80
                consumer_thread = Thread(target=consumer,args=(in_queue,));
                consumer_thread.daemon = True;
                consumer_thread.start();
                thread_num+=1;
                
        
    if __name__=='__main__':
        start_time = time.time();
        url_list = [];    #构造的产品集合
        in_queue = Queue(); #次级产品队列
        queue = Queue();    #线程队列
    
        #首先构造产品队列
        for i in range(1,11613):
            url_list.append("http://www.xxxxxxx.com/photo/json?page="+str(i));
        print('产生链接完成');
        
        producer_thread = Thread(target=producer,args=(url_list,in_queue,)); #创建生产者线程
        producer_thread.daemon = True;  #设置为守护线程,主线程不退出,子线程也不退出
        producer_thread.start();    #启动生产者线程,生产url
        
        time.sleep(15);
        
        #线程管理线程
        consumer_thread = Thread(target=manger_thread,args=(in_queue,));
        consumer_thread.daemon = True;
        consumer_thread.start();
        
        in_queue.join();    #阻塞,直到所有的次级产品消耗完毕
        print('所有产品消费完成,花费时间: '+str(time.time()-start_time)+'已爬组数: '+count);
        exit();

      因为我自己也是才开始写爬虫的原因,上面的代码很粗糙,,,但是我发誓,我有用心写。

      代码的缺点也很明显,就是不停的销毁线程,创建线程很耗费资源,,,这里需要改进,也许需要使用线程池(我的服务器CPU满载了,惊喜的是 网络页满载了,意味着,基本上速度最快了(带宽瓶颈

      动态调整线程的原因是因为,列表页的服务器和详情页图片的服务器不一样,这就意味着有时候任务队列中任务很多,有时候消费者又会饿着,浪费时间。

      还有就是,这次的目标网站几乎没有反爬措施(如果详情页图片链接需要替换不算反爬措施),,, 所以很顺利,也能很暴力 但是更多的网站都是有反爬的。。。需要混合代理服务器

      需要运行代码调试学习交流的朋友请在评论区留言或者发私信

      希望能帮助大家,更希望有大佬指导 谢谢 ^ _ ^

  • 相关阅读:
    vue系列---identify(生成图片验证码)插件
    vue中的锚链接跳转问题
    vue中怎样实现 路由拦截器
    Vue生命周期和考点
    Vue如何使用vue-area-linkage实现地址三级联动效果
    JS的Key-Val(键值对)设置Key为动态的方法
    web开发——在网页中引用字体包(.ttf),即嵌入特殊字体
    spring boot 实现多个 interceptor 并指定顺序
    BigDecimal加减乘除计算
    乐观锁解决并发问题
  • 原文地址:https://www.cnblogs.com/cjdty/p/11349560.html
Copyright © 2011-2022 走看看