zoukankan      html  css  js  c++  java
  • 爬虫实战(二)—利用requests、selenium爬取王者官网、王者营地APP数据及pymongo详解

    概述

    可关注微信订阅号 loak 查看实际效果。

    代码已托管github,地址为:https://github.com/luozhengszj/LOLGokSpider ,包括了项目的所有代码。

    本文主要介绍以下内容:

    • 使用selenium 和 requests爬取王者荣耀官网、王者营地APP数据,使用BeautifulSoup和正则进行数据解析;
    • 爬取的IP代理使用了redis搭建的代理池;
    • 数据通过pymongo保存到Mongodb中,并且通过提供接口进行数据的查询与显示;
    • 项目运行日志记录功能logging;
    • centos服务器的定时任务;
    • 数据的查询与显示可通过订阅号进行方便的查询。

    爬虫实现

    跟上一篇博文类似,我们想要爬取某些数据,同样首先要分析网站、APP程序的页面,确定要爬取的元素->分析网站及元素的加载->抓取数据

    • 确定爬取的元素
      我们百度搜索王者荣耀,进入其官网,可以发现有一个下拉选项“英雄资料”,地址为:https://pvp.qq.com/web201605/herolist.shtmlweb201605/herolist.shtml , 在这里我们可以看到所有英雄。
      点击某个英雄,进入详情页面,我们可以发现官网有对该英雄的简介、技能、加点、出装、铭文的推荐。所以我们可以确定这些内容可以从这里进行爬取。再进行分析“王者营地”APP(博主分析时,版本为:3.44.204),选择 战绩->游戏工具->英雄榜,我们可以看到该APP对实时数据的显示,主要包括了上下中辅野的热度、胜率、登场率、ban率、技能、出装、铭文、克制、被克制的内容。
      通过上面的分析,确定了在王者官网爬取英雄的铭文、出装、技能,在王者营地APP爬取 热度、胜率、登场率、ban率、克制、被克制

    • 分析网站及元素的加载

    • 抓取数据
      通过以上的分析,我们直接可以编写代码了。为了简单,王者荣耀官网数据的抓取,直接使用chrome selenium,而王者营地的则采用request模块。
      官网爬取某一英雄详细信息:

      def get_one_hero_detail(hero_url, gok_hero):
         proxy = get_proxy()
         chrome_options.add_argument('--proxy-server=http://' + proxy)
         browser.get(hero_url)
      
         tmp1 = wait.until(
             EC.presence_of_element_located(
                 (By.XPATH, '/html/body/div[3]/div[2]/div/div[2]/div[1]/div[2]/p[1]/span'))).text
         tmp2 = browser.find_element_by_xpath('/html/body/div[3]/div[2]/div/div[2]/div[1]/div[2]/p[3]/span').text
         gok_hero.skill = ['主:' + tmp1, '副:' + tmp2]
      
         zh_skill = browser.find_element_by_xpath('/html/body/div[3]/div[2]/div/div[2]/div[1]/div[2]/p[5]/span').text
         gok_hero.zh_skill = zh_skill
      
         mingwen1 = browser.find_element_by_xpath('/html/body/div[3]/div[2]/div/div[1]/div[3]/div[2]/ul/li[1]/p[1]/em').text
         mingwen2 = browser.find_element_by_xpath('/html/body/div[3]/div[2]/div/div[1]/div[3]/div[2]/ul/li[2]/p[1]/em').text
         mingwen3 = browser.find_element_by_xpath('/html/body/div[3]/div[2]/div/div[1]/div[3]/div[2]/ul/li[3]/p[1]/em').text
         gok_hero.mingwen = [mingwen1, mingwen2, mingwen3]
      
         browser.implicitly_wait(5)
         builds = browser.find_elements_by_xpath('//*[@id="Jname"]')
         list_tmp = []
         for item in builds:
             list_tmp.append(item.get_attribute("innerHTML"))
         gok_hero.first_build = list_tmp[:6]
         gok_hero.second_build = list_tmp[6:12]
      
         time.sleep(1)
         return gok_hero
      

      王者营地APP根据位置爬取数据及解析的代码:

      all_hero_msg = []
      
      
      def get_hero_rank(lu):
          retry_count = 5
          proxy = get_proxy()
          while retry_count > 0:
              try:
                  # 获取英雄列表
                  proxies = {
                      "http": "http://" + proxy
                  }
                  data_tmp = gok_interface_log['post_data_20190623']
                  data_tmp.update({'position': lu})
                  herohtml = requests.post(url=gok_interface_log['post_url_20190623'],
                                           data=data_tmp,
                                           proxies=proxies).text
                  return herohtml
              except urllib.error.URLError as e:
                  if isinstance(e.reason, socket.timeout):
                      retry_count -= 1
                      if retry_count == 2:
                          # 出错3, 删除代理池中代理
                          delete_proxy(proxy)
                          proxy = get_proxy()
              except Exception as e:
                  log.logger.error('get_hero_rank爬取失败!' + str(e)+lu)
                  if retry_count == 3:
                      delete_proxy(proxy)
                      proxy = get_proxy()
          return None
      
      
      def parse_hero_rank(rank_data, version, position):
          rank_data = ast.literal_eval(rank_data)
          hero_rank_list = str(rank_data.get('data').get('list'))[1:-1]
          hero_items = ast.literal_eval(hero_rank_list)
          for item in hero_items:
              gok = GokClass()
              gok.version = version
              gok.day = gok_config['GOK_INSERT_TIME']
              gok.heroid = item['heroId']
              gok.heroname = item['heroInfo'][0]['heroName']
              gok.herotype = position  # 英雄走哪路
              gok.herotypename = item['heroInfo'][0]['heroCareer']
              gok.tRank = item['tRank']
              gok.winpercent = item['winRate']
              gok.gameactpercnt = item['showRate']
              gok.banRate = item['banRate']
              all_hero_msg.append(gok)
      

      为了不报错,请各位还是直接查看仓库的完整代码吧~~

    redis维护IP代理池

    可以查看上一篇博文,里面有完整的操作。https://blog.csdn.net/luoz_java/article/details/92741358

    pymongo

    mongodb的可视化,我发现了一款很好的工具,就是 robo3t ,百度搜索即可,免费版的。
    官网地址:https://docs.mongodb.com/
    pymongo的操作主要为以下内容:

    • 创建集合
     #!/usr/bin/python3
      
     import pymongo
      
     client = pymongo.MongoClient(mongo_config['MONGO_URL'])
     db = client[mongo_config['MONGO_DB']]
    

    在 MongoDB 中,集合只有在内容插入后才会创建! 就是说,创建集合(数据表)后要再插入一个文档(记录),集合才会真正创建。

    • 判断集合是否已存在
     #!/usr/bin/python3
      
     import pymongo
      
     client = pymongo.MongoClient(mongo_config['MONGO_URL'])
     db = client[mongo_config['MONGO_DB']]
     collist = db. list_collection_names()
     if "sites" in collist:   # 判断 sites 集合是否存在
       print("集合已存在!")
    
    • 插入集合

      • insert_one()
      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
      search_set = db[mongo_config['USER_FIND_TYPE']]
      tmp = search_set.insert_one({'user_id': user_id, 'game_type': type})
      # insert_one() 方法返回 InsertOneResult 对象,该对象包含 inserted_id 属性,它是插入文档的 id 值。
      print(tmp.inserted_id) # 5b2369cac315325f3698a1cf
      if tmp:
          return 'success'
      return None
      
      • insert_many()
      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
          search_set = db[mongo_config['USER_FIND_TYPE']]
          mylist = [
        { "name": "Taobao", "alexa": "100", "url": "https://www.taobao.com" },
        { "name": "QQ", "alexa": "101", "url": "https://www.qq.com" },
        { "name": "Facebook", "alexa": "10", "url": "https://www.facebook.com" },
        { "name": "知乎", "alexa": "103", "url": "https://www.zhihu.com" },
        { "name": "Github", "alexa": "109", "url": "https://www.github.com" }
      	]
       
      x = search_set.insert_many(mylist)
      	 
      # 输出插入的所有文档对应的 _id 值
      print(x.inserted_ids)
      # [ObjectId('5b236aa9c315325f5236bbb6'), ObjectId('5b236aa9c315325f5236bbb7'), ObjectId('5b236aa9c315325f5236bbb8'), ObjectId('5b236aa9c315325f5236bbb9'), ObjectId('5b236aa9c315325f5236bbba')]
      # insert_many() 方法返回 InsertManyResult 对象,该对象包含 inserted_ids 属性,该属性保存着所有插入文档的 id 值。
      
      • 插入指定 _id 的多个文档
      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
      search_set = db[mongo_config['USER_FIND_TYPE']]
          mylist = [
      	  { "_id": 1, "name": "RUNOOB", "cn_name": "菜鸟教程"},
      	  { "_id": 2, "name": "Google", "address": "Google 搜索"},
      	  { "_id": 3, "name": "Facebook", "address": "脸书"},
      	  { "_id": 4, "name": "Taobao", "address": "淘宝"},
      	  { "_id": 5, "name": "Zhihu", "address": "知乎"}
      	]
      	 
      x = mycol.insert_many(mylist)
      	 
      # 输出插入的所有文档对应的 _id 值
      print(x.inserted_ids)
      # [1, 2, 3, 4, 5]
      
    • 查询

      • find_one()
      • find()
      • 查询指定字段的数据
      • 根据指定条件查询
      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
      mycol = mydb[mongo_config['USER_FIND_TYPE']]
      
      x = mycol.find_one()
      
      list_col = mycol.find()
      
      # 将要返回的字段对应值设置为 1
      # 除了 _id 你不能在一个对象中同时指定 01,如果你设置了一个字段为 0,则其他都为 1,反之亦然。
      for x in mycol.find({},{ "_id": 0, "name": 1, "alexa": 1 }):
      	print(x)
      
      # 同时指定了 01 则会报错
      for x in mycol.find({},{ "name": 1, "alexa": 0 }):
      	print(x)
      
      # 根据条件查找
      mydoc = mycol.find_one({'user_id': user_id, 'game_type': type})
      
      • 高级查询
        与或非,与不用说了。下面说或、非
        limit()
        排序
      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
      mycol = mydb[mongo_config['USER_FIND_TYPE']]
      
      # 或
      x = mycol.find_one({"$or": [{"name": hero_another_name}, {"another1": hero_another_name}]})
      
      # 非
      # 读取 name 字段中第一个字母为 "R" 的数据,正则表达式修饰符条件为 {"$regex": "^R"} :
      x = mycol.find({ "name": { "$regex": "^R" } })
      
      # $in,常用于判断列表name是否存在luozheng元素
      x = mycol.find_one({"name": {'$in':['luozheng']}})
      
      # $regex,也可以写正则
      # 适用于匹配,如果字段a的值为'abc',如果我们想知道name的值是否包含‘b’,可以这样做find({'name':{'$regex':'b'}})
      x = mycol.find_one({"name": {'$regex':['luo']}})
      
      
      # limit()
      myresult = mycol.find().limit(3)
      
      # 同时指定了 01 则会报错
      for x in mycol.find({},{ "name": 1, "alexa": 0 }):
      	print(x)
      
      # sort  升序:pymongo.ASCENDING ( 1 ) 、 降序:pymongo.DESCENDING ( -1 )
      mydoc = mycol.sort([("field1",pymongo.ASCENDING), ("field2",pymongo.DESCENDING)])
      
    • 更新
      update_one()
      update_many()

      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
      mycol = mydb[mongo_config['USER_FIND_TYPE']]
      
      # update_one 该方法第一个参数为查询的条件,第二个参数为要修改的字段。
      mycol.update_one({ "alexa": "10000" }, { "$set": { "alexa": "12345" } })
      
      # 实例将查找所有以 F 开头的 name 字段,并将匹配到所有记录的 alexa 字段修改为 123:
      myquery = { "name": { "$regex": "^F" } }
      newvalues = { "$set": { "alexa": "123" } }
       
      x = mycol.update_many(myquery, newvalues)
      
    • 删除
      delete_one()
      delete_many()
      drop()

      #!/usr/bin/python3
       
      import pymongo
       
      client = pymongo.MongoClient(mongo_config['MONGO_URL'])
      db = client[mongo_config['MONGO_DB']]
      mycol = mydb[mongo_config['USER_FIND_TYPE']]
      
      # delete_one
      mycol.delete_one({ "name": "Taobao" })
      
      # delete_many
      myquery = { "name": {"$regex": "^F"} }
      
      x = mycol.delete_many(myquery)
      
      # delete_many() 方法如果传入的是一个空的查询对象,则会删除集合中的所有文档
      x = mycol.delete_many({})
      
      # 删除集合
      # 如果删除成功 drop() 返回 true,如果删除失败(集合不存在)则返回 false。
      mycol.drop()
      

    日志记录功能

    参照博文: https://www.cnblogs.com/nancyzhu/p/8551506.html

    • 介绍
      logging提供了一组便利的函数,用来做简单的日志。它们是 debug()、 info()、 warning()、 error() 和 critical()。

      默认等级是WARNING,这意味着仅仅这个等级及以上的才会反馈信息,除非logging模块被用来做其它事情。
      logging函数根据它们用来跟踪的事件的级别或严重程度来命名。标准级别及其适用性描述如下(以严重程度递增排序):

    级别何时使用
    DEBUG详细信息,一般只在调试问题时使用。
    INFO证明事情按预期工作。
    WARNING某些没有预料到的事件的提示,或者在将来可能会出现的问题提示。例如:磁盘空间不足。但是软件还是会照常运行。
    ERROR由于更严重的问题,软件已不能执行一些功能了。
    CRITICAL严重错误,表明软件已不能继续运行了。
    • 代码实例
    import logging
    from logging import handlers
    
    class Logger(object):
     level_relations = {
         'debug':logging.DEBUG,
         'info':logging.INFO,
         'warning':logging.WARNING,
         'error':logging.ERROR,
         'crit':logging.CRITICAL
     }#日志级别关系映射
    
     def __init__(self,filename,level='info',when='D',backCount=3,fmt='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'):
         self.logger = logging.getLogger(filename)
         format_str = logging.Formatter(fmt)#设置日志格式
         self.logger.setLevel(self.level_relations.get(level))#设置日志级别
         sh = logging.StreamHandler()#往屏幕上输出
         sh.setFormatter(format_str) #设置屏幕上显示的格式
         th = handlers.TimedRotatingFileHandler(filename=filename,when=when,backupCount=backCount,encoding='utf-8')#往文件里写入#指定间隔时间自动生成文件的处理器
         #实例化TimedRotatingFileHandler
         #interval是时间间隔,backupCount是备份文件的个数,如果超过这个个数,就会自动删除,when是间隔的时间单位,单位有以下几种:
         # S 秒
         # M 分
         # H 小时、
         # D 天、
         # W 每星期(interval==0时代表星期一)
         # midnight 每天凌晨
         th.setFormatter(format_str)#设置文件里写入的格式
         self.logger.addHandler(sh) #把对象加到logger里
         self.logger.addHandler(th)
    if __name__ == '__main__':
     log = Logger('all.log',level='debug')
     log.logger.debug('debug')
     log.logger.info('info')
     log.logger.warning('警告')
     log.logger.error('报错')
     log.logger.critical('严重')
     Logger('error.log', level='error').logger.error('error')         
    

    centos服务器的定时任务

    编辑定时任务:crontab -e
    查看定时任务:crontab -l
    如果是命令需要先后执行,可以使用 &&
    如果是后台运行并且多命令,记得先运行命令在nohup。
    例如以下是先杀进行,在启动进程:

      #定时重启服务
      2 0 * * * ps -ef | grep wxWeb.py | grep -v grep | awk '{print $2}' | xargs kill -9
      4 0 * * * cd /home/LOLGokSpider/Web && nohup /home/LOLGokEnv/bin/python /home/LOLGokSpider/Web/wxWeb.py > /home/LOLGokSpider/Web/wxRun.log 2>&1 &
    

    个人博客:Loak 正 - 关注人工智能及互联网的个人博客
    文章地址:爬虫实战(二)—利用requests、selenium爬取王者官网、王者营地APP数据及pymongo详解

  • 相关阅读:
    骨骼动画的原理及在Unity中的使用
    GUI之ScrollView的使用
    Java编程技术之浅析Java容器技术
    Linux系统 Centos7 环境基于Docker部署Rocketmq服务
    Linux系统环境基于Docker搭建Mysql数据库服务实战
    Java编程技术之浅析JVM内存
    Java编程技术之浅析SPI服务发现机制
    Linux系统环境基于Docker搭建系统基础镜像
    Linux Centos7 环境搭建Docker部署Zookeeper分布式集群服务实战
    Linux Centos7 环境基于Docker部署Zookeeper服务搭建实战
  • 原文地址:https://www.cnblogs.com/l0zh/p/13739733.html
Copyright © 2011-2022 走看看