This code was written quite a while ago, so a few spots need tweaking before it will run; it is offered mainly to illustrate the approach.
import requests
import asyncio
import aiohttp
import aiofiles
import parsel
import time
import os
"""
1.找到用户想下载小说的目录
2.爬取每个章节的子url和章节名字
3-1:多线程同步操作
1.每个章节翻页直到本章节结束,下载每个页面的文本并保存(大同小异,懒得弄了)
3-2:单线程异步操作找到每一章中下一页的url
1.下载每一章的操作为异步,每一页的操作为同步(本代码)
4.整合章节为一个文件
"""
# Look up the table of contents for the novel the user searches for
def get_mulu():
    book_name = input("Enter the name of the novel you want to download, then press Enter: ")
    url = f"https://m.bqgbi.com/user/search.html?q={book_name}"  # search Biquge for the requested novel
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/116.0.0.0 Safari/537.36"
    }
    # Retry the request up to 8 times
    for i in range(8):
        resp = requests.get(url, headers=headers)
        if resp.text != "1":  # the search endpoint returns the literal string "1" on failure
            print(f"Attempt {i + 1} succeeded")
            results = resp.json()
            for index, u in enumerate(results):
                print(f"No. {index + 1}  Title: {u['articlename']}  Author: {u['author']}  Link: {u['url_list']}")
            xuan_ze = input("Search results are listed above; enter the number of the novel to download: ")
            print("Preparing the download, please wait")
            # Take the matching sub-URL and build the table-of-contents URL from it
            mulu_url = "https://m.bqgbi.com" + results[int(xuan_ze) - 1]["url_list"] + "list.html"
            return mulu_url
        else:
            print(f"Attempt {i + 1} failed, please be patient")
            time.sleep(1)
            if i == 7:
                print("Sorry, the book could not be found; please retry later or close the program")
# Asynchronously download every page of one chapter and append its text to a file
async def get_source(url, name):
    shu_zi1 = 1
    shu_zi2 = 1
    while shu_zi1 == shu_zi2:  # keep paging while the "next" link stays inside this chapter
        # Page URLs look like .../456.html, .../456_2.html, ...; strip both the
        # "_n" page suffix and the ".html" extension to get the chapter id
        shu_zi1 = url.rsplit("/", 1)[1].split("_", 1)[0].split(".", 1)[0]
        timeout = aiohttp.ClientTimeout(total=600)  # raise the timeout to 600 seconds
        connector = aiohttp.TCPConnector(limit=50)  # cap the number of concurrent connections
        async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
            headers = {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                              "Chrome/116.0.0.0 Safari/537.36"
            }
            async with session.get(url, headers=headers) as response:
                selector1 = parsel.Selector(text=await response.text(encoding="utf-8"))  # hand the page source to parsel
                url = selector1.css("a::attr(href)").getall()[4]  # the fifth link on the page is the "next page" link
                shu_zi2 = url.rsplit("/", 1)[1].split("_", 1)[0].split(".", 1)[0]
                url = "https://m.bqgbi.com" + url
                book = selector1.css("#chaptercontent ::text").getall()  # extract the chapter text
                del book[-3:-1]  # drop the garbled lines near the end
                try:
                    async with aiofiles.open(name + ".txt", mode="a", encoding="utf-8") as f:
                        for line in book:
                            await f.write(line.strip())
                            await f.write("\n")
                except OSError:
                    pass
    print(f"{name} downloaded")
# Collect each chapter's URL and title, download all chapters, then merge the txt files
async def main():
    tasks = []
    resp = requests.get(get_mulu())  # fetch the table-of-contents page
    selector1 = parsel.Selector(resp.text)  # hand the page source to parsel
    urls = selector1.css(".book_last dd a::attr(href)").getall()  # list of chapter URLs
    names = selector1.css(".book_last dd a::text").getall()  # list of chapter titles
    for url, name in zip(urls, names):
        url = "https://m.bqgbi.com" + url
        tasks.append(asyncio.create_task(get_source(url, name)))
    await asyncio.wait(tasks)
    # Merge the chapter files into one text file
    dirPath = os.getcwd()  # folder that holds all the txt files
    files = [x + ".txt" for x in names]  # txt files in chapter order
    del files[0]  # skip the first entry
    res = ""
    i = 0
    for file in files:
        if file.endswith(".txt"):
            try:
                i += 1
                title = "Chapter %s %s" % (i, file[0:len(file) - 4])
                with open(os.path.join(dirPath, file), "r", encoding="utf-8") as f:
                    content = f.read()
                res += "\n%s\n\n%s" % (title, content)
            except OSError:
                pass
    with open(os.path.join(dirPath, "outfile.txt"), "w", encoding="utf-8") as outFile:
        outFile.write(res.strip())
    print("Merge finished, you can close the program now; the whole book is " + str(len(res)) + " characters")
if __name__ == '__main__':
    asyncio.run(main())  # asyncio.run (Python 3.7+) replaces the older get_event_loop pattern
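To try it (assuming the bqgbi.com layout still matches the selectors above): install the four third-party packages with pip install requests aiohttp aiofiles parsel, run the script with Python 3.7 or newer, type the novel's name when prompted, and the merged book ends up in outfile.txt next to the per-chapter txt files.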