zoukankan      html  css  js  c++  java
  • 利用爬虫技术采集国外肺炎疫情数据

    前言:随着国内肺炎疫情的逐渐好转,国外的疫情却越来越严重,其中原因,相必大家都心领神会,想到这里,我打算采用自身所学的技术采集下最新的国外数据,掌握最新的动态,希望能有一天也能看到不再增长的疫情传播。

    前期准备:作为爬虫,我们首先就要选择一个合适的目标网站,这里我们选择的是丁香园的数据,如下图所示

    找到目标网站后,我们需要对网站提供的数据来源进行分析,找到其真实的数据请求,我们打开浏览器的F12,看下network里的请求,从上到下依次分析,当我们宣召到如下图所示的请求中,可以发现他的响应中似乎包含了我们想要的数据,当然这只是不确定的猜测,我们可以继续往下找,如果下面没有更符合的,那么说明这个就极有可能是我们想要的;

    确定是这个请求后,我们需要分析这个响应的内容

    通过分析我们发现他的数据是以脚本的方式存在于代码里的,那么我们岂不是可以直接通过正则就可以得到这些数据了么,因此我先进行了如下测试

    res = requests.get(self.url)
    res.encoding = 'utf - 8'
    pat0 = re.compile('window.getListByCountryTypeService2true = ([sS]*?)</script>')
    data_list = pat0.findall(res.text)
    data = data_list[0].replace('}catch(e){}', '')
    true = True
    false = False
    data = eval(data)

    这个地方的self.url就是上面我们说到的请求,这段代码的大致意思是我们通过requests库发起这个请求,解析响应的数据,通过正则表达式获取window.getListByCountryTypeService2true这一段的代码,其中就包含了我们需要的数据,然后过滤不需要的一些字符串。

    获取到数据后,我们可以观察下,它的数据是JSON数组的格式,其实到这里我们就已经拿到我们想要的了

    这里我们可以看到他这个里面statisticsData字段,这个数据我们通过分析得知这是每个国家的一些详情数据,这也是一个请求,直接用requests请求便可以获取到其中的数据,类似下图的代码

    def getStatisticsData(self,url,province_name):
            reponse = requests.get(url)
            json_data = reponse.json()
            data = json_data['data']
            for item in data:
                dateId = item['dateId']
                item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
                item['province_name'] = province_name
                self.save_incr_mysql(item)
            # 保存所有数据至json文件
            update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
            self.save_incr_json(data,province_name,update_time)

    下面一步就是将数据保存到数据库,核心代码如下

    # 保存数据增长趋势到数据库
        def save_incr_mysql(self,item):
            # 
            query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
            values = [item['modify_time'],item['province_name']]
            self.cursor.execute(query_sql,values)
            data = self.cursor.fetchone()
            if(data['count'] == 0):
                sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr
                ,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
                ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
                values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
                ,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]      
                self.cursor.execute(sql,values)
                self.conn.commit()
    
        # 保存最新的数据到数据库
        def save_last_mysql(self,item):
            # 更新所有历史数据的is_new字段
            update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
            values= [item['province_name']]
            self.cursor.execute(update_sql,values)
            self.conn.commit()
            sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count
            ,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
            ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
            ,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]      
            self.cursor.execute(sql,values)
            self.conn.commit()

    无非就是只保存最新的数据到数据库而已

    完整的爬虫代码如下:

    import re
    import time
    import json
    import datetime
    import requests
    import pymysql
    import pandas as pd
    import os
    
    """
    采集丁香园国外的疫情数据
    """
    class VirusSupervise(object):
        def __init__(self):
            self.url = 'https://3g.dxy.cn/newh5/view/pneumonia?scene=2&amp;clicktime=1579582238&amp;enterid=1579582238&amp;from=timeline&amp;isappinstalled=0'
            self.all_data = list()
            host_ip = "127.0.0.1"  # 你的mysql服务器地址
            host_user = "xxxxx" #你的数据库用户名
            password = "xxxx"  # 你的mysql密码
            db = 'feiyanyiqing'
            port = 3306
            charset= 'utf8'
            self.conn = pymysql.connect(host=host_ip, port=port, user=host_user, passwd=password, db=db, charset=charset)
            self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
    
        def request_page(self):
            """
            请求页面数据
            """
            res = requests.get(self.url)
            res.encoding = 'utf - 8'
            pat0 = re.compile('window.getListByCountryTypeService2true = ([sS]*?)</script>')
            data_list = pat0.findall(res.text)
            data = data_list[0].replace('}catch(e){}', '')
            true = True
            false = False
            data = eval(data)
            return data
    
    
    
        def getStatisticsData(self,url,province_name):
            reponse = requests.get(url)
            json_data = reponse.json()
            data = json_data['data']
            for item in data:
                dateId = item['dateId']
                item['modify_time'] = time.strptime(str(dateId), "%Y%m%d")
                item['province_name'] = province_name
                self.save_incr_mysql(item)
            # 保存所有数据至json文件
            update_time = time.strftime('%Y-%m-%d',time.localtime(time.time()))
            self.save_incr_json(data,province_name,update_time)
    
        def filtration_data(self):
            """
            过滤数据
            """
            data = self.request_page()
            print(data)
            result = []
            for item in data:
                # 省份/国家
                provinceName = item['provinceName']
                provinceId = item['provinceId']
                # 国家/州
                continents = item['continents']
                # 当前确诊人数
                currentConfirmedCount = item['currentConfirmedCount']
                # 确诊总人数
                confirmedCount = item['confirmedCount']
                # 治愈人数
                curedCount = item['curedCount']
                # 死亡人数
                deadCount = item['deadCount']
                # 疑似病例
                suspectedCount = item['suspectedCount']
                # 城市类型
                countryType = item['countryType']
                # 更新时间
                if('中国' == provinceName):
                    modifyTime = datetime.datetime.now()
                else:
                    modifyTime = item['modifyTime']
                    modifyTime = float(modifyTime/1000)
                    timeArray = time.localtime(modifyTime)
                    modifyTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
                info = {'province_name':provinceName,'province_id':provinceId,'continents':continents
                ,'current_confirmed_count':currentConfirmedCount,'confirmed_count':confirmedCount,'is_new':1
                ,'cured_count':curedCount,'dead_count':deadCount,'suspected_count':suspectedCount,'country_type':countryType,'modify_time':modifyTime}
                self.save_last_mysql(info)
                result.append(info)
                # 静态数据 每日各项数据的变化
                if(hasattr(item, 'statisticsData')):
                    self.getStatisticsData(item['statisticsData'],provinceName)
    
        def save_incr_json(self,info,province_name,update_time):
            file_dir = os.path.abspath(os.path.join(os.getcwd(), "jsonData"))
            filename= file_dir+'/'+update_time
            if(not os.path.exists(filename)):
                os.makedirs(filename)
            filename = filename+'/'+province_name+'.json'
            with open(filename,'w',encoding = 'utf-8') as file_obj:
                json.dump(info,file_obj,ensure_ascii=False)
    
    
        # 保存数据增长趋势到数据库
        def save_incr_mysql(self,item):
            # 
            query_sql = 'select count(1) as count from feiyan_incr where modify_time = %s and province_name = %s'
            values = [item['modify_time'],item['province_name']]
            self.cursor.execute(query_sql,values)
            data = self.cursor.fetchone()
            if(data['count'] == 0):
                sql = ("INSERT feiyan_incr(province_name,modify_time,confirmed_count,confirmed_incr
                ,cured_count,cured_incr,dead_count,dead_incr,current_confirmed_count,current_confirmed_incr"
                ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
                values =[item['province_name'],item['modify_time'],item['confirmedCount'],item['confirmedIncr'],item['curedCount']
                ,item['curedIncr'],item['deadCount'],item['deadIncr'],item['currentConfirmedCount'],item['currentConfirmedIncr']]      
                self.cursor.execute(sql,values)
                self.conn.commit()
    
        # 保存最新的数据到数据库
        def save_last_mysql(self,item):
            # 更新所有历史数据的is_new字段
            update_sql = 'update feiyan_data set is_new = 0 where is_new = 1 and province_name = %s'
            values= [item['province_name']]
            self.cursor.execute(update_sql,values)
            self.conn.commit()
            sql = ("INSERT feiyan_data(province_name,province_id,continents,current_confirmed_count
            ,confirmed_count,cured_count,dead_count,suspected_count,country_type,modify_time,is_new"
            ") VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)")
            values =[item['province_name'],item['province_id'],item['continents'],item['current_confirmed_count'],item['confirmed_count']
            ,item['cured_count'],item['dead_count'],item['suspected_count'],item['country_type'],item['modify_time'],item['is_new']]      
            self.cursor.execute(sql,values)
            self.conn.commit()
    
    
    if __name__ == '__main__':
        sup = VirusSupervise()
        sup.filtration_data()
    

    做好这一步我们的采集工作已经完成了,美中不足的是每次都要我主动去运行,很不方便,因此我编写了一个定时器去每天定时采集数据,代码如下

    import time
    import datetime
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
    from feiyan_spider import VirusSupervise
    
    # 错误监控
    def my_listener(event):
        if event.exception:
            print ('任务出错了!!!!!!')
        else:
            print ('任务照常运行...')
    
    def feiyan_spider():
        print("开始采集:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))))
        vs = VirusSupervise()
        vs.filtration_data()
    
    # 任务
    def start():
        print('创建任务')
        #创建调度器:BlockingScheduler
        scheduler = BlockingScheduler()
        scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 添加爬取的定时任务:每天的早上7点执行
        scheduler.add_job(feiyan_spider, 'cron', hour=8, minute=10)
        scheduler.start()
    
    
    if __name__ == "__main__":
        start()

    至此,全部工作就算完成了,其实是很简单的,无非就是要多花点心思去研究目标网站,下一篇文章将介绍我利用这些数据开发的基于python的flask框架的肺炎疫情分析系统。

    本文首发于https://www.bizhibihui.com/blog/article/29

  • 相关阅读:
    Android 工程师眼里的大前端:GMTC 2018 参会总结
    Android 工程师眼里的大前端:GMTC 2018 参会总结
    Android 工程师眼里的大前端:GMTC 2018 参会总结
    你所不知道的Python | 字符串连接的秘密
    你所不知道的Python | 字符串连接的秘密
    你所不知道的Python | 字符串连接的秘密
    你所不知道的Python | 字符串连接的秘密
    java基础(一)
    java基础(一)
    《SQL Server企业级平台管理实践》读书笔记——关于SQL Server数据库的还原方式
  • 原文地址:https://www.cnblogs.com/chengrongkai/p/13043736.html
Copyright © 2011-2022 走看看