Main class:
from Dao.Requests_dao import Rqst
from Dao.Csv_dao import Csv

def paqu(x):
    print('爬取:知乎“英文取名”:')
    url = 'https://www.zhihu.com/topic/19561718/top-answers'
    RQST = Rqst()  # crawler tool object
    CSV = Csv()    # storage tool object
    # fetch the landing page (note: _init_ is an ordinary fetch method, not __init__)
    print('首页url:' + url)
    try:
        docx = RQST._init_(url)
        docx.raise_for_status()
        print('首页连接成功!')
    except:
        print('error:首页连接失败!')
    print('------------------------------------------------------------------------------')
    # collect the answer URLs on the topic page, up to the requested count
    rs = RQST.find_all_question(x)
    # crawl every collected answer URL and save it
    number = 0
    for i in rs:
        number += 1
        print(number)
        data = RQST.get_content(i)
        CSV.save(data)

if __name__ == '__main__':
    paqu(1)
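For the two imports at the top to resolve, the crawler and save classes shown below have to live in a Dao package next to the entry script. A plausible layout (Requests_dao.py and Csv_dao.py follow from the import statements; the main.py name and the __init__.py are assumptions):

main.py              # the main class above
Dao/
    __init__.py      # marks Dao as a package
    Requests_dao.py  # defines class Rqst (crawler class below)
    Csv_dao.py       # defines class Csv (save class below)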
Crawler class (Rqst):
import requests
from bs4 import BeautifulSoup
import json
import re
import os
# import io
# import sys
# sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='gb18030')


class Rqst:

    # fetch a URL and return the response (an ordinary helper method, not the constructor)
    def _init_(self, url):
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
        }
        docx = requests.get(url, headers=headers)
        docx.encoding = 'utf-8'
        return docx

    # collect answer URLs and return them as a list
    def find_all_question(self, number):
        # the topic page is rendered dynamically, so the URLs are taken from the JSON feed API instead
        apiurl = 'https://www.zhihu.com/api/v4/topics/19561718/feeds/essence?include=data%5B%3F%28target.type%3Dtopic_sticky_module%29%5D.target.data%5B%3F%28target.type%3Danswer%29%5D.target.content%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%3Bdata%5B%3F%28target.type%3Dtopic_sticky_module%29%5D.target.data%5B%3F%28target.type%3Danswer%29%5D.target.is_normal%2Ccomment_count%2Cvoteup_count%2Ccontent%2Crelevant_info%2Cexcerpt.author.badge%5B%3F%28type%3Dbest_answerer%29%5D.topics%3Bdata%5B%3F%28target.type%3Dtopic_sticky_module%29%5D.target.data%5B%3F%28target.type%3Darticle%29%5D.target.content%2Cvoteup_count%2Ccomment_count%2Cvoting%2Cauthor.badge%5B%3F%28type%3Dbest_answerer%29%5D.topics%3Bdata%5B%3F%28target.type%3Dtopic_sticky_module%29%5D.target.data%5B%3F%28target.type%3Dpeople%29%5D.target.answer_count%2Carticles_count%2Cgender%2Cfollower_count%2Cis_followed%2Cis_following%2Cbadge%5B%3F%28type%3Dbest_answerer%29%5D.topics%3Bdata%5B%3F%28target.type%3Danswer%29%5D.target.annotation_detail%2Ccontent%2Chermes_label%2Cis_labeled%2Crelationship.is_authorized%2Cis_author%2Cvoting%2Cis_thanked%2Cis_nothelp%3Bdata%5B%3F%28target.type%3Danswer%29%5D.target.author.badge%5B%3F%28type%3Dbest_answerer%29%5D.topics%3Bdata%5B%3F%28target.type%3Darticle%29%5D.target.annotation_detail%2Ccontent%2Chermes_label%2Cis_labeled%2Cauthor.badge%5B%3F%28type%3Dbest_answerer%29%5D.topics%3Bdata%5B%3F%28target.type%3Dquestion%29%5D.target.annotation_detail%2Ccomment_count%3B&limit=10&offset='
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'
        }
        n = 0    # number of URLs collected so far
        rs = []  # list of answer URLs
        i = 0    # API offset
        # keep requesting pages of 10 until the requested number of URLs is collected
        while True:
            docx = requests.get(apiurl + str(i), headers=headers)
            i += 10
            docx.encoding = 'utf-8'
            urlst = []
            if os.path.exists(r"saveurl.csv"):
                with open('saveurl.csv', 'r', encoding='utf-8-sig') as f:
                    urlst = f.readlines()
            for j in range(10):
                js = json.loads(docx.text)['data'][j]
                try:
                    r = 'https://www.zhihu.com/question/' + str(js.get('target').get('question').get('id')) + '/answer/' + str(js.get('target').get('id'))
                    if r + '\n' in urlst:
                        # already crawled in a previous run, stop reading this batch
                        break
                    else:
                        with open('saveurl.csv', 'a', encoding='utf-8-sig') as f:
                            f.write(r + '\n')
                        rs.append(r)
                        n += 1
                except:
                    pass
                if n >= number:
                    return rs
            if n >= number:
                return rs

    # extract the question title
    def get_content_question(self, bs):
        rs = bs.find('div', {'class': 'QuestionHeader'}).find('h1')
        question = rs.text
        return question

    # extract the answer body
    def get_content_answerContent(self, bs):
        rs = bs.find('div', {'class': 'QuestionAnswer-content'}).find_all('p')
        content = ''
        for i in rs:
            content = content + i.text
        respond = content.replace('/', 'or').replace('\n', '').replace(',', ',')
        return respond

    # extract author information
    def get_content_authorInfo(self, bs):
        b1 = bs.find('div', {'class': 'ContentItem AnswerItem'})
        # author name
        b1_1 = b1.find('div', {'class': 'AuthorInfo-content'}).find('a', {'data-za-detail-view-element_name': 'User'})
        try:
            author = '作者名称:' + b1_1.text
        except:
            author = '作者名称:匿名用户'
        # author description
        b1_2 = b1.find('div', {'class': 'ztext AuthorInfo-badgeText'})
        try:
            author_describe = '作者描述:' + b1_2.text.replace('\n', ',')
        except:
            author_describe = '作者描述:无信息'
        # upvote count of the answer
        b1_3 = b1.find('meta', {'itemprop': 'upvoteCount'})
        try:
            author_agreeNumber = '赞同人数:' + b1_3.get('content')
        except:
            author_agreeNumber = '赞同人数:无信息'
        # publication date of the answer
        b1_4 = b1.find('meta', {'itemprop': 'dateCreated'})
        try:
            author_dateCreated = '发布日期:' + b1_4.get('content').replace('T', '*').replace('.000Z', '')
        except:
            author_dateCreated = '发布日期:无信息'
        # last edit date of the answer
        b1_5 = b1.find('meta', {'itemprop': 'dateModified'})
        try:
            author_dateModified = '编辑日期:' + b1_5.get('content').replace('T', '*').replace('.000Z', '')
        except:
            author_dateModified = '编辑日期:无信息'
        data = [
            author,
            author_describe,
            author_agreeNumber,
            author_dateCreated,
            author_dateModified
        ]
        return data

    # extract the comments under the answer
    def get_content_discuss(self, bs):
        # build the comments API URL from the answer id
        rs = bs.find('div', {'class': 'Card AnswerCard'}).find('div', {'class': 'ContentItem AnswerItem'})
        url = 'https://www.zhihu.com/api/v4/answers/' + rs.get('name') + '/root_comments?order=normal&limit=20&offset=0&status=open'
        # request the API and parse the JSON
        docx = self._init_(url)
        docx.encoding = 'utf-8'
        discusses = []
        for i in json.loads(docx.text).get('data'):
            user = i['author']['member']['name']
            discuss = i['content'].replace('<p>', '').replace('</p>', '').replace('</br>', '').replace('<br>', '').replace(',', ',').replace(' ', '')
            disc = re.sub("<.*?>", '', '#' + user + '#:' + discuss)
            discusses.append(disc)
        return discusses

    # crawl everything for one answer URL
    def get_content(self, url):
        docx = self._init_(url)
        try:
            print('链接:', url)
            docx.raise_for_status()
            print('连接成功!')
        except:
            print('error:连接失败!')
        try:
            print('正在爬取数据。。。')
            bs = BeautifulSoup(docx.text, 'html.parser')
            # question title
            question = self.get_content_question(bs)
            # answer body
            content = self.get_content_answerContent(bs)
            # author information
            author_data = self.get_content_authorInfo(bs)
            # comments
            discusses = self.get_content_discuss(bs)
            data = {
                'question': question,
                'url': url,
                'content': content,
                'discusses': discusses,
                'author_data': author_data
            }
            print('数据爬取成功!')
            return data
        except:
            print('error:数据爬取失败!')
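The heart of find_all_question is the offset paging of the topic's essence feed and the reconstruction of each answer URL from the JSON. The stripped-down sketch below isolates just that step: it drops the saveurl.csv dedup file and the long include parameter, and it assumes the trimmed endpoint still returns the target.question.id and target.id fields that the class above relies on.

import json
import requests

API = 'https://www.zhihu.com/api/v4/topics/19561718/feeds/essence?limit=10&offset='  # include parameter omitted (assumption)
HEADERS = {'user-agent': 'Mozilla/5.0'}

def first_answer_urls(count):
    # page through the essence feed 10 items at a time and rebuild answer URLs
    urls, offset = [], 0
    while len(urls) < count:
        page = requests.get(API + str(offset), headers=HEADERS)
        offset += 10
        items = json.loads(page.text).get('data', [])
        if not items:
            break  # no more feed items, stop instead of looping forever
        for item in items:
            target = item.get('target', {})
            question = (target.get('question') or {}).get('id')
            if question is None:
                continue  # skip feed items that are not answers (articles, people, ...)
            urls.append('https://www.zhihu.com/question/%s/answer/%s' % (question, target.get('id')))
            if len(urls) >= count:
                break
    return urls

# e.g. first_answer_urls(3) should return the first three answer URLs in the feed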
Save class (Csv):
import os


class Csv():

    def save(self, data):
        print('正在保存数据。。。')
        try:
            # only write the header row when the file does not exist yet
            judge = True
            if os.path.exists(r"save.csv"):
                judge = False
            with open('save.csv', 'a', encoding='utf-8-sig') as f:
                # write the header
                if judge:
                    head = ['问题', '链接', '回答', '答主信息', '讨论']
                    f.write(','.join(head) + '\n')
                # write the first row: question, URL, answer, first author field, first comment
                row1 = [data['question'], data['url'], data['content'], data['author_data'][0], data['discusses'][0]]
                f.write(','.join(row1) + '\n')
                # write the remaining rows: one line per additional author field / comment
                rows = max(len(data['discusses']), len(data['author_data']))
                for i in range(1, rows):
                    row = ['', '', '', '', '']
                    try:
                        row[3] = data['author_data'][i]
                    except:
                        row[3] = ''
                    try:
                        row[4] = data['discusses'][i]
                    except:
                        row[4] = ''
                    f.write(','.join(row) + '\n')
                # blank line between answers
                f.write('\n')
            print('Save successfully!')
        except:
            print('error:保存失败!')
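Because the rows are joined with raw commas, Csv.save has to pre-replace the ASCII comma with the full-width '，' in the scraped text. Below is a hedged alternative sketch using the standard csv module, which quotes fields automatically; it keeps the same column layout but is not the author's implementation.

import csv
import os

class CsvAlt():
    # same row layout as Csv.save, but csv.writer handles quoting and embedded commas
    def save(self, data):
        new_file = not os.path.exists('save.csv')
        with open('save.csv', 'a', encoding='utf-8-sig', newline='') as f:
            writer = csv.writer(f)
            if new_file:
                writer.writerow(['问题', '链接', '回答', '答主信息', '讨论'])
            author = data['author_data']
            discusses = data['discusses']
            # first row carries the question, URL and answer body
            writer.writerow([data['question'], data['url'], data['content'],
                             author[0] if author else '',
                             discusses[0] if discusses else ''])
            # remaining author fields and comments go on their own rows
            for i in range(1, max(len(author), len(discusses))):
                writer.writerow(['', '', '',
                                 author[i] if i < len(author) else '',
                                 discusses[i] if i < len(discusses) else ''])
            writer.writerow([])  # blank line between answers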