數據采集與融合技術實踐--作業三
數據采集與融合技術作業三
??1.相關信息及鏈接
| 名稱 | 信息及鏈接 |
|---|---|
| 學號姓名 | 102202108 王露潔 |
| 本次作業要求鏈接 | https://edu.cnblogs.com/campus/fzu/2024DataCollectionandFusiontechnology/homework/13287 |
| 作業①所在碼云鏈接 | https://gitee.com/wanglujieeee/crawl_project/tree/master/作業3.1 |
| 作業②所在碼云鏈接 | https://gitee.com/wanglujieeee/crawl_project/tree/master/作業3.2 |
| 作業③所在碼云鏈接 | https://gitee.com/wanglujieeee/crawl_project/tree/master/作業3.3 |
??2.作業內容
作業①:
??要求:指定一個網站,爬取這個網站中的所有的所有圖片,例如:中國氣象網(http://www.weather.com.cn)。使用scrapy框架分別實現單線程和多線程的方式爬取。 –務必控制總頁數(學號尾數2位)、總下載的圖片數量(尾數后3位)等限制爬取的措施。
???輸出信息:將下載的Url信息在控制臺輸出,并將下載的圖片存儲在images子文件中,并給出截圖。 Gitee文件夾鏈接
??解決思路及代碼實現
1.前情提要:
本人學號:102202108,所以按題目要求,應爬取8個頁面,一共爬取圖片的數目為108張(理論上,出現的具體問題后再詳細解釋)
2.spider.py文件(主要板塊)
-->設置計數器:
total_images:統計已下載的圖片數量。
max_images:設置最大下載的圖片數量(108張)。
max_pages:設置最大爬取的頁面數量(8頁)。
pages_crawled:記錄已爬取的頁面數量。
import scrapy
from scrapy.exceptions import CloseSpider # 導入CloseSpider異常,用于關閉爬蟲
from urllib.parse import urljoin # 導入urljoin用于處理URL拼接
class WeatherSpider(scrapy.Spider):
name = 'weather_spider' # 爬蟲名稱
allowed_domains = ['weather.com.cn'] # 允許的域名
start_urls = ['http://www.weather.com.cn'] # 起始URL,爬蟲從這里開始
custom_settings = {
'ITEM_PIPELINES': {
'weather_images.pipelines.ImagePipeline': 1, # 啟用圖片下載管道
},
'IMAGES_STORE': 'images', # 圖片存儲路徑
'LOG_LEVEL': 'INFO', # 日志級別
'CONCURRENT_REQUESTS': 1, # 設置為單線程
}
total_images = 0 # 統計下載的圖片數量
max_images = 108 # 設置最大下載數量
max_pages = 8 # 設置最大爬取頁面數量
pages_crawled = 0 # 已爬取頁面數量
-->解析響應的 parse 方法:
1.使用CSS選擇器提取頁面中所有的圖片URL。
2.利用 urljoin 將相對URL轉換為絕對URL,并過濾出有效的HTTP URL。
3.如果沒有找到有效的圖片URL,記錄日志并返回。
def parse(self, response):
# 從響應中提取所有圖片的src屬性
image_urls = response.css('img::attr(src)').getall()
# 將相對URL轉換為絕對URL
image_urls = [urljoin(response.url, url) for url in image_urls]
# 過濾出有效的HTTP URL
image_urls = [url for url in image_urls if url.startswith('http')]
if not image_urls: # 如果沒有找到有效的圖片URL
self.log('No valid image URLs found on this page.')
return # 直接返回,不進行后續操作
-->處理有效的圖片URL:
1.遍歷每個有效的圖片URL。
2.如果已下載的圖片數量小于最大限制,產出一個包含圖片URL的字典,并記錄下載的URL。
3.增加已下載的圖片數量計數。
# 遍歷有效的圖片URL
for url in image_urls:
if self.total_images < self.max_images: # 如果還未達到最大圖片數量
yield {'image_urls': [url]} # 產出包含圖片URL的字典
self.log(f'Download URL: {url}') # 記錄下載的URL
self.total_images += 1 # 增加已下載的圖片數量
-->檢查是否達到最大圖片數量:
如果達到最大圖片數量,記錄日志并拋出 CloseSpider 異常,關閉爬蟲。
# 檢查是否達到最大圖片數量
if self.total_images >= self.max_images:
self.log(f'Reached max image limit: {self.max_images}. Closing spider.')
raise CloseSpider(reason='Reached max image limit') # 關閉爬蟲
-->處理分頁:
1.增加已爬取的頁面數量計數。
2.如果已爬取的頁面數量小于最大數量,查找下一個頁面的鏈接,并使用 response.follow 方法繼續爬取
# 處理分頁,這里假設有一個簡單的分頁機制
self.pages_crawled += 1 # 增加已爬取頁面數量
if self.pages_crawled < self.max_pages: # 如果還未達到最大頁面數量
next_page = response.css('a::attr(href)').get() # 獲取下一個頁面的鏈接
if next_page: # 如果找到下一個頁面的鏈接
yield response.follow(next_page, self.parse) # 繼續爬取下一個頁面
3.pipeline.py文件(主要板塊)
-->定義 ImagePipeline 類:
繼承自 ImagesPipeline,使其具備圖片下載的基本功能。
import scrapy
from scrapy.pipelines.images import ImagesPipeline # 導入Scrapy的圖片下載管道
import logging # 導入日志模塊,用于記錄下載錯誤信息
from scrapy.exceptions import DropItem # 導入DropItem異常,用于丟棄無法處理的項目
class ImagePipeline(ImagesPipeline):
-->get_media_requests 方法:
該方法負責生成圖片下載請求。
1.遍歷 item['image_urls'] 中的所有圖片URL。
2.檢查URL是否有效(非 None)。
3.使用 scrapy.Request 創建請求,并添加瀏覽器的 User-Agent 頭,以避免被反爬蟲機制拒絕。
def get_media_requests(self, item, info):
# 遍歷每個圖片URL
for image_url in item['image_urls']:
if image_url: # 確保 URL 不為 None
# 返回一個新的請求,設置 User-Agent 偽裝成瀏覽器
yield scrapy.Request(image_url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
})
-->item_completed 方法:
1.該方法在請求完成后被調用,處理下載結果。
2.初始化一個列表 failed_urls 用于記錄下載失敗的URL。
3.遍歷結果 results,檢查每個請求的狀態:
如果下載失敗(ok 為 False),則將失敗的URL添加到 failed_urls 列表中。
4.如果有失敗的URL,記錄錯誤日志,并拋出 DropItem 異常,表示該項目由于下載失敗而被丟棄。
5.如果所有下載都成功,返回包含成功下載的項目。
# 處理請求完成后的結果
def item_completed(self, results, item, info):
failed_urls = [] # 用于存儲下載失敗的URL
# 遍歷請求結果
for ok, x in results:
if not ok: # 如果下載失敗
failed_urls.append(x.value) # 將失敗的URL加入列表
if failed_urls: # 如果有失敗的URL
# 記錄錯誤日志
logging.error(f'Failed to download images for URLs: {failed_urls}')
# 丟棄包含下載失敗的項目
raise DropItem(f"Image download failed for: {failed_urls}")
return item # 返回成功下載的項目
4.item.py文件
-->定義數據項類 WeatherImageItem:
這個類繼承自 scrapy.Item,用于定義爬取到的數據結構。Scrapy中的Item相當于一個容器,用于存放爬蟲提取的數據。
-->定義字段:
image_urls:使用 scrapy.Field() 創建一個字段,專門用于存儲圖片的URL列表。這使得爬蟲在處理數據時可以方便地存取與圖片相關的信息。
import scrapy
class WeatherImageItem(scrapy.Item):
image_urls = scrapy.Field() # 定義一個字段,用于存儲圖片的URL列表
5.setting.py文件
DOWNLOAD_TIMEOUT = 15
BOT_NAME = 'weather_images'
SPIDER_MODULES = ['weather_images.spiders']
NEWSPIDER_MODULE = 'weather_images.spiders'
# 存儲圖片路徑
IMAGES_STORE = r'C:\example_scrapy\weather_images\images'
# 啟用圖片下載管道
ITEM_PIPELINES = {
'weather_images.pipelines.ImagePipeline': 1,
}
# 日志級別
LOG_LEVEL = 'INFO'
# 設置并發請求數
CONCURRENT_REQUESTS = 16 # 單線程或者多線程只需修改這里
# 控制請求延遲和并發請求
DOWNLOAD_DELAY = 1
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# 啟用重試
RETRY_ENABLED = True
MEDIA_ALLOW_REDIRECTS = True # 允許重定向
??運行結果截圖(這里不滿108張,詳情見問題思考模塊)

??問題思考和心得體會
我這里的代碼反反復復修改和運行了很多次,主要是修改pipeline.py文件,因為在終端運行的結果如下所示:

可以看到這里顯示很多圖片都下載失敗,這可能是導致爬取的圖片不滿108張的原因,然后我問了chatGPT,上面說問題可能出在文件路徑問題,文件權限問題,圖片 URL 的有效性,網絡問題等,我一一檢查并按照要求修改之后,仍然出現上述問題,所以我覺得可能網站使用了反爬蟲技術,或者真的網絡也有問題導致圖片下載失敗。爬蟲程序的編寫應該基本上沒什么問題,基本上都定位到了圖片所在的url,只是下載出現了問題。
本次的心得體會有:
我剛開始做這道題的時候,就被題目迷惑住了,因為我打開這個天氣網發現它根本不能翻頁,而且看上去也沒有多少張圖片可以爬取,因此我在課上疑惑了半天。后來才后知后覺,我們可以從網頁的html文件中檢索出帶有href屬性的a標簽,并獲取屬性的值就可以得到一個新的網址,經過檢查和處理之后就可以繼續進行訪問,這就是我們之前學到的對網站進行爬取的過程。
除此之外,我還學到一個scrapy框架中用于處理圖像下載的一個內置管道類:ImagesPipeline。在該類中可以直接調用方法:get_media_requests(self, item, info)來接收一個 item(包含圖像 URL 的項目)并生成用于下載每個圖像的請求。當所有請求完成之后會自動調用另一個函數:item_completed(self, results, item, info),它會檢查哪些圖像下載成功,哪些圖像下載失敗。所以這個類極大地簡化了圖像下載的過程。
還有關于單線程和多線程的設置,我本來以為很復雜,需要分別建立兩個項目,但是在這里只需要修改配置文件中的CONCURRENT_REQUESTS的值就可以了,真的很簡單方便。
最后是關于我個人的思考,我總是過于依賴AI,對很多代碼邏輯缺乏自己的思考。我對一些帶有反爬機制的網站也是束手無策,只能讓AI幫我想辦法,如果它解決不了,那就是真的解決不了了。另外不同網站的爬取應該按照實際情況采取不同的爬取數據的方法,比如靜態網頁和動態網頁需要不同的方法,我現在還是有點迷糊,希望能通過做題慢慢理解吧。
作業②:
??要求:熟練掌握 scrapy 中 Item、Pipeline 數據的序列化輸出方法;使用scrapy框架+Xpath+MySQL數據庫存儲技術路線爬取股票相關信息。 候選網站:東方財富網:https://www.eastmoney.com/
???MySQL數據庫存儲和輸出格式如下: 表頭英文命名例如:序號id,股票代碼:bStockNo……,由同學們自行定義設計

??解決思路及代碼實現
1.前情提要:本次作業使用scrapy框架,selenium方法爬取數據,使用MySQL存儲數據,最后把爬取的數據導出為csv文件放在gitee倉庫里了
2.items.py文件
--> 這里定義了一個關于股票信息的數據項,方便存放爬蟲提取的關于股票的數據。
import scrapy
class StockScraperItem(scrapy.Item):
bStockNo = scrapy.Field()
bStockName = scrapy.Field()
bLatestPrice = scrapy.Field()
bChangePercent = scrapy.Field()
bChangeAmount = scrapy.Field()
bVolume = scrapy.Field()
bAmplitude = scrapy.Field()
bHigh = scrapy.Field()
bLow = scrapy.Field()
bOpen = scrapy.Field()
bPreviousClose = scrapy.Field()
pass
3.spider.py文件
--> 這里的start_requests方法定義了一個初始請求生成方式 。發起請求的方式使用了SeleniumRequest,這是一個特殊的請求方式,可以在scrapy中使用selenium。目的是使用selenium等待頁面完全加載之后再進行解析操作(因為有些頁面是通過JavaScript動態加載的)。該方法的參數:url=url 指定了請求的目標 URL;callback=self.parse 指定了響應到達后的回調函數(這里是 parse 方法,下面有定義),用于處理響應;wait_time=10 指定了等待時間。
from scrapy_selenium import SeleniumRequest
from scrapy.spiders import Spider
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
class StockSpider(Spider):
name = 'stock_spider'
start_urls = ['https://quote.eastmoney.com/center/gridlist.html#hs_a_board']
def start_requests(self):
for url in self.start_urls:
yield SeleniumRequest(url=url, callback=self.parse, wait_time=10)
--> 這個 clean_decimal 方法的作用是清理和轉換包含百分號、單位(“億”或“萬”)或千位分隔符的字符串,并將其轉換為適當的浮點數格式。
def clean_decimal(self, value):
if value:
value = value.strip().replace(',', '')
if '%' in value:
try:
return float(value.replace('%', '')) / 100
except ValueError:
return None
if '億' in value:
try:
return float(value.replace('億', '')) * 1e8
except ValueError:
return None
elif '萬' in value:
try:
return float(value.replace('萬', '')) * 1e4
except ValueError:
return None
try:
return float(value)
except ValueError:
return None
return None
--> 查看網頁原html文檔,找到所要爬取數據的標簽元素,以便進行定位和爬取。

--> 該 parse 方法用于解析網頁表格中的股票數據,并將提取的信息存儲在 item 字典中。它使用 Selenium 來等待表格元素加載,然后提取并清理數據。“driver = response.request.meta['driver']”獲取 Selenium 的 WebDriver 實例,用于執行等待和動態加載。使用 WebDriverWait 等待頁面中的表格行加載完成,以確保解析時數據已經在頁面上。最后進行字段的提取和生成item項。
def parse(self, response):
driver = response.request.meta['driver']
WebDriverWait(driver, 15).until(
EC.presence_of_element_located((By.XPATH, '//table//tr'))
)
rows = response.xpath('//table//tr')
for row in rows:
item = {}
item['bStockNo'] = row.xpath('./td[2]/a/text()').get()
item['bStockName'] = row.xpath('./td[@class="mywidth"]/a/text()').get()
item['bLatestPrice'] = self.clean_decimal(row.xpath('./td[@class="mywidth2"][1]/span/text()').get())
item['bChangePercent'] = self.clean_decimal(row.xpath('./td[@class="mywidth2"][2]/span/text()').get())
item['bChangeAmount'] = self.clean_decimal(row.xpath('./td[7]/span/text()').get())
item['bVolume'] = self.clean_decimal(row.xpath('./td[8]/text()').get())
item['bAmplitude'] = self.clean_decimal(row.xpath('./td[10]/text()').get())
item['bHigh'] = self.clean_decimal(row.xpath('./td[11]/span/text()').get())
item['bLow'] = self.clean_decimal(row.xpath('./td[12]/span/text()').get())
item['bOpen'] = self.clean_decimal(row.xpath('./td[13]/span/text()').get())
item['bPreviousClose'] = self.clean_decimal(row.xpath('./td[14]/text()').get())
yield item
--> 這里是進行一個翻頁機制。剛開始執行前面的代碼發現只能爬取到一頁的數據,所以就使用selenium來模擬翻頁的動作,以便進行下一頁數據的爬取。首先也是查看原html文件來查看包含“下一頁”按鈕的元素的位置。

--> 使用顯示等待,等到“下一頁”這個按鈕可以被點擊時,通過 next_button.is_displayed() 和 next_button.is_enabled() 進一步驗證按鈕的可見性和啟用狀態。如果“下一頁”按鈕可用,則點擊它,并記錄操作日志。之后再次使用顯示等待使得表格數據加載完成,使用 SeleniumRequest 發送請求,調用 parse 方法遞歸處理新頁面數據,避免過濾掉相同的 URL。
try:
next_button = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.XPATH, '//a[contains(@class, "next paginate_button")]'))
)
if next_button.is_displayed() and next_button.is_enabled():
self.logger.info("Clicking 'Next' button for pagination.")
next_button.click()
# 增加延遲或等待下一頁數據加載完成
WebDriverWait(driver, 10).until(
EC.presence_of_element_located((By.XPATH, "http://table//tr"))
)
# 重新調用 parse 函數繼續爬取
yield SeleniumRequest(
url=driver.current_url,
callback=self.parse,
wait_time=10,
dont_filter=True
)
else:
self.logger.info("No more pages to crawl or next button disabled.")
except Exception as e:
self.logger.error(f"Error during pagination: {e}")
4.pipeline.py文件
--> 打開MySQL,建立數據庫和表格,以便后續插入數據。


--> open_spider方法會在 Scrapy 爬蟲啟動時自動調用,使管道與數據庫初始化,這里主要是進行 MySQL 數據庫的連接,并且創建一個游標對象 self.cursor,用于執行 SQL 語句。
import mysql.connector
import csv
class MySQLPipeline:
def open_spider(self, spider):
try:
# 連接 MySQL 數據庫
self.conn = mysql.connector.connect(
host='localhost', # 數據庫主機
user='root', # 用戶名
password='Wlj98192188?', # 密碼,替換為你自己的密碼
database='stock_data' # 數據庫名
)
self.cursor = self.conn.cursor()
print("MySQL connection established")
except mysql.connector.Error as err:
print(f"Error: {err}")
raise
--> close_spider 方法,在 Scrapy 爬蟲結束時自動調用,負責在爬蟲關閉前導出數據并關閉數據庫連接。調用管道的 export_to_csv 方法(下面有定義),將爬取的數據從數據庫導出為 CSV 文件通過 self.conn.commit() 提交事務,確保數據庫中保存了爬蟲獲取的所有數據。
def close_spider(self, spider):
try:
# 在關閉數據庫連接前導出數據到 CSV 文件
self.export_to_csv()
# 提交數據并關閉數據庫連接
if hasattr(self, 'conn'):
self.conn.commit()
self.cursor.close()
self.conn.close()
print("MySQL connection closed")
except AttributeError:
print("No database connection to close.")
except mysql.connector.Error as err:
print(f"Error closing MySQL connection: {err}")
--> process_item 方法,用于將每個爬取的數據項插入到 MySQL 數據庫中的 stocks 表中。每次插入后提交事務,可以確保數據在每次插入后立即寫入數據庫,避免批量插入時因程序中斷導致數據丟失。
def process_item(self, item, spider):
try:
# 插入數據到 MySQL
insert_query = """
INSERT INTO stocks (bStockNo, bStockName, bLatestPrice, bChangePercent, bChangeAmount, bVolume, bAmplitude, bHigh, bLow, bOpen, bPreviousClose)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
item['bStockNo'],
item['bStockName'],
item['bLatestPrice'],
item['bChangePercent'],
item['bChangeAmount'],
item['bVolume'],
item['bAmplitude'],
item['bHigh'],
item['bLow'],
item['bOpen'],
item['bPreviousClose']
)
self.cursor.execute(insert_query, values)
self.conn.commit() # Commit after each insert
return item
except mysql.connector.Error as err:
print(f"Error inserting data: {err}")
return None
--> export_to_csv 方法,用于將 stocks 表中的數據導出到 CSV 文件中。rows = self.cursor.fetchall() 從查詢結果中獲取所有行數據,結果保存在 rows 列表中。self.cursor.description 返回字段描述信息,用于動態獲取表頭。[i[0] for i in self.cursor.description] 提取每個字段的名稱,并存入 headers 列表中。使用 csv.writer(file) 創建 CSV 寫入器 writer,最后將表頭和內容寫入文件。
def export_to_csv(self):
# 從數據庫中查詢所有數據并導出到 CSV 文件
export_query = "SELECT * FROM stocks"
try:
self.cursor.execute(export_query)
rows = self.cursor.fetchall()
headers = [i[0] for i in self.cursor.description] # 獲取表頭
with open('exported_stock_data.csv', mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
writer.writerow(headers) # 寫入表頭
writer.writerows(rows) # 寫入數據行
print("Data exported to exported_stock_data.csv")
except mysql.connector.Error as err:
print(f"Error exporting data: {err}")
5.middlewares.py文件
--> 定義 CustomSeleniumMiddleware 類,用于在 Scrapy 中自定義 Selenium 中間件,以便在爬取過程中使用 Chrome 瀏覽器自動化,從而使Scrapy 可以加載 JavaScript 內容的網頁,并在無界面環境下穩定地執行頁面爬取。
from scrapy_selenium import SeleniumMiddleware
from selenium.webdriver.chrome.options import Options
from selenium import webdriver
class CustomSeleniumMiddleware(SeleniumMiddleware):
@classmethod
def from_crawler(cls, crawler):
# 設置 Chrome 選項
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--no-sandbox")
# 直接在 ChromeDriver 中指定 executable_path
driver = webdriver.Chrome(executable_path="D:/chromedriver-win64/chromedriver.exe", options=chrome_options)
# 設置超時
driver.set_page_load_timeout(30)
driver.implicitly_wait(10)
# 返回自定義的 SeleniumMiddleware
middleware = cls(driver_name='chrome', driver_executable_path="D:/chromedriver-win64/chromedriver.exe",
driver_arguments=chrome_options.arguments, browser_executable_path=None)
return middleware
6.setting.py文件
--> 配置文件為 Scrapy 項目 stock_scraper 設置了相關的抓取和數據存儲設置,特別是在爬蟲中集成了 Scrapy-Selenium,使其能夠處理需要瀏覽器渲染的動態內容。
BOT_NAME = "stock_scraper"
SPIDER_MODULES = ["stock_scraper.spiders"]
NEWSPIDER_MODULE = "stock_scraper.spiders"
# 激活 MySQL Pipeline
ITEM_PIPELINES = {
'stock_scraper.pipelines.MySQLPipeline': 1,
}
REDIRECT_ENABLED = False
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "stock_scraper (+http://www.yourdomain.com)"
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
# settings.py
# 啟用 Scrapy-Selenium
# 導入 Service 類
from selenium.webdriver.chrome.service import Service
# 設置 Chrome 驅動服務
SELENIUM_DRIVER_NAME = 'chrome'
SELENIUM_DRIVER_EXECUTABLE_PATH = 'D:/chromedriver-win64/chromedriver.exe'
SELENIUM_DRIVER_ARGUMENTS = ['--headless', '--disable-gpu', '--no-sandbox']
# 使用 Service 類指定 ChromeDriver 路徑
from scrapy_selenium.middlewares import SeleniumMiddleware
class CustomSeleniumMiddleware(SeleniumMiddleware):
def __init__(self, *args, **kwargs):
service = Service(executable_path=SELENIUM_DRIVER_EXECUTABLE_PATH)
kwargs['service'] = service
super().__init__(*args, **kwargs)
# 使用自定義的中間件
DOWNLOADER_MIDDLEWARES = {
'stock_scraper.middlewares.CustomSeleniumMiddleware': 800,
}
??運行結果截圖
--> 檢查MySQL中的數據(這里只顯示前時行,有些數據空了(不知啥原因),不過無傷大雅):

--> 導出的csv文件(結果有兩萬多行,這里只截了最后的一部分):

??心得體會
--> 之前在理論課上剛做了關于selenium用法的作業,然后這次就使用scapy+selenium的方式來做,我本來以為可以直接使用,但是實踐之后才知道在scrapy中使用selenium要定義專門的Scrapy 的中間件--scrapy_selenium,它允許 Scrapy 和 Selenium WebDriver 一起工作,從而支持抓取包含 JavaScript 動態渲染內容的網頁。這也是第一次編寫middlewares.py這個文件(之前都是只編寫其他四個文件就行了,因為scrapy框架默認只處理靜態頁面的爬取),在這個文件中,我們要定義一個中間件的類,這個類繼承于SeleniumMiddleware類--scrapy_selenium 中的默認 Selenium 中間件基類,用來自定義啟動谷歌瀏覽器的參數設置。這道題花了很長的時間,中間出現了很多各種各樣的問題,由于問題實在太多了,沒有辦法截圖一一在這里展示,在爬蟲,管道,中間件這三個文件的每個模塊幾乎都出現了問題,不過好在最后都一一解決了。所以感覺做爬蟲的作業真的很需要耐心,每次數據是否爬取成功都是由多個因素來決定的,這種情況下唯一的好處就是在你做出來的那一刻會覺得非常驚喜和開心。當然熟練了以后肯定就能把速度提上去了,所以再接再厲吧。
作業③:
??要求:熟練掌握 scrapy 中 Item、Pipeline 數據的序列化輸出方法;使用scrapy框架+Xpath+MySQL數據庫存儲技術路線爬取外匯網站數據。 候選網站:中國銀行網:https://www.boc.cn/sourcedb/whpj/
???輸出信息: Gitee文件夾鏈接

??解決思路和代碼實現
1.前情提要
--> 這道題和上一題類似,這里我也是使用了Scrapy框架+Selenium方法+MySQL數據庫的方式,不過不同的是在這個網頁的html文件中我沒有辦法直接找到所要爬取數據的標簽元素,這就意味著這些數據是由JavaScript腳本動態生成的,沒有辦法直接查看。因此就只能在請求發起后收到的響應中來找經過渲染后的頁面信息,以便進行后面數據的定位和提取。

2.item.py文件
--> 定義相關數據項
import scrapy
class ForexScraperItem(scrapy.Item):
currency = scrapy.Field()
tbp = scrapy.Field() # 現鈔買入價
cbp = scrapy.Field() # 現鈔賣出價
tsp = scrapy.Field() # 現匯買入價
csp = scrapy.Field() # 現匯賣出價
time = scrapy.Field()
pass
3.spider.py文件
--> 在發起請求并獲取響應后打印出響應中的page_source,觀察和找出數據所在的標簽的定位方法。

--> 這是找到的關于表格數據的標簽格式(因為之前在終端運行時打印出的數據被后面的數據頂上去找不到了,只能在與gpt的聊天記錄中找了)

--> 在分析過它的結構之后就能寫爬蟲代碼了(內容跟前面類似這里就不多說了)。
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import scrapy
from forex_scraper.items import ForexScraperItem
from selenium import webdriver
class ForexSpider(scrapy.Spider):
name = "forex"
start_urls = ["https://www.boc.cn/sourcedb/whpj/"]
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
options = webdriver.ChromeOptions()
options.add_argument("--headless")
self.driver = webdriver.Chrome(options=options)
def parse(self, response):
self.driver.get(response.url)
try:
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//table[@width="100%"]'))
)
page_source = self.driver.page_source
response = scrapy.Selector(text=page_source)
rows = response.xpath('//table[@width="100%"]/tbody/tr')
for row in rows[1:]: # Skip the header row
item = ForexScraperItem()
# Use default values if any field is missing
item['currency'] = row.xpath('.//td[1]/text()').get(default="").strip()
item['tsp'] = row.xpath('.//td[2]/text()').get(default=None)
item['tbp'] = row.xpath('.//td[3]/text()').get(default=None)
item['csp'] = row.xpath('.//td[4]/text()').get(default=None)
item['cbp'] = row.xpath('.//td[5]/text()').get(default=None)
item['time'] = row.xpath('.//td[8]/text()').get(default=None)
yield item
finally:
self.driver.quit()
4.pipeline.py文件
--> 在這里也是首先連上MySQl數據庫,不過不同的是這次表格是直接在這里創建的(提前建好了db)
import pymysql
from pymysql import IntegrityError
import pandas as pd
from sqlalchemy import create_engine
class ForexScraperPipeline:
def open_spider(self, spider):
# 連接數據庫
self.conn = pymysql.connect(
host='localhost',
user='root',
password='Wlj98192188?',
db='forex_data',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor
)
self.cursor = self.conn.cursor()
# 使用 SQLAlchemy 創建引擎
self.engine = create_engine('mysql+pymysql://root:Wlj98192188?@localhost/forex_data')
# 創建表格,如果表格不存在
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS forex_data (
id INT AUTO_INCREMENT PRIMARY KEY,
currency VARCHAR(255) NOT NULL,
tbp FLOAT DEFAULT NULL,
cbp FLOAT DEFAULT NULL,
tsp FLOAT DEFAULT NULL,
csp FLOAT DEFAULT NULL,
time TIME DEFAULT NULL
)
""")
self.conn.commit()
--> 插入數據并最終導入到csv文件中
def process_item(self, item, spider):
try:
# 檢查字段是否有數據,避免空字段
if item.get('currency'):
# 插入數據
self.cursor.execute("""
INSERT INTO forex_data (currency, tbp, cbp, tsp, csp, time)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
item.get('currency'),
item.get('tbp') or None,
item.get('cbp') or None,
item.get('tsp') or None,
item.get('csp') or None,
item.get('time') or None
))
self.conn.commit()
except IntegrityError as e:
spider.logger.error(f"Database integrity error: {e}")
except Exception as e:
spider.logger.error(f"Failed to insert item: {e}")
return item
def close_spider(self, spider):
# 導出數據庫數據到 CSV 文件
try:
query = "SELECT currency, tbp, cbp, tsp, csp, time FROM forex_data"
data = pd.read_sql(query, self.engine)
data.to_csv('forex_data_export.csv', index=False, encoding='utf-8-sig')
spider.logger.info("Data successfully exported to forex_data_export.csv")
except Exception as e:
spider.logger.error(f"Failed to export data: {e}")
finally:
# 關閉數據庫連接
self.cursor.close()
self.conn.close()
5.middlewares.py文件
--> 這里與之前不同的是導入了一個scrapy.http.HtmlResponse,用于創建一個 HtmlResponse 對象,以便將 Selenium 獲取的網頁內容返回給 Scrapy。process_request方法是 Scrapy 的一個鉤子方法,用于處理每個請求,覆蓋了默認的 Scrapy 請求處理邏輯。將頁面源代碼通過 HtmlResponse 包裝并返回,使 Scrapy 將此頁面作為一個普通的響應對象處理。
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.http import HtmlResponse
class SeleniumMiddleware:
def __init__(self):
chrome_options = Options()
chrome_options.add_argument("--headless")
self.driver = webdriver.Chrome(options=chrome_options)
def process_request(self, request, spider):
self.driver.get(request.url)
try:
# 等待表格加載完成
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.XPATH, '//table'))
)
except Exception as e:
spider.logger.warning("頁面加載超時或找不到表格")
body = self.driver.page_source
return HtmlResponse(self.driver.current_url, body=body, encoding='utf-8', request=request)
def __del__(self):
self.driver.quit()
6.setting.py文件
--> 配置文件如下:
BOT_NAME = "forex_scraper"
SPIDER_MODULES = ["forex_scraper.spiders"]
NEWSPIDER_MODULE = "forex_scraper.spiders"
DOWNLOADER_MIDDLEWARES = {
'forex_scraper.middlewares.SeleniumMiddleware': 543,
}
ITEM_PIPELINES = {
'forex_scraper.pipelines.ForexScraperPipeline': 300,
}
# Crawl responsibly by identifying yourself (and your website) on the user-agent
#USER_AGENT = "forex_scraper (+http://www.yourdomain.com)"
# Obey robots.txt rules
ROBOTSTXT_OBEY = False
??運行結果截圖
--> MySQl數據庫查詢(不好意思有點糊糊):

--> 導出的csv文件的內容:

??心得體會
--> 在做這道題查看網頁代碼的時候,整個人都愣住了,因為不管怎么樣都找不到所要數據的標簽元素,后來才知道它是動態加載的,需要JavaScript渲染填充后才會出現,上一題雖然也是動態加載,不過在原網頁代碼還是能夠找到對應標簽的。這里就體現出了使用selenium的重要性,如果只按照靜態頁面爬取數據的方式來爬取這次的數據根本什么都爬取不到,因為根本不知道標簽是啥樣的。所以這道題相比上道題可能難度又加了一點點。不過我在這道題上花費的時間是比上道題要短一些的,可能因為有上次的一點點“基礎”和“經驗”吧。在這里竟然一道題的題量都能積累可受益的經驗,那就說明我更應該多多做題了,那些平時作業做得很快的同學估計平時都沒閑著,才能越做越快,越做越好。

浙公網安備 33010602011771號