zoukankan      html  css  js  c++  java
  • Scrapy实战篇(七)之爬取爱基金网站基金业绩数据

      本篇我们以scrapy+selelum的方式来爬取爱基金网站(http://fund.10jqka.com.cn/datacenter/jz/)的基金业绩数据.

      思路:我们以http://fund.10jqka.com.cn/datacenter/jz/网站作为起始,首先抓取页面中基金的详细页面地址,类似于http://fund.10jqka.com.cn/004551/的链接,在组装成http://fund.10jqka.com.cn/004551/historynet.html#historynet形式的地址,抓取其页面下的净值列表数据。

    1、首先我们定义需要抓取的字段,如下:

    class fundajjItem(scrapy.Item):
        # define the fields for your item here like:
        # name = scrapy.Field()
        #产品编号,日期,单位净值(元),累计净值(元),fqnet,增长值(元),增长率
        product_id = Field()
        date = Field()
        net = Field()
        totalnet = Field()
        fqnet = Field()
        inc = Field()
        rate = Field()

    2、从初始地址http://fund.10jqka.com.cn/datacenter/jz/抓取基金详细地址;因为初始页面无法通过直接请求得到网页源代码,所以通过selenium直接的防止直接返回网页源代码,而基金业绩数据的页面可以通过直接请求网址得到,所以不用selenium抓取,直接使用下载器下载数据;所以在下载器中间件中我们使用网址的不同形式判断是否使用selenium返回网页源代码;

    class SeleniumMiddleware():
    #通过类方法from_crawler获取的参数必须放在__init__()方法的第一个参数位置上,除self;否则报错
    def __init__(self, MP,timeout=30 ):
    self.timeout = timeout
    self.browser = webdriver.Chrome()
    self.browser.maximize_window()
    self.browser.set_page_load_timeout(self.timeout)
    self.wait = WebDriverWait(self.browser,self.timeout)
    self.MAX_PAGE = MP

    @classmethod
    def from_crawler(cls, crawler):
    return cls(MP = crawler.settings.get('MAX_PAGE'))


    def process_request(self, request, spider):
    '''
    1、在下载器中间件中对接使用selenium,输出源代码之后,构造htmlresponse对象,直接返回给spider解析页面,提取数据,并且也不在执行下载器下载页面动作

    2、通过下载器执行器下载数据,不在通过selenium

    3、当网址是http://fund.10jqka.com.cn/datacenter/jz/时,因无法直接获取网页源代码,使用selenim直接回去源代码返回的方式处理,
    当网址是http://fund.10jqka.com.cn/000074/historynet.html#historynet类型时,使用框架的下载器下载数据,返回给spider文件处理
    通过获取spider文件的url,根据url判断是否使用selenium下载数据
    '''
    url = request.url
    if url.endswith("historynet.html#historynet"):
    return None
    else:
    self.wait = WebDriverWait(self.browser, self.timeout)
    try:

    time.sleep(5)
    self.browser.get(request.url)

    #MAX_PAGE暂时先写死,在settings中配置 可以写成动态变量
    #思路:观察目标网站,不断下拉滚动条加载新数据,每页80条数据,可以第一次获取页面总共有多少只基金产品总数据,
    #除以80即为需要下拉的次数
    for i in range(1, self.MAX_PAGE):
    #执行js代码,将滚动条下拉到最下面
    self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
    time.sleep(2)

    time.sleep(5)
    response = self.browser.page_source
    return HtmlResponse(url=request.url, body=response, request=request, encoding='utf-8',status=200)
    except TimeoutException:
    return HtmlResponse(url=request.url, status=500, request=request)
    finally:
    self.browser.close()

    另外我们在下载器中间件中设置随机获取user-agent,伪装成各种各种的浏览器,MY_USER_AGENT在settings中以列表的形式配置。

    #在下载器中间件中修改User-Agent的值,伪装成不同的浏览器
    class RandomUserAgentMiddleware():
        def __init__(self,UA):
            self.user_agents = UA
    
        @classmethod
        def from_crawler(cls, crawler):
            return cls(UA = crawler.settings.get('MY_USER_AGENT'))
    
        def process_request(self,request,spider):
            request.headers['User-Agent'] = random.choice(self.user_agents)
    
        def process_response(self,request, response, spider):
            return response

    3、编写spider文件;parse()方法我们从初始页面获取到基金详细页面的地址,并在后面拼接 historynet.html#historynet,以获取净值列表页面的地址;然后在解析历史数据页面,获取基金业绩历史数据。返回给pipeline。

    #解析由http://fund.10jqka.com.cn/datacenter/jz/ 防止返回的response,返回的网址为http://fund.10jqka.com.cn/000074/historynet.html#historynet格式
        def parse(self, response):
            soup = BeautifulSoup(response.text, 'lxml')
            funds = soup.find_all(name='tbody', id='containerMain')
            for fund in funds:
                for f in fund.find_all(name='tr', rel='tpl'):
                    for h in f.find_all(name='a', field='name'):
                        url = h['href'] + 'historynet.html#historynet'
                        yield Request(url = url , callback= self.parse_fund_detail)
    
        def parse_fund_detail(self,response):
            url = response.url
            prod_id = url.split('/')[-2]
            soup = BeautifulSoup(response.text, 'lxml')
            datas = soup.find_all(name='script', type='text/javascript')
            for data in datas:
                if data.text == '':
                    continue
                else:
                    item = fundajjItem()    #使用我们之前定义的item
                    data = data.text
                    data_list = data.split('=')
                    d_list = data_list[1].replace(';', '')
                    dj_list = json.loads(d_list)
                    for dj in dj_list:
                        dj['fundid'] = prod_id
                        item['product_id'] = dj.get('fundid')
                        item['date'] = dj.get('date')
                        item['net'] = dj.get('net')
                        item['totalnet'] = dj.get('totalnet')
                        item['fqnet'] = dj.get('fqnet')
                        item['inc'] = dj.get('inc')
                        item['rate'] = dj.get('rate')
                        yield item

    4、修改pipeline,将获取到的数据存储到mysql和mongodb数据库中。

    class MongoPipeline(object):
    
        def __init__(self,mongo_url,mongo_db,collection):
            self.mongo_url = mongo_url
            self.mongo_db = mongo_db
            self.collection = collection
    
        @classmethod
        def from_crawler(cls,crawler):
            return cls(
                mongo_url=crawler.settings.get('MONGO_URL'),
                mongo_db = crawler.settings.get('MONGO_DB'),
                collection = crawler.settings.get('COLLECTION')
            )
    
        def open_spider(self,spider):
            self.client = pymongo.MongoClient(self.mongo_url)
            self.db = self.client[self.mongo_db]
    
        def process_item(self,item, spider):
            name = self.collection
            self.db[name].insert(dict(item))
            return item
    
        def close_spider(self,spider):
            self.client.close()
    
    class PymysqlPipeline(object):
        def __init__(self,mysql_host,mysql_port,mysql_user,mysql_passwd,mysql_db):
            self.host= mysql_host
            self.port=mysql_port
            self.user=mysql_user
            self.passwd = mysql_passwd
            self.db=mysql_db
    
        @classmethod
        def from_crawler(cls,crawler):
            return cls(
                mysql_host = crawler.settings.get('MYSQL_HOST'),
                mysql_port = crawler.settings.get('MYSQL_PORT'),
                mysql_user=crawler.settings.get('MYSQL_USER'),
                mysql_passwd = crawler.settings.get('MYSQL_PASSWD'),
                mysql_db = crawler.settings.get('MYSQL_DB')
            )
    
        def open_spider(self,spider):
            self.dbconn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, port=self.port, db=self.db)
            self.dbcur = self.dbconn.cursor()
    
        def process_item(self,item, spider):
            items = dict(item)
            fund_list = []
            fund_list.append(items.get('product_id'))
            fund_list.append(items.get('date'))
            fund_list.append(float(items.get('net')))
            fund_list.append(float(items.get('totalnet')))
            fund_list.append(float(items.get('fqnet')))
            fund_list.append(float(items.get('inc')))
            fund_list.append(float(items.get('rate')))
            self.dyn_insert_sql('Fund_date',tuple(fund_list),self.dbconn,self.dbcur)
    
        def close_spider(self,spider):
            self.dbconn.close()
    
        def dyn_insert_sql(self,tablename, data, dbconn, cursor):
            tablename = tablename
            sql = "select GROUP_CONCAT(COLUMN_name,'') from information_schema.COLUMNS where table_name = %s ORDER BY ordinal_position "
            cursor.execute(sql, tablename)
            tup = cursor.fetchone()
            # 动态构造sql语句
            sql = 'INSERT INTO {table}({keys}) VALUES {values}'.format(table=tablename, keys=tup[0], values=data)
            # 使用try-except语句块控制事务的原子性
            try:
                if cursor.execute(sql):
                    dbconn.commit()
            except:
                dbconn.rollback()

    5、设置settings文件

    配置mysql和mongo的链接信息

    MONGO_URL='localhost'
    MONGO_DB='test'
    COLLECTION='Funddate'
    
    MYSQL_HOST='localhost'
    MYSQL_PORT=3306
    MYSQL_USER='root'
    MYSQL_PASSWD='123456'
    MYSQL_DB='test'

    设置item_pipeline,激活mongo和mysql的pipeline组件

    ITEM_PIPELINES = {
       'scrapyfundajj.pipelines.MongoPipeline': 300,
       'scrapyfundajj.pipelines.PymysqlPipeline': 310,
    }

    设置DOWNLOADER_MIDDLEWARES,激活自动以的下载器中间件

    DOWNLOADER_MIDDLEWARES = {
        #禁用掉框架内置的UserAgentMiddleware,使用自定义的RandomUserAgentMiddleware
       'scrapy.downloadermiddleware.useragent.UserAgentMiddleware': None,
       'scrapyfundajj.middlewares.RandomUserAgentMiddleware': 543,
       'scrapyfundajj.middlewares.SeleniumMiddleware': 544,
       'scrapyfundajj.middlewares.ScrapyfundajjDownloaderMiddleware':545,
    }

    其他设置

    设置不遵循爬取协议
    ROBOTSTXT_OBEY = False
    #下载器在下载同一个网站下一个页面前需要等待的时间。该选项可以用来限制爬取速度, 减轻服务器压力。同时也支持小数
    DOWNLOAD_DELAY = 3

    至此,所有需要的工作已经全部处理完成,运行项目,获取数据即可。

    完整代码路径:https://gitee.com/liangxinbin/Scrpay/tree/master/scrapyfundajj

     
  • 相关阅读:
    python计算文件MD5
    #每日一练 使用迭代器函数和初始种子值构建列表。
    python接收从CMD传过来的值
    ping IP,通与不通执行不同操作
    bat按回车执行命令
    adb执行简单命令,用于刷页面访问次数
    #每日一练 合并字典,并为每个字创建一个列表
    #每日一练 按提供函数,返回差异化
    #每日一练 根据过滤内容,把值分成两组
    #每日一练 根据索引对列表进行排序
  • 原文地址:https://www.cnblogs.com/lxbmaomao/p/10389874.html
Copyright © 2011-2022 走看看