本篇我们以scrapy+selelum的方式来爬取爱基金网站(http://fund.10jqka.com.cn/datacenter/jz/)的基金业绩数据.
思路:我们以http://fund.10jqka.com.cn/datacenter/jz/网站作为起始,首先抓取页面中基金的详细页面地址,类似于http://fund.10jqka.com.cn/004551/的链接,在组装成http://fund.10jqka.com.cn/004551/historynet.html#historynet形式的地址,抓取其页面下的净值列表数据。
1、首先我们定义需要抓取的字段,如下:
class fundajjItem(scrapy.Item): # define the fields for your item here like: # name = scrapy.Field() #产品编号,日期,单位净值(元),累计净值(元),fqnet,增长值(元),增长率 product_id = Field() date = Field() net = Field() totalnet = Field() fqnet = Field() inc = Field() rate = Field()
2、从初始地址http://fund.10jqka.com.cn/datacenter/jz/抓取基金详细地址;因为初始页面无法通过直接请求得到网页源代码,所以通过selenium直接的防止直接返回网页源代码,而基金业绩数据的页面可以通过直接请求网址得到,所以不用selenium抓取,直接使用下载器下载数据;所以在下载器中间件中我们使用网址的不同形式判断是否使用selenium返回网页源代码;
class SeleniumMiddleware():
#通过类方法from_crawler获取的参数必须放在__init__()方法的第一个参数位置上,除self;否则报错
def __init__(self, MP,timeout=30 ):
self.timeout = timeout
self.browser = webdriver.Chrome()
self.browser.maximize_window()
self.browser.set_page_load_timeout(self.timeout)
self.wait = WebDriverWait(self.browser,self.timeout)
self.MAX_PAGE = MP
@classmethod
def from_crawler(cls, crawler):
return cls(MP = crawler.settings.get('MAX_PAGE'))
def process_request(self, request, spider):
'''
1、在下载器中间件中对接使用selenium,输出源代码之后,构造htmlresponse对象,直接返回给spider解析页面,提取数据,并且也不在执行下载器下载页面动作
2、通过下载器执行器下载数据,不在通过selenium
3、当网址是http://fund.10jqka.com.cn/datacenter/jz/时,因无法直接获取网页源代码,使用selenim直接回去源代码返回的方式处理,
当网址是http://fund.10jqka.com.cn/000074/historynet.html#historynet类型时,使用框架的下载器下载数据,返回给spider文件处理
通过获取spider文件的url,根据url判断是否使用selenium下载数据
'''
url = request.url
if url.endswith("historynet.html#historynet"):
return None
else:
self.wait = WebDriverWait(self.browser, self.timeout)
try:
time.sleep(5)
self.browser.get(request.url)
#MAX_PAGE暂时先写死,在settings中配置 可以写成动态变量
#思路:观察目标网站,不断下拉滚动条加载新数据,每页80条数据,可以第一次获取页面总共有多少只基金产品总数据,
#除以80即为需要下拉的次数
for i in range(1, self.MAX_PAGE):
#执行js代码,将滚动条下拉到最下面
self.browser.execute_script('window.scrollTo(0, document.body.scrollHeight)')
time.sleep(2)
time.sleep(5)
response = self.browser.page_source
return HtmlResponse(url=request.url, body=response, request=request, encoding='utf-8',status=200)
except TimeoutException:
return HtmlResponse(url=request.url, status=500, request=request)
finally:
self.browser.close()
另外我们在下载器中间件中设置随机获取user-agent,伪装成各种各种的浏览器,MY_USER_AGENT在settings中以列表的形式配置。
#在下载器中间件中修改User-Agent的值,伪装成不同的浏览器 class RandomUserAgentMiddleware(): def __init__(self,UA): self.user_agents = UA @classmethod def from_crawler(cls, crawler): return cls(UA = crawler.settings.get('MY_USER_AGENT')) def process_request(self,request,spider): request.headers['User-Agent'] = random.choice(self.user_agents) def process_response(self,request, response, spider): return response
3、编写spider文件;parse()方法我们从初始页面获取到基金详细页面的地址,并在后面拼接 historynet.html#historynet,以获取净值列表页面的地址;然后在解析历史数据页面,获取基金业绩历史数据。返回给pipeline。
#解析由http://fund.10jqka.com.cn/datacenter/jz/ 防止返回的response,返回的网址为http://fund.10jqka.com.cn/000074/historynet.html#historynet格式 def parse(self, response): soup = BeautifulSoup(response.text, 'lxml') funds = soup.find_all(name='tbody', id='containerMain') for fund in funds: for f in fund.find_all(name='tr', rel='tpl'): for h in f.find_all(name='a', field='name'): url = h['href'] + 'historynet.html#historynet' yield Request(url = url , callback= self.parse_fund_detail) def parse_fund_detail(self,response): url = response.url prod_id = url.split('/')[-2] soup = BeautifulSoup(response.text, 'lxml') datas = soup.find_all(name='script', type='text/javascript') for data in datas: if data.text == '': continue else: item = fundajjItem() #使用我们之前定义的item data = data.text data_list = data.split('=') d_list = data_list[1].replace(';', '') dj_list = json.loads(d_list) for dj in dj_list: dj['fundid'] = prod_id item['product_id'] = dj.get('fundid') item['date'] = dj.get('date') item['net'] = dj.get('net') item['totalnet'] = dj.get('totalnet') item['fqnet'] = dj.get('fqnet') item['inc'] = dj.get('inc') item['rate'] = dj.get('rate') yield item
4、修改pipeline,将获取到的数据存储到mysql和mongodb数据库中。
class MongoPipeline(object): def __init__(self,mongo_url,mongo_db,collection): self.mongo_url = mongo_url self.mongo_db = mongo_db self.collection = collection @classmethod def from_crawler(cls,crawler): return cls( mongo_url=crawler.settings.get('MONGO_URL'), mongo_db = crawler.settings.get('MONGO_DB'), collection = crawler.settings.get('COLLECTION') ) def open_spider(self,spider): self.client = pymongo.MongoClient(self.mongo_url) self.db = self.client[self.mongo_db] def process_item(self,item, spider): name = self.collection self.db[name].insert(dict(item)) return item def close_spider(self,spider): self.client.close() class PymysqlPipeline(object): def __init__(self,mysql_host,mysql_port,mysql_user,mysql_passwd,mysql_db): self.host= mysql_host self.port=mysql_port self.user=mysql_user self.passwd = mysql_passwd self.db=mysql_db @classmethod def from_crawler(cls,crawler): return cls( mysql_host = crawler.settings.get('MYSQL_HOST'), mysql_port = crawler.settings.get('MYSQL_PORT'), mysql_user=crawler.settings.get('MYSQL_USER'), mysql_passwd = crawler.settings.get('MYSQL_PASSWD'), mysql_db = crawler.settings.get('MYSQL_DB') ) def open_spider(self,spider): self.dbconn = pymysql.connect(host=self.host, user=self.user, password=self.passwd, port=self.port, db=self.db) self.dbcur = self.dbconn.cursor() def process_item(self,item, spider): items = dict(item) fund_list = [] fund_list.append(items.get('product_id')) fund_list.append(items.get('date')) fund_list.append(float(items.get('net'))) fund_list.append(float(items.get('totalnet'))) fund_list.append(float(items.get('fqnet'))) fund_list.append(float(items.get('inc'))) fund_list.append(float(items.get('rate'))) self.dyn_insert_sql('Fund_date',tuple(fund_list),self.dbconn,self.dbcur) def close_spider(self,spider): self.dbconn.close() def dyn_insert_sql(self,tablename, data, dbconn, cursor): tablename = tablename sql = "select GROUP_CONCAT(COLUMN_name,'') from information_schema.COLUMNS where table_name = %s ORDER BY ordinal_position " cursor.execute(sql, tablename) tup = cursor.fetchone() # 动态构造sql语句 sql = 'INSERT INTO {table}({keys}) VALUES {values}'.format(table=tablename, keys=tup[0], values=data) # 使用try-except语句块控制事务的原子性 try: if cursor.execute(sql): dbconn.commit() except: dbconn.rollback()
5、设置settings文件
配置mysql和mongo的链接信息
MONGO_URL='localhost' MONGO_DB='test' COLLECTION='Funddate' MYSQL_HOST='localhost' MYSQL_PORT=3306 MYSQL_USER='root' MYSQL_PASSWD='123456' MYSQL_DB='test'
设置item_pipeline,激活mongo和mysql的pipeline组件
ITEM_PIPELINES = { 'scrapyfundajj.pipelines.MongoPipeline': 300, 'scrapyfundajj.pipelines.PymysqlPipeline': 310, }
设置DOWNLOADER_MIDDLEWARES,激活自动以的下载器中间件
DOWNLOADER_MIDDLEWARES = { #禁用掉框架内置的UserAgentMiddleware,使用自定义的RandomUserAgentMiddleware 'scrapy.downloadermiddleware.useragent.UserAgentMiddleware': None, 'scrapyfundajj.middlewares.RandomUserAgentMiddleware': 543, 'scrapyfundajj.middlewares.SeleniumMiddleware': 544, 'scrapyfundajj.middlewares.ScrapyfundajjDownloaderMiddleware':545, }
其他设置
设置不遵循爬取协议 ROBOTSTXT_OBEY = False #下载器在下载同一个网站下一个页面前需要等待的时间。该选项可以用来限制爬取速度, 减轻服务器压力。同时也支持小数 DOWNLOAD_DELAY = 3
至此,所有需要的工作已经全部处理完成,运行项目,获取数据即可。
完整代码路径:https://gitee.com/liangxinbin/Scrpay/tree/master/scrapyfundajj