url測試腳本4
###功能 #1:多線程批量url測試 #2:如需進行比對,則需修改為不採用多線程
#!/usr/bin/env python3
# -*- coding:utf-8 -*-
# __author__ =
"""Batch URL status checker.

Reads URLs from ``urls.txt`` (one per line, ``#`` starts a comment line),
requests each of them concurrently, prints a summary and writes a
timestamped plain-text report to the current directory.
"""
import os
import time
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

import requests
from requests.packages import urllib3


def _failure_result(url, status_code, error_message):
    """Build the result dict used for every kind of failed check."""
    return {
        "url": url,
        "final_url": url,
        "status_code": status_code,
        "success": False,
        "error_message": error_message,
        "response_time": None,
        "content_length": 0,
        "redirected": False,
    }


def _wait_for_enter(prompt):
    """Pause until Enter is pressed; do nothing if stdin is closed."""
    try:
        input(prompt)
    except EOFError:
        # Non-interactive run (pipe, scheduler): there is nobody to wait for.
        pass


def get_status_code(url, timeout=8):
    """Fetch a single URL and describe the outcome.

    Args:
        url: Absolute URL to request with GET (redirects are followed).
        timeout: Seconds passed to ``requests.get`` as its timeout.

    Returns:
        A dict with the keys ``url``, ``final_url``, ``status_code``,
        ``success``, ``error_message``, ``response_time``,
        ``content_length`` and ``redirected``. ``success`` is True whenever
        a response was received, whatever its HTTP status. On failure
        ``status_code`` is one of the strings ``"Timeout"``,
        ``"Connection Error"`` or ``"Request Error"``.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
    }
    # Disable warnings and verification (development only).
    # NOTE(review): verify=False skips TLS certificate validation.
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    try:
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        started = time.perf_counter()
        response = requests.get(url, headers=headers, timeout=timeout,
                                allow_redirects=True, verify=False)
        elapsed = time.perf_counter() - started
    except requests.exceptions.Timeout:
        # Must stay ahead of ConnectionError: ConnectTimeout derives from both.
        return _failure_result(url, "Timeout", f"請求超時({timeout}秒)")
    except requests.exceptions.ConnectionError:
        return _failure_result(url, "Connection Error", "連接錯誤")
    except requests.exceptions.RequestException as e:
        return _failure_result(url, "Request Error", str(e))

    return {
        "url": url,
        "final_url": response.url,
        "status_code": response.status_code,
        "success": True,
        "error_message": None,
        "response_time": elapsed,
        "content_length": len(response.content),
        "redirected": url != response.url,
    }


def read_urls_from_file(filename="urls.txt"):
    """Read the URL list from a text file.

    Blank lines and lines starting with ``#`` are skipped. Entries without
    an ``http://`` or ``https://`` prefix get ``https://`` prepended.

    Args:
        filename: Path of the file to read.

    Returns:
        The list of URLs; an empty list when the file is missing or cannot
        be read.
    """
    if not os.path.exists(filename):
        print(f"文件 {filename} 不存在")
        return []

    urls = []
    try:
        # utf-8-sig also accepts plain UTF-8 but strips a leading BOM, which
        # would otherwise end up glued to the first URL.
        with open(filename, 'r', encoding='utf-8-sig') as f:
            for line in f:
                url = line.strip()
                if not url or url.startswith('#'):
                    continue
                if not url.startswith(('http://', 'https://')):
                    url = 'https://' + url
                urls.append(url)
    except (OSError, UnicodeDecodeError) as e:
        # The old message promised sample URLs that were never provided;
        # report the error and return an empty list instead of None.
        print(f"讀取文件錯誤: {e}")
        return []

    print(f"從 {filename} 讀取了 {len(urls)} 個URL")
    return urls


def save_results_to_file(results, filename=None):
    """Write a plain-text report of the check results.

    Args:
        results: List of result dicts as produced by ``get_status_code``.
        filename: Output path. Defaults to
            ``url_check_results_<YYYYmmdd_HHMMSS>.txt``.

    Returns:
        The path written, or None if the report could not be saved.
    """
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"url_check_results_{timestamp}.txt"

    try:
        total = len(results)
        success_count = sum(1 for r in results if r['success'])
        error_count = total - success_count
        status_counts = Counter(str(r['status_code']) for r in results)

        with open(filename, 'w', encoding='utf-8') as f:
            # Report header with overall counts.
            f.write("=" * 70 + "\n")
            f.write("URL狀態碼檢查報告\n")
            f.write("=" * 70 + "\n")
            f.write(f"生成時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
            f.write(f"檢查URL總數: {total}\n")
            f.write(f"成功: {success_count}, 失敗: {error_count}\n")
            f.write("=" * 70 + "\n\n")

            # Summary: number of URLs per status code.
            f.write("狀態碼統計:\n")
            f.write("-" * 50 + "\n")
            for status in sorted(status_counts):
                f.write(f"{status}: {status_counts[status]} 個URL\n")
            f.write("\n")

            # One entry per URL.
            f.write("詳細結果:\n")
            f.write("=" * 70 + "\n")
            for i, result in enumerate(results, 1):
                # Both branches used to be "?" (mis-encoded icons), which
                # made success and failure indistinguishable in the report.
                status_icon = "✓" if result['success'] else "✗"
                f.write(f"{status_icon} [{i}/{total}] {result['url']}\n")
                f.write(f" 狀態碼: {result['status_code']}\n")
                if result['redirected']:
                    f.write(f" 重定向: {result['final_url']}\n")
                # None marks "no response"; a measured 0.0 is still a value.
                if result['response_time'] is not None:
                    f.write(f" 響應時間: {result['response_time']:.3f}秒\n")
                if result['content_length'] > 0:
                    f.write(f" 內容大小: {result['content_length']} 字節\n")
                if result['error_message']:
                    f.write(f" 錯誤信息: {result['error_message']}\n")
                f.write("\n")
    except Exception as e:
        # Saving is best-effort: report the problem and let the caller
        # carry on with the console summary.
        print(f"保存文件時出錯: {e}")
        return None

    return filename


def batch_check_urls(urls, max_workers=8):
    """Check many URLs concurrently.

    Args:
        urls: Collection of URLs to check.
        max_workers: Size of the thread pool.

    Returns:
        A list with one result dict per URL, in the same order as ``urls``
        regardless of which request finished first. An empty list when
        ``urls`` is empty.
    """
    if not urls:
        return []

    urls = list(urls)
    total = len(urls)
    # Pre-sized so each result can be stored at its input position; futures
    # complete in arbitrary order.
    results = [None] * total

    print(f"開始檢查 {total} 個URL...")
    print(f"并發數: {max_workers}")
    started = time.perf_counter()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(get_status_code, url): index
            for index, url in enumerate(urls)
        }
        for done, future in enumerate(as_completed(future_to_index), 1):
            index = future_to_index[future]
            try:
                results[index] = future.result()
            except Exception as e:
                # get_status_code handles request errors itself; anything
                # arriving here is unexpected but must not abort the batch.
                results[index] = _failure_result(urls[index], "Error", f"處理錯誤: {e}")

            # Show progress every 5 URLs and at the end, for failures too.
            if done % 5 == 0 or done == total:
                progress = done / total * 100
                print(f"進度: {done}/{total} ({progress:.1f}%)")

    total_time = time.perf_counter() - started

    # Console summary.
    success_count = sum(1 for r in results if r['success'])
    print("\n檢查完成!")
    print(f"總耗時: {total_time:.2f}秒")
    print(f"平均每個URL: {total_time / total:.3f}秒")
    print(f"成功: {success_count}, 失敗: {total - success_count}")
    return results


def main():
    """Entry point: read urls.txt, check every URL, save and print a summary."""
    print("=" * 50)
    print("URL狀態碼批量檢查工具")
    print("=" * 50)

    # Load the URL list; stop here when there is nothing to check.
    urls = read_urls_from_file()
    if not urls:
        # The previous message announced a sample file that was never
        # created, and execution then crashed on the missing list.
        print("沒有找到URL,請在 urls.txt 中每行填寫一個URL後重新運行")
        _wait_for_enter("按回車鍵退出...")
        return

    print("將檢查以下URL:")
    for i, url in enumerate(urls, 1):
        print(f" {i}. {url}")

    print("\n開始檢查...(按Ctrl+C可中斷)")
    try:
        results = batch_check_urls(urls, max_workers=6)

        output_file = save_results_to_file(results)
        if output_file:
            print(f"\n結果已保存到: {output_file}")

        # Per-status-code totals on the console.
        status_stats = Counter(str(result['status_code']) for result in results)
        print("\n狀態碼統計:")
        for status, count in sorted(status_stats.items()):
            print(f" {status}: {count}個")

        _wait_for_enter("\n按回車鍵退出...")
    except KeyboardInterrupt:
        print("\n用戶中斷操作")
    except Exception as e:
        # Top-level boundary: report and keep the window open.
        print(f"程序執行出錯: {e}")
        _wait_for_enter("按回車鍵退出...")


if __name__ == "__main__":
    # Run directly; no command-line arguments are needed.
    main()
浙公網安備 33010602011771號