import os
import oss2
import json
import logging
import threading
import traceback
from concurrent.futures import ThreadPoolExecutor
# ====================== Configurable parameters ======================
THREAD_COUNT = 10 # Thread pool size; adjustable
# Alibaba Cloud OSS access credentials
# NOTE(review): empty placeholders — must be filled in before running.
access_key_id = ''
access_key_secret = ''
endpoint = '' # internal (intranet) endpoint
bucket_name = ''
# Local directory whose contents are uploaded (recursively)
local_directory = '/path/logsfile/logs'
# State file recording what was already uploaded (JSON: object key -> local mtime)
state_file = 'upload_state.txt'
temp_state_file = 'upload_state.tmp'
# Log files: normal activity log and a dedicated error log
log_file = 'upload.log'
error_log_file = 'error.log'
# ====================== 日志配置(增加線程 ID) ======================
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] [%(threadName)s | Thread-%(thread)d] - %(message)s',
handlers=[
logging.FileHandler(log_file, encoding='utf-8'),
logging.StreamHandler()
]
)
error_logger = logging.getLogger("error")
error_handler = logging.FileHandler(error_log_file, encoding='utf-8')
error_handler.setLevel(logging.ERROR)
error_logger.addHandler(error_handler)
# ====================== OSS connection ======================
# Authenticate with the static credentials above and bind to the target bucket.
# This Bucket instance is shared by all worker threads below.
auth = oss2.Auth(access_key_id, access_key_secret)
bucket = oss2.Bucket(auth, endpoint, bucket_name)
# ====================== Load previous upload state ======================
# Maps OSS object key -> local file mtime recorded at last successful upload.
upload_state = {}
if os.path.exists(state_file) and os.path.getsize(state_file):
    try:
        # Fix: explicit encoding — the file is JSON written by this script;
        # relying on the platform default encoding is non-portable.
        with open(state_file, 'r', encoding='utf-8') as f:
            upload_state = json.load(f)
    except (json.JSONDecodeError, IOError) as e:
        # A corrupt/unreadable state file means everything gets re-uploaded,
        # which is safe (uploads are idempotent overwrites).
        error_logger.error(f"Error reading state file: {str(e)}")
        upload_state = {}
# Pre-built set of uploaded keys for O(1) membership checks.
uploaded_files = set(upload_state.keys())
# Lock guarding concurrent updates of upload_state / uploaded_files.
lock = threading.Lock()
# ====================== File-upload worker ======================
def upload_file(local_file_path, oss_object_key):
    """Upload one local file to OSS, record it in the shared state, then delete it.

    On success the object key and the file's mtime are stored into
    ``upload_state`` / ``uploaded_files`` under ``lock``, and the local copy
    is removed. Every failure is logged to ``error_logger``; this function
    never raises to the caller (it runs as a thread-pool task).
    """
    thread_id = threading.get_ident()  # tag log lines with the worker thread
    try:
        # Push the file to OSS under the computed object key.
        bucket.put_object_from_file(oss_object_key, local_file_path)
        logging.info(f'[Thread-{thread_id}] Uploaded {local_file_path} -> {oss_object_key}')
        # Publish success into the shared state (thread-safe).
        with lock:
            upload_state[oss_object_key] = os.path.getmtime(local_file_path)
            uploaded_files.add(oss_object_key)
        # The local copy is no longer needed once the object lives in OSS.
        try:
            os.remove(local_file_path)
            logging.info(f'[Thread-{thread_id}] Deleted {local_file_path}')
        except OSError as e:
            # Best-effort delete: log and carry on; the upload itself succeeded.
            error_logger.error(f"[Thread-{thread_id}] Failed to delete {local_file_path}: {str(e)}")
    except oss2.exceptions.OssError as e:
        error_logger.error(f"[Thread-{thread_id}] OSS upload failed for {local_file_path}: {str(e)}")
    except Exception:
        # Anything unforeseen: keep the full traceback for diagnosis.
        error_logger.error(f"[Thread-{thread_id}] Unexpected error while uploading {local_file_path}: {traceback.format_exc()}")
# ====================== Recursive directory upload (multithreaded) ======================
def upload_directory_to_oss(local_directory, bucket, oss_directory='logs'):
    """Walk *local_directory* and upload new/changed files to OSS in parallel.

    A file is submitted to the pool when its object key is not yet in
    ``uploaded_files`` or its current mtime is newer than the recorded one.
    Blocks until every submitted upload task has finished.

    :param local_directory: root of the local tree to upload
    :param bucket: kept for interface compatibility — the worker
        ``upload_file`` uses the module-level ``bucket``
    :param oss_directory: key prefix under which objects are stored
    """
    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        futures = []
        for root, _, filenames in os.walk(local_directory):
            for filename in filenames:
                local_file_path = os.path.join(root, filename)
                rel_path = os.path.relpath(local_file_path, local_directory)
                # Fix: OSS object keys always use '/' as separator.
                # os.path.join would emit '\' on Windows and create wrong keys.
                oss_object_key = '/'.join([oss_directory, *rel_path.split(os.sep)])
                # Upload only new files or files modified since last upload.
                if oss_object_key not in uploaded_files or os.path.getmtime(local_file_path) > upload_state.get(oss_object_key, 0):
                    futures.append(executor.submit(upload_file, local_file_path, oss_object_key))
        # Wait for all submitted tasks to finish.
        for future in futures:
            future.result()
# ====================== Run the upload ======================
upload_directory_to_oss(local_directory, bucket)
# ====================== Persist upload state (atomically) ======================
try:
    # Write to a temp file first, then atomically replace, so a crash
    # mid-write can never leave a truncated/corrupt state file behind.
    # Fix: explicit encoding instead of the platform default.
    with open(temp_state_file, 'w', encoding='utf-8') as f:
        json.dump(upload_state, f)
    os.replace(temp_state_file, state_file)
    logging.info("Upload state file updated successfully.")
except IOError as e:
    error_logger.error(f"Failed to write state file: {str(e)}")