zoukankan      html  css  js  c++  java
  • 利用爬虫技术采集国外肺炎疫情数据

    前言:随着国内肺炎疫情的逐渐好转,国外的疫情却越来越严重,其中原因,相必大家都心领神会,想到这里,我打算采用自身所学的技术采集下最新的国外数据,掌握最新的动态,希望能有一天也能看到不再增长的疫情传播。

    前期准备:作为爬虫,我们首先就要选择一个合适的目标网站,这里我们选择的是丁香园的数据,如下图所示

    找到目标网站后,我们需要对网站提供的数据来源进行分析,找到其真实的数据请求,我们打开浏览器的F12,看下network里的请求,从上到下依次分析,当我们宣召到如下图所示的请求中,可以发现他的响应中似乎包含了我们想要的数据,当然这只是不确定的猜测,我们可以继续往下找,如果下面没有更符合的,那么说明这个就极有可能是我们想要的;

    确定是这个请求后,我们需要分析这个响应的内容

    通过分析我们发现他的数据是以脚本的方式存在于代码里的,那么我们岂不是可以直接通过正则就可以得到这些数据了么,因此我先进行了如下测试

    res = requests.get(self.url)
    res.encoding = 'utf - 8'
    pat0 = re.compile('window.getListByCountryTypeService2true = ([sS]*?)</script>')
    data_list = pat0.findall(res.text)
    data = data_list[0].replace('}catch(e){}', '')
    true = True
    false = False
    data = eval(data)

    这个地方的self.url就是上面我们说到的请求,这段代码的大致意思是我们通过requests库发起这个请求,解析响应的数据,通过正则表达式获取window.getListByCountryTypeService2true这一段的代码,其中就包含了我们需要的数据,然后过滤不需要的一些字符串。

    获取到数据后,我们可以观察下,它的数据是JSON数组的格式,其实到这里我们就已经拿到我们想要的了

    这里我们可以看到他这个里面statisticsData字段,这个数据我们通过分析得知这是每个国家的一些详情数据,这也是一个请求,直接用requests请求便可以获取到其中的数据,类似下图的代码

    def getStatisticsData(self,url,province_name):
            reponse = requests.get(url)
            json_data = reponse.json()
            data = json_data['data']
            for item in data:
                dateId = item['dateId']
                item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
                item['province_name'] = province_name
                self.save_incr_mysql(item)
            # 保存所有数据至json文件
            update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
            self.save_incr_json(data,province_name,update_time)

    下面一步就是将数据保存到数据库,核心代码如下

    # 保存数据增长趋势到数据库
        def save_incr_mysql(self,item):
            # 
            query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
            values = [item['modify_time'],item['province_name']]
            self.cursor.execute(query_sql,values)
            data = self.cursor.fetchone()
            if(data['count'] == 0):
                sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr
                ,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
                ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
                values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
                ,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]      
                self.cursor.execute(sql,values)
                self.conn.commit()
    
        # 保存最新的数据到数据库
        def save_last_mysql(self,item):
            # 更新所有历史数据的is_new字段
            update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
            values= [item['province_name']]
            self.cursor.execute(update_sql,values)
            self.conn.commit()
            sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count
            ,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
            ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
            ,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]      
            self.cursor.execute(sql,values)
            self.conn.commit()

    无非就是只保存最新的数据到数据库而已

    完整的爬虫代码如下:

    import re
    import time
    import json
    import datetime
    import requests
    import pymysql
    import pandas as pd
    import os
    
    """
    采集丁香园国外的疫情数据
    """
    class VirusSupervise(object):
        def __init__(self):
            self.url = 'https://3g.dxy.cn/newh5/view/pneumonia?scene=2&amp;clicktime=1579582238&amp;enterid=1579582238&amp;from=timeline&amp;isappinstalled=0'
            self.all_data = list()
            host_ip = "127.0.0.1"  # 你的mysql服务器地址
            host_user = "xxxxx" #你的数据库用户名
            password = "xxxx"  # 你的mysql密码
            db = 'feiyanyiqing'
            port = 3306
            charset= 'utf8'
            self.conn = pymysql.connect(host=host_ip, port=port, user=host_user, passwd=password, db=db, charset=charset)
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def request_page(self):
            """
            请求页面数据
            """
            res = requests.get(self.url)
            res.encoding = 'utf - 8'
            pat0 = re.compile('window.getListByCountryTypeService2true = ([sS]*?)</script>')
            data_list = pat0.findall(res.text)
            data = data_list[0].replace('}catch(e){}', '')
            true = True
            false = False
            data = eval(data)
            return data
    
    
    
        def getStatisticsData(self,url,province_name):
            reponse = requests.get(url)
            json_data = reponse.json()
            data = json_data['data']
            for item in data:
                dateId = item['dateId']
                item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
                item['province_name'] = province_name
                self.save_incr_mysql(item)
            # 保存所有数据至json文件
            update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
            self.save_incr_json(data,province_name,update_time)
    
        def filtration_data(self):
            """
            过滤数据
            """
            data = self.request_page()
            print(data)
            result = []
            for item in data:
                # 省份/国家
                provinceName = item['provinceName']
                provinceId = item['provinceId']
                # 国家/州
                continents = item['continents']
                # 当前确诊人数
                currentConfirmedCount = item['currentConfirmedCount']
                # 确诊总人数
                confirmedCount = item['confirmedCount']
                # 治愈人数
                curedCount = item['curedCount']
                # 死亡人数
                deadCount = item['deadCount']
                # 疑似病例
                suspectedCount = item['suspectedCount']
                # 城市类型
                countryType = item['countryType']
                # 更新时间
                if('中国' == provinceName):
                    modifyTime = datetime.datetime.now()
                else:
                    modifyTime = item['modifyTime']
                    modifyTime = float(modifyTime/1000)
                    timeArray = time.localtime(modifyTime)
                    modifyTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
                info = {'province_name':provinceName,'province_id':provinceId,'continents':continents
                ,'current_confirmed_count':currentConfirmedCount,'confirmed_count':confirmedCount,'is_new':1
                ,'cured_count':curedCount,'dead_count':deadCount,'suspected_count':suspectedCount,'country_type':countryType,'modify_time':modifyTime}
                self.save_last_mysql(info)
                result.append(info)
                # 静态数据 每日各项数据的变化
                if(hasattr(item, 'statisticsData')):
                    self.getStatisticsData(item['statisticsData'],provinceName)
    
        def save_incr_json(self,info,province_name,update_time):
            file_dir = os.path.abspath(os.path.join(os.getcwd(), "jsonData"))
            filename= file_dir+'/'+update_time
            if(not os.path.exists(filename)):
                os.makedirs(filename)
            filename = filename+'/'+province_name+'.json'
            with open(filename,'w',encoding = 'utf-8') as file_obj:
                json.dump(info,file_obj,ensure_ascii=False)
    
    
        # 保存数据增长趋势到数据库
        def save_incr_mysql(self,item):
            # 
            query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
            values = [item['modify_time'],item['province_name']]
            self.cursor.execute(query_sql,values)
            data = self.cursor.fetchone()
            if(data['count'] == 0):
                sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr
                ,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
                ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
                values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
                ,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]      
                self.cursor.execute(sql,values)
                self.conn.commit()
    
        # 保存最新的数据到数据库
        def save_last_mysql(self,item):
            # 更新所有历史数据的is_new字段
            update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
            values= [item['province_name']]
            self.cursor.execute(update_sql,values)
            self.conn.commit()
            sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count
            ,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
            ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
            ,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]      
            self.cursor.execute(sql,values)
            self.conn.commit()
    
    
    if __name__ == '__main__':
        sup = VirusSupervise()
        sup.filtration_data()
    

    做好这一步我们的采集工作已经完成了,美中不足的是每次都要我主动去运行,很不方便,因此我编写了一个定时器去每天定时采集数据,代码如下

    import time
    import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
    from feiyan_spider import VirusSupervise
    
    # 错误监控
    def my_listener(event):
        if event.exception:
            print ('任务出错了!!!!!!')
        else:
            print ('任务照常运行...')
    
    def feiyan_spider():
        print("开始采集:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
        vs = VirusSupervise()
        vs.filtration_data()
    
    # 任务
    def start():
        print('创建任务')
        #创建调度器:BlockingScheduler
        scheduler = BlockingScheduler()
        scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 添加爬取的定时任务:每天的早上7点执行
        scheduler.add_job(feiyan_spider, 'cron', hour=8, minute=10)
        scheduler.start()
    
    
    if __name__ == "__main__":
        start()

    至此,全部工作就算完成了,其实是很简单的,无非就是要多花点心思去研究目标网站,下一篇文章将介绍我利用这些数据开发的基于python的flask框架的肺炎疫情分析系统。

    本文首发于https://www.bizhibihui.com/blog/article/29

  • 相关阅读:
    Java+7入门经典 -1 简介
    优化算法动画演示Alec Radford's animations for optimization algorithms
    如何写科技论文How to write a technical paper
    开始学习深度学习和循环神经网络Some starting points for deep learning and RNNs
    用500行Julia代码开始深度学习之旅 Beginning deep learning with 500 lines of Julia
    用10张图来看机器学习Machine learning in 10 pictures
    ICLR 2013 International Conference on Learning Representations深度学习论文papers
    ICLR 2014 International Conference on Learning Representations深度学习论文papers
    卷积神经网络CNN(Convolutional Neural Networks)没有原理只有实现
    卷积神经网络Convolutional Neural Networks
  • 原文地址:https://www.cnblogs.com/chengrongkai/p/13043736.html
Copyright © 2011-2022 走看看