Crawling "Ming Chao Na Xie Shi Er" with etree and coroutines, crawling Wangba Movies with coroutines and decryption, and crawling 4399 games with scrapy.
1、Crawling "Ming Chao Na Xie Shi Er" with etree and coroutines
import requests
from lxml import etree
import asyncio
import aiohttp
import aiofiles
import os
# 1. Fetch the source code of the main page (no async needed)
# 2. With the page source in hand, parse out the <volume name> and the <chapter, href> pairs
headers = {
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36"
}
def get_chaptor_info(url):
    resp = requests.get(url, headers=headers)
    resp.encoding = "UTF-8"
    page_source = resp.text
    # start parsing
    tree = etree.HTML(page_source)
    # Homework: explain what each of the loops below is doing
    result = []
    divs = tree.xpath("//div[@class='mulu']")  # each div is one volume
    for div in divs:
        trs = div.xpath(".//table/tr")  # a pile of tr elements
        juan_name = trs[0].xpath(".//a/text()")
        juan_name = "".join(juan_name).strip().replace(":", "_")
        for tr in trs[1:]:  # 93
            tds = tr.xpath("./td")
            for td in tds:
                txt = td.xpath(".//text()")
                href = td.xpath(".//@href")
                txt = "".join(txt).replace(" ", "").strip()
                href = "".join(href)
                dic = {
                    "chapter_name": txt,
                    "chapter_url": href,
                    "juan_name": juan_name
                }
                result.append(dic)
    return result
async def download_one(url, file_path):
    print("About to download a chapter")
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            page_source = await resp.text(encoding="utf-8")
            # got the article page
            tree = etree.HTML(page_source)
            content = tree.xpath("//div[@class='content']//p//text()")
            content = "".join(content).replace("\n", "").replace("\r", "").replace(" ", "").strip()
            # write it to the file
            async with aiofiles.open(file_path, mode="w", encoding="utf-8") as f:
                await f.write(content)
    print("Congratulations, one chapter downloaded!", file_path)
async def download_chapter(chaptor_list):
    tasks = []
    for chaptor in chaptor_list:  # {juan_name: xxx, chapter_name: xxx, chapter_url: xxx}
        juan = chaptor['juan_name']     # folder name
        name = chaptor['chapter_name']  # file name, e.g. 前言.txt
        url = chaptor['chapter_url']    # used for the download -> one async task
        if not os.path.exists(juan):    # does the folder exist?
            os.makedirs(juan)           # create it if not
        # build the real save path for this file
        file_path = f"{juan}/{name}.txt"  # 74
        f = download_one(url, file_path)
        t = asyncio.create_task(f)
        tasks.append(t)
        break  # only while testing
    await asyncio.wait(tasks)
def main():
    url = "https://www.mingchaonaxieshier.com/"
    chaptor_list = get_chaptor_info(url)
    # print(chaptor_list)
    # fire up the coroutines and download asynchronously
    asyncio.run(download_chapter(chaptor_list))

if __name__ == '__main__':
    main()
2、Crawling Wangba Movies with coroutines and decryption
#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=1263000,RESOLUTION=1280x528 (first-level m3u8, no encryption)
/20211030/89ZfL7VX/hls/index.m3u8
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:4
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="https://vo1.123188kk.com/20211030/89ZfL7VX/hls/key.key" (encrypted m3u8)
#EXTINF:2.44,
https://vo1.123188kk.com/20211030/89ZfL7VX/hls/3YKZ9LsK.ts

# Overall steps => Wangba movies (a small inspection sketch follows this list)
1. Find the M3U8 file somehow
2. Decide (manually) whether a second-level M3U8 has to be downloaded
3. Extract the download paths of the ts files
4. Download them
5. Decide whether decryption is needed
6. If decryption is needed, fetch the key
7. Decrypt
8. Merge all ts files in the order given by the M3U8 => MP4
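Steps 2, 5 and 6 come down to reading the m3u8 text itself: a first-level (master) playlist contains #EXT-X-STREAM-INF lines pointing at a second-level playlist, and an encrypted playlist contains an #EXT-X-KEY line whose URI is the key address. A minimal sketch of that check (the function name inspect_m3u8 is illustrative, not part of the script below):

import re

def inspect_m3u8(text):
    # rough classification: is this a master playlist? does it need a key?
    info = {"is_master": "#EXT-X-STREAM-INF" in text, "key_url": None}
    m = re.search(r'#EXT-X-KEY:METHOD=AES-128,URI="(?P<u>.*?)"', text)
    if m:
        info["key_url"] = m.group("u")  # step 6 downloads this url
    return info

# usage:
# with open("first.m3u8", encoding="utf-8") as f:
#     print(inspect_m3u8(f.read()))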
import requests
from lxml import etree
import re
from urllib.parse import urljoin
import os  # for running cmd/console commands
import asyncio
import aiohttp
import aiofiles
from Crypto.Cipher import AES # pip install pycryptodome
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/100.0.4896.75 Safari/537.36"
}
def get_iframe_src(url):  # get the src of the iframe
    resp = requests.get(url, headers=headers)
    tree = etree.HTML(resp.text)
    src = tree.xpath("//iframe/@src")[0]
    return src
def get_m3u8_url(url):
    resp = requests.get(url, headers=headers)
    obj = re.compile(r'url: "(?P<m3u8>.*?)"', re.S)
    m3u8 = obj.search(resp.text).group("m3u8")
    return m3u8
def download_m3u8(url):  # e.g. https://a.ak-kk.com/20211030/89ZfL7VX/index.m3u8
    resp = requests.get(url, headers=headers)
    with open("first.m3u8", mode="w", encoding="utf-8") as f:
        f.write(resp.text)
    # a mistake at this spot once cost a good five minutes
    with open("first.m3u8", mode='r', encoding="utf-8") as f2:
        for line in f2:  # read line by line
            if line.startswith("#"):  # tag lines start with #
                continue  # skip them
            # at this point line is the address of the second-level M3U8
            line = line.strip()  # remember to strip(), or you are in for a surprise
            line = urljoin(url, line)  # join it onto the base url
            # download the second-level M3U8
            resp = requests.get(line, headers=headers)
            with open("second.m3u8", mode="w", encoding="utf-8") as f3:
                f3.write(resp.text)
            break  # optional, but harmless
async def download_one(url, sem):
    async with sem:  # use the semaphore to throttle the request rate
        file_name = url.split("/")[-1]
        file_path = "./解密前/" + file_name  # "解密前" = "before decryption" folder
        print(file_name, "starting!")
        for i in range(10):  # retry up to 10 times
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url, headers=headers) as resp:
                        content = await resp.content.read()
                        # write it to the file
                        async with aiofiles.open(file_path, mode="wb") as f:
                            await f.write(content)
                print(file_name, "downloaded!")
                break
            except Exception as e:
                print(file_name, "failed, retrying shortly", e)  # print the error so we can see what went wrong
async def download_all_videos():
    # semaphore used to cap the number of concurrent coroutines
    sem = asyncio.Semaphore(100)  # a handful of movies on this site need this as low as ~5
    # 1. read the file
    tasks = []
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()  # skip this and you are in for a surprise
            # line is now a download url
            # 2. create a task
            t = asyncio.create_task(download_one(line, sem))
            tasks.append(t)
    # 3. wait for all of them together
    await asyncio.wait(tasks)
def get_key():
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        file_content = f.read()  # read the whole file
    obj = re.compile(r'URI="(?P<key_url>.*?)"')
    key_url = obj.search(file_content).group("key_url")
    resp = requests.get(key_url, headers=headers)  # request the key
    return resp.content  # return raw bytes so they can be fed straight into the decryptor
async def desc_one(file_path, key):
    file_name = file_path.split("/")[-1]
    new_file_path = "./解密后/" + file_name  # "解密后" = "after decryption" folder
    # decrypt
    async with aiofiles.open(file_path, mode="rb") as f1, \
            aiofiles.open(new_file_path, mode="wb") as f2:
        content = await f1.read()
        # decryption: fixed recipe, build a cipher object
        aes = AES.new(key=key, mode=AES.MODE_CBC, IV=b"0000000000000000")
        new_content = aes.decrypt(content)
        await f2.write(new_content)  # write the new file
    print(new_file_path, "decrypted")

# Coroutine logic for decryption:
#   read the M3U8 file to get the file names and paths
#   one task per ts file
#   inside each task, just decrypt
async def desc_all(key):
    tasks = []
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            file_name = line.split("/")[-1]
            file_path = "./解密前/" + file_name
            # create a task to decrypt this file
            t = asyncio.create_task(desc_one(file_path, key))
            tasks.append(t)
    await asyncio.wait(tasks)
def merge():
    # Merge the video segments.
    # (Bilibili videos do not work this way; this is not for them.)
    # We need a shell command:
    #   windows:   copy /b a.ts+b.ts+c.ts xxx.mp4
    #   linux/mac: cat a.ts b.ts c.ts > xxx.mp4
    # Common pitfalls:
    #   1. the command line gets too long, so merge in batches
    #   2. running the command easily produces garbled output; use popen to avoid it
    #   3. all you really need to check is whether the merge succeeded
    # os.system("dir")   # output comes back garbled
    # r = os.popen("dir")
    # print(r.read())    # avoids the garbled output for now

    # Collect all file names in the correct merge order
    file_list = []
    with open("second.m3u8", mode="r", encoding="utf-8") as f:
        for line in f:
            if line.startswith("#"):
                continue
            line = line.strip()
            file_name = line.split("/")[-1]
            file_list.append(file_name)

    # Step into the folder
    os.chdir("./解密后")  # change the working directory
    # file_list now holds every file name, in order; merge in batches
    n = 1
    temp = []  # [a.ts, b.ts, c.ts] => a.ts+b.ts+c.ts
    for i in range(len(file_list)):
        # merge every 20 files
        file_name = file_list[i]
        temp.append(file_name)
        if i != 0 and i % 20 == 0:  # one batch per 20 (the first batch ends up with 21)
            # time to merge one batch
            cmd = f"copy /b {'+'.join(temp)} {n}.ts"
            r = os.popen(cmd)
            print(r.read())
            temp = []  # start a fresh list
            n = n + 1
    # merge whatever ts files are left over
    cmd = f"copy /b {'+'.join(temp)} {n}.ts"
    r = os.popen(cmd)
    print(r.read())
    n = n + 1

    # second pass: 1.ts + 2.ts + 3.ts ... => mp4
    last_temp = []
    for i in range(1, n):
        last_temp.append(f"{i}.ts")
    # final merge
    cmd = f"copy /b {'+'.join(last_temp)} 春夏秋冬又一春.mp4"
    r = os.popen(cmd)
    print(r.read())

    # go back up
    os.chdir("../")  # ../ is the parent folder
def main():
    # url = "http://www.wbdy.tv/play/63690_1_1.html"
    # # 1. get the src attribute of the iframe
    # src = get_iframe_src(url)
    # print(src)
    # # 2. request the iframe's src path to get the M3U8 address
    # src = urljoin(url, src)
    # m3u8_url = get_m3u8_url(src)
    # print(m3u8_url)
    # # 3. download the m3u8 files
    # download_m3u8(m3u8_url)
    # # 4. download the videos, using coroutines
    # event_loop = asyncio.get_event_loop()
    # event_loop.run_until_complete(download_all_videos())
    # # 5. fetch the key
    # key = get_key()
    # # 6. decrypt
    # event_loop = asyncio.get_event_loop()
    # event_loop.run_until_complete(desc_all(key))
    # print("all done")
    # merge
    merge()

if __name__ == '__main__':
    main()
3、Crawling 4399 games with scrapy
Create the scrapy project: scrapy startproject mySpider_2

The directory structure of the generated project is shown below:
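A project created by scrapy startproject mySpider_2 typically has this layout:

mySpider_2/
    scrapy.cfg                # project deployment/config entry point
    mySpider_2/
        __init__.py
        items.py              # item definitions (data structures)
        middlewares.py        # spider / downloader middlewares
        pipelines.py          # item pipelines (where data gets saved)
        settings.py           # project settings
        spiders/
            __init__.py       # genspider puts new spiders in this package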

cd into the project folder and create the spider:
cd mySpider_2
scrapy genspider youxi 4399.com

Fill in the spider code in youxi.py:
import scrapy

class YouxiSpider(scrapy.Spider):  # inherits scrapy's Spider
    name = 'youxi'  # this name matters a lot: it is what we use to launch the spider
    allowed_domains = ['4399.com']  # the domain this spider is restricted to; anything that doesn't match is dropped
    # Start url(s). When the engine starts, each one is automatically wrapped into a request object,
    # the engine schedules it and hands it to the downloader, which fetches the page source and
    # wraps it into a response object; the engine then gives that response to the spider to parse,
    # and the parsing function is parse() below.
    start_urls = ['http://www.4399.com/flash/game100.htm']

    # Parses the responses to start_urls. Do not rename it; extra parameters can go into **kwargs.
    # We never call this ourselves: the engine calls it automatically and passes in the arguments.
    def parse(self, response, **kwargs):
        # response.text     # page source
        # response.xpath()  # extract via xpath
        # response.css()    # extract via css selectors
        # response.json()   # extract json data
        # Use the way we know best, xpath, to extract the game name, category and release date
        li_list = response.xpath("//*[@id='list']/li")
        result = []
        for li in li_list:
            # extract_first() grabs the first match; the nice part is that it never raises an
            # IndexError -- if there is nothing there, it simply returns None
            name = li.xpath("./div[1]/a//text()").extract_first()
            leibie = li.xpath("./span[1]/a/text()").extract_first()
            shijian = li.xpath("./span[2]/text()").extract_first()
            # print(name, leibie, shijian)
            # yield hands back one piece of data while the (generator) function keeps running.
            # A spider may only yield:
            #   a dict or an item -> data, sent to the pipelines to be saved
            #   a Request         -> goes back into the scheduler's request queue
            #   None              -> finished; anything else raises an error
            yield {"name": name, "leibie": leibie, "shijian": shijian}
Run the spider: cd D:\pachong_test\mySpider_2\mySpider_2\spiders
scrapy crawl youxi
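As an alternative to the command line, the crawl can also be started from a small Python script using scrapy's CrawlerProcess (a minimal sketch; run it from inside the project so get_project_settings() picks up settings.py):

# run.py, placed next to scrapy.cfg (illustrative)
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

process = CrawlerProcess(get_project_settings())
process.crawl("youxi")   # same value as the spider's name attribute
process.start()          # blocks until the crawl is finished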
Update the pipeline settings in settings.py: each key is the import path of a pipeline class and each value is its priority; the lower the number, the earlier that pipeline runs.
BOT_NAME = "mySpider_2"
SPIDER_MODULES = ["mySpider_2.spiders"]
NEWSPIDER_MODULE = "mySpider_2.spiders"
# log level
LOG_LEVEL = "WARNING"  # keep the real errors without drowning in miscellaneous log noise
# CRITICAL 50 => very severe errors, interpreter-level problems
# ERROR    40 => errors, the program crashed
# WARNING  30 => warnings (e.g. deprecation warnings); execution is not affected
# INFO     20 => informational messages, e.g. print("download finished")
# DEBUG    10 => chatty, records absolutely everything
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36'
# robots.txt: the "gentlemen's agreement"
ROBOTSTXT_OBEY = False
# CONCURRENT_REQUESTS = 32
# DOWNLOAD_DELAY = 3
# CONCURRENT_REQUESTS_PER_DOMAIN = 16
# CONCURRENT_REQUESTS_PER_IP = 16
COOKIES_ENABLED = False  # this must be set, otherwise the Cookie header below has no effect
# TELNETCONSOLE_ENABLED = False
DEFAULT_REQUEST_HEADERS = {
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
    'Accept-Language': 'en',
    "Cookie": "fvlid=1642167433528aCtTRzzJxa5w; sessionid=25e76ed4-ac76-4c18-86ef-9f05f56e5f71; area=110114; che_sessionid=01720F78-6018-468C-A3E8-35235321AF81%7C%7C2022-01-14+21%3A37%3A13.520%7C%7C0; listuserarea=110100; sessionip=221.218.212.121; Hm_lvt_d381ec2f88158113b9b76f14c497ed48=1652356283; sessionvisit=3d18f224-a438-45a4-849d-531c3f4587d8; sessionvisitInfo=25e76ed4-ac76-4c18-86ef-9f05f56e5f71|www.autohome.com.cn|100533; che_sessionvid=4B36F1DE-CAF9-47AF-B6E6-A4BA52B82875; userarea=110100; ahpvno=5; UsedCarBrowseHistory=0%3A43581488; Hm_lpvt_d381ec2f88158113b9b76f14c497ed48=1652357364; ahuuid=1A75FF15-842E-4369-8720-FD12B13EEB5E; showNum=8; sessionuid=25e76ed4-ac76-4c18-86ef-9f05f56e5f71; v_no=7; visit_info_ad=01720F78-6018-468C-A3E8-35235321AF81||4B36F1DE-CAF9-47AF-B6E6-A4BA52B82875||-1||-1||7; che_ref=www.autohome.com.cn%7C0%7C100533%7C0%7C2022-05-12+20%3A09%3A19.594%7C2022-05-12+20%3A05%3A10.988; carDownPrice=1"
}
# SPIDER_MIDDLEWARES = {
# "mySpider_2.middlewares.Myspider2SpiderMiddleware": 543,
# }
# DOWNLOADER_MIDDLEWARES = {
# "mySpider_2.middlewares.Myspider2DownloaderMiddleware": 543,
# }
# EXTENSIONS = {
# "scrapy.extensions.telnet.TelnetConsole": None,
# }
# Each key is the import path of a pipeline class, each value its priority; the lower the number, the earlier it runs
ITEM_PIPELINES = {
    # the number is the priority: the smaller it is, the higher the priority
    # (think of it as the distance from the engine)
    # 'mySpider_2.pipelines.CaiPipeline': 120,
    # 'mySpider_2.pipelines.MySQLPipeline': 150,
    # 'mySpider_2.pipelines.MongoPipeline': 180,
    # "mySpider_2.pipelines.Myspider2Pipeline": 210,
    # 'mySpider_2.pipelines.TuPipeline': 240,
    # 'mySpider_2.pipelines.MyTuPipeline': 270,
    # 'mySpider_2.pipelines.ChePipeline': 300,
    # 'mySpider_2.pipelines.JiaPipeline': 330,
    'mySpider_2.pipelines.ShiPipeline': 360,
}
# To download images you must configure the root storage path:
IMAGES_STORE = "./imgs"
# AUTOTHROTTLE_ENABLED = True
# AUTOTHROTTLE_START_DELAY = 5
# AUTOTHROTTLE_MAX_DELAY = 60
# AUTOTHROTTLE_TARGET_CONCURRENCY = 1.0
# AUTOTHROTTLE_DEBUG = False
# HTTPCACHE_ENABLED = True
# HTTPCACHE_EXPIRATION_SECS = 0
# HTTPCACHE_DIR = "httpcache"
# HTTPCACHE_IGNORE_HTTP_CODES = []
# HTTPCACHE_STORAGE = "scrapy.extensions.httpcache.FilesystemCacheStorage"
REQUEST_FINGERPRINTER_IMPLEMENTATION = "2.7"
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
FEED_EXPORT_ENCODING = "utf-8"
DOWNLOAD_DELAY = 3  # required for the Autohome (汽車之家) example
Write the pipelines in pipelines.py to do some simple saving of the data.
Do not change the declaration of process_item!!! Whenever the spider returns data, this process_item method is called automatically; change the signature and the pipeline chain breaks.
from itemadapter import ItemAdapter
import pymysql
import pymongo
import scrapy
# ImagesPipeline: the pipeline dedicated to images
from scrapy.pipelines.images import ImagesPipeline
class Myspider2Pipeline:
    # process_item: once the engine gets something back from the spider and sees that it is data,
    # it automatically calls process_item on the pipeline.
    #   item   -> the data itself
    #   spider -> which spider the data came from
    def process_item(self, item, spider):
        print("I am the pipeline, and I received", item)
        return item
class CaiPipeline:
    # Goal:
    #   open a file in "w" mode when the crawl starts,
    #   write to it whenever data comes in,
    #   close it when the crawl ends.
    # This is pipeline-specific boilerplate:
    # open_spider runs when the spider starts
    def open_spider(self, spider_name):
        self.f = open("xxx.csv", mode="w", encoding="utf-8")

    # close_spider runs when the spider finishes
    def close_spider(self, spider_name):
        self.f.close()

    # process_item receives the data returned by the spider.
    # Each time the spider yields one piece of data, process_item runs once;
    # the data comes in through the item parameter.
    def process_item(self, item, spider):
        # print(spider.name)
        # print("inside the pipeline", item['qi'], item['blue_ball'], item['red_ball'])
        # persist the data: file, mysql, mongodb, redis, ...
        self.f.write(item['qi'])
        self.f.write(",")
        self.f.write("_".join(item['red_ball']))
        self.f.write(",")
        self.f.write(item['blue_ball'])
        self.f.write("\n")
        # self.f.close()  # must NOT be closed here
        return item  # returning the item in process_item passes the data on to the next pipeline
# Store to MySQL
# Prepare the table in advance (a table-creation sketch follows the pipeline classes).
class MySQLPipeline:
    def open_spider(self, spider_name):
        # connect to mysql
        self.conn = pymysql.connect(
            host="127.0.0.1",
            port=3306,
            database="cai",
            user="root",
            password="root"
        )

    def close_spider(self, spider_name):
        self.conn.close()

    def process_item(self, item, spider):
        # store the data
        try:  # while debugging, consider dropping the try...except to see the full traceback
            cur = self.conn.cursor()
            qi = item['qi']
            # red_ball = "_".join(item['red_ball'])
            red_ball = item['red_ball']
            blue_ball = item['blue_ball']
            # note: a parameterized query, cur.execute(sql, params), would be safer than an f-string
            sql = f"insert into ssq(qi, red_ball, blue_ball) values ('{qi}', \"{red_ball}\", '{blue_ball}')"
            cur.execute(sql)
            self.conn.commit()
        except Exception as e:
            print(e)
            if cur:
                cur.close()
            self.conn.rollback()
        return item
# Store to MongoDB
class MongoPipeline:
    def open_spider(self, spider_name):
        self.conn = pymongo.MongoClient(
            host="127.0.0.1",
            port=27017
        )
        self.db = self.conn['python']

    def close_spider(self, spider_name):
        self.conn.close()

    def process_item(self, item, spider):
        self.db.ssq.insert_one({"qi": item['qi'], "red_ball": item['red_ball'], "blue_ball": item['blue_ball']})
        return item  # hand it to the next pipeline
class TuPipeline:
    def process_item(self, item, spider):
        print("img_src is:", item['img_src'])
        # one possible storage approach:
        # import requests
        return item
# The scrapy way
class MyTuPipeline(ImagesPipeline):
    # 1. send the request (to download the image / file / video / whatever)
    def get_media_requests(self, item, info):
        url = item['img_src']
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.82 Safari/537.36",
            "Referer": "https://desk.zol.com.cn/dongman/"
        }
        yield scrapy.Request(url=url, headers=headers, meta={"sss": url})  # just yield a request object

    # 2. the storage path of the image
    #    full path = IMAGES_STORE + the return value of file_path()
    #    the folders along the way are created automatically
    def file_path(self, request, response=None, info=None, *, item=None):
        # choose a sub-folder
        img_path = "dongman"
        # build the file name
        # pitfall: response.url cannot be used reliably here
        # file_name = response.url.split("/")[-1]  # take the url straight from the response object
        # print("response:", file_name)
        file_name = item['img_src'].split("/")[-1]  # take the url from the item
        print("item:", file_name)
        file_name = request.meta['sss'].split("/")[-1]  # or from request.meta
        print("meta:", file_name)
        real_path = img_path + "/" + file_name  # join folder and file name
        return real_path  # return the storage path

    # 3. the item may need to be updated afterwards
    def item_completed(self, results, item, info):
        # print(results)
        for r in results:
            print(results)
            print("results is:", r[1]['path'])
        return item  # you must return the item so the data reaches the next pipeline
class ChePipeline:
    def process_item(self, item, spider):
        return item

class JiaPipeline:
    def process_item(self, item, spider):
        return item

class ShiPipeline:
    def process_item(self, item, spider):
        return item
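MySQLPipeline above assumes that the ssq table already exists in the cai database. A minimal sketch of creating it with pymysql (the column types and sizes are assumptions, not taken from the original project):

# create_ssq_table.py -- illustrative; adjust column types to your data
import pymysql

conn = pymysql.connect(host="127.0.0.1", port=3306, database="cai",
                       user="root", password="root")
with conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS ssq (
            id        INT AUTO_INCREMENT PRIMARY KEY,
            qi        VARCHAR(20),   -- draw number
            red_ball  VARCHAR(64),   -- e.g. "01_03_12_18_22_33"
            blue_ball VARCHAR(8)
        ) DEFAULT CHARSET = utf8mb4
    """)
conn.commit()
conn.close()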
Define the custom data-transfer structures in items.py:
import scrapy

class Myspider2Item(scrapy.Item):
    # define the fields for your item here like:
    # name = scrapy.Field()
    pass

class CaiItem(scrapy.Item):
    # declare the data structure up front
    qi = scrapy.Field()
    blue_ball = scrapy.Field()
    red_ball = scrapy.Field()

class CheItem(scrapy.Item):
    # define the fields for your item here like:
    name = scrapy.Field()
    pass

Flow for the ssq spider: the spider takes the data out of the response and hands it to the pipelines → process_item in pipelines.py saves the data → items.py defines the data structure → settings.py wires up the configuration.
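The ssq spider itself is not shown above; a minimal sketch of how such a spider would feed a CaiItem into this chain (the start URL and the xpath selectors are placeholders, not the real site):

# spiders/ssq.py -- illustrative sketch only
import scrapy
from mySpider_2.items import CaiItem

class SsqSpider(scrapy.Spider):
    name = "ssq"
    allowed_domains = ["example.com"]           # placeholder domain
    start_urls = ["https://example.com/ssq"]    # placeholder start url

    def parse(self, response, **kwargs):
        for row in response.xpath("//table//tr"):   # placeholder selector
            item = CaiItem()
            item["qi"] = row.xpath("./td[1]/text()").extract_first()
            item["red_ball"] = row.xpath("./td[@class='red']/text()").extract()        # list of red balls
            item["blue_ball"] = row.xpath("./td[@class='blue']/text()").extract_first()
            yield item  # travels through CaiPipeline / MySQLPipeline / MongoPipeline per ITEM_PIPELINES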
