Scraping e-books with requests + lxml

The script below searches biquge.lu for a novel by title, collects the chapter URLs from its table of contents, downloads the chapters in batches of 50 on separate threads, and finally merges everything into a single text file.
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import os
import sys
import shutil
import threading
import requests
from lxml import etree
from urllib.parse import quote
from fake_useragent import UserAgent


class Book():
    def __init__(self, book_name):
        self.baseurl = "https://www.biquge.lu"
        self.book = book_name

    # Search for the book and return its URL path on the site
    def search_book(self):
        # encoding must be passed by keyword; quote()'s second positional arg is safe=
        keywords = quote(self.book, encoding='utf-8')
        search_url = self.baseurl + "/search/?ie=gbk&q=%s" % keywords
        try:
            search_r = requests.get(search_url, timeout=30)
            tree = etree.HTML(search_r.text)
            a_tags = tree.xpath('//span//a')
            book_url = ""  # initialize so a miss doesn't raise NameError
            for tag in a_tags:
                if self.book in tag.xpath("./text()"):
                    print("---------------- Found [%s] ----------------" % self.book)
                    book_url = tag.xpath("./@href")[0]
                    break
            if book_url != "":
                return book_url
            print("---------------- [%s] not found ----------------" % self.book)
            return False
        except Exception:
            print("---------------- [%s] not found ----------------" % self.book)
            return False

    # Collect every chapter URL from the book's table of contents
    def book_page(self, url):
        book_url = self.baseurl + url
        book_r = requests.get(book_url, timeout=30)
        book_tree = etree.HTML(book_r.text)
        zhangjies = book_tree.xpath('//dl//dd/a')
        # The list starts with a "latest chapters" block; skip ahead until the
        # real table of contents begins at "第一章" (Chapter 1)
        j = 0
        for zhangjie in zhangjies:
            if "第一章" in zhangjie.xpath('./text()')[0]:
                break
            j += 1
        zhangjies = zhangjies[j:]
        # Build the full list of absolute chapter URLs
        zhangjie_urls = []
        for zhangjie in zhangjies:
            zhangjie_urls.append(self.baseurl + zhangjie.xpath('./@href')[0])
        return zhangjie_urls

    # Download one batch of chapters into self.book/<page>/
    def write_book(self, page, zhangjie_urls):
        zhangjie_path = os.path.join(self.book, str(page))
        if not os.path.exists(zhangjie_path):
            os.makedirs(zhangjie_path)
        ua = UserAgent(path="./fake_useragent_0.1.11.json")  # local UA data file
        # Proxy pool (sample addresses; replace with live proxies before running).
        # Note: these entries only cover the http scheme; requests goes direct
        # for https URLs unless an "https" key is added as well.
        proxies_list = [
            {"http": "60.176.71.110:9000"},
            {"http": "117.90.252.170:9000"},
            {"http": "211.159.149.238:8118"},
        ]
        for i in range(len(zhangjie_urls)):
            zhangjie_url = zhangjie_urls[i]
            zhangjie_file = os.path.join(zhangjie_path, str(i) + ".txt")
            headers = {'User-Agent': ua.random}
            # Up to 10 rounds over the proxy pool; stop once the file is written
            for _ in range(10):
                for proxies in proxies_list:
                    try:
                        html = requests.get(zhangjie_url, timeout=60, headers=headers, proxies=proxies)
                        tree = etree.HTML(html.text)
                        with open(zhangjie_file, 'w', encoding='utf-8') as f:
                            for node in tree.xpath('//div[@class="content"]/*'):
                                for line in node.xpath('./text()'):
                                    if line.strip() != "":
                                        f.write(line)
                                        f.write('\n')
                        break
                    except Exception:
                        continue  # any failure: try the next proxy
                if os.path.exists(zhangjie_file):
                    break
            else:
                # for/else: runs only if no break fired, i.e. every round failed
                if not os.path.exists(zhangjie_file):
                    print("%s download failed" % zhangjie_file)

    def down_book(self):
        _url = self.search_book()
        if not _url:
            sys.exit()
        if os.path.exists(self.book):
            shutil.rmtree(self.book)  # start from a clean directory
        os.mkdir(self.book)
        zhangjie_urls = self.book_page(_url)
        print(len(zhangjie_urls))
        # Split the chapter list into batches of 50, one thread per batch
        pages = (len(zhangjie_urls) + 49) // 50  # ceil division, no empty batch
        threads = []
        for page in range(pages):
            if page < pages - 1:
                zhangjie_urls_t = zhangjie_urls[50 * page:50 * (page + 1)]
            else:
                zhangjie_urls_t = zhangjie_urls[50 * page:]
            t = threading.Thread(target=self.write_book, args=(page, zhangjie_urls_t))
            threads.append(t)
        print("------------- [%s] download starting ----------------" % self.book)
        try:
            for t in threads:
                t.start()
            for t in threads:
                t.join()
            print("------------- [%s] download finished ----------------" % self.book)
            return True
        except Exception as e:
            print('------------- [%s] download failed ----------------' % self.book)
            print(e)
            return False

    # Merge the per-batch chapter files into a single <book>.txt, then clean up
    def books(self):
        if self.down_book():
            print('------------- merging files ----------------')
            dirs = len(os.listdir(self.book))
            with open(self.book + ".txt", 'w', encoding='utf-8') as f:
                for d in range(dirs):
                    dir_path = os.path.join(self.book, str(d))
                    files = len(os.listdir(dir_path))
                    for file in range(files):
                        with open(os.path.join(dir_path, "%s.txt" % file), 'r', encoding='utf-8') as f2:
                            f.write(f2.read())
            shutil.rmtree(self.book)
            print('---------------- done ----------------')


if __name__ == "__main__":
    book_name = "完美世界"
    book = Book(book_name)
    book.books()
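
A quick way to sanity-check the search step before running the full download is to issue the request by hand and dump whatever the XPath matches. The site layout (and therefore the //span//a expression) may well have changed since this was written, so treat this as a sketch to verify against the live page:

import requests
from lxml import etree
from urllib.parse import quote

base = "https://www.biquge.lu"
q = quote("完美世界", encoding='utf-8')
r = requests.get(base + "/search/?ie=gbk&q=%s" % q, timeout=30)
# print every candidate link text and href the search page returns
for a in etree.HTML(r.text).xpath('//span//a'):
    print(a.xpath('./text()'), a.xpath('./@href'))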
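
write_book leans on Python's for/else: the else clause of a for loop runs only when the loop finishes without hitting a break. Here is the same retry-over-proxies idiom in isolation, a minimal sketch with a placeholder proxy address rather than a live server:

import requests

def fetch(url, proxies_list, rounds=3):
    text = None
    for _ in range(rounds):
        for proxies in proxies_list:
            try:
                text = requests.get(url, timeout=10, proxies=proxies).text
                break  # success: leave the proxy loop
            except requests.RequestException:
                continue  # this proxy failed, try the next one
        if text is not None:
            break  # success: leave the retry loop
    else:
        print("%s: every round failed" % url)  # no break ever fired
    return text

# With a dead local proxy this exercises the else branch and returns None
print(fetch("http://example.com", [{"http": "http://127.0.0.1:9000"}]))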
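
One more hedged note: UserAgent(path=...) assumes a local copy of fake_useragent 0.1.11's data file sitting next to the script. A small fallback keeps the scraper alive when that file is missing or the library cannot load its data; the ua_fallback helper below is my own addition, not part of the original script:

from fake_useragent import UserAgent

def ua_fallback():
    try:
        return UserAgent(path="./fake_useragent_0.1.11.json").random
    except Exception:
        # any load failure: fall back to a fixed, known-good UA string
        return "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"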