import os
import oss2
from oss2.credentials import StaticCredentialsProvider
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import concurrent.futures
from threading import Lock
import time
endpoint = "https://oss-cn-beijing.aliyuncs.com"
region = "cn-beijing"
bucket_name = "image-browser"
env_access_key_id = "oss_image_browser_access_key_id"
env_access_key_secret = "oss_image_browser_access_key_secret"
class AsyncOSSUploader:
def __init__(self, workers: int = 10):
"""
初始化異步OSS上傳器
Args:
workers: 最大并發(fā)工作線程數(shù)
"""
provider = StaticCredentialsProvider(
access_key_id=os.getenv(env_access_key_id),
access_key_secret=os.getenv(env_access_key_secret)
)
auth = oss2.ProviderAuthV4(provider)
self.bucket = oss2.Bucket(auth, endpoint, bucket_name, region=region)
self.workers = workers
self.lock = Lock()
self.uploaded_count = 0
self.failed_count = 0
def get_file_headers(self, file_path: str) -> Dict[str, str]:
"""根據(jù)文件類型獲取合適的HTTP頭"""
headers = {
'x-oss-storage-class': 'Standard'
}
# 根據(jù)文件擴展名設置Content-Type
ext = Path(file_path).suffix.lower()
content_types = {
'.html': 'text/html',
'.css': 'text/css',
'.js': 'application/javascript',
'.json': 'application/json',
'.png': 'image/png',
'.jpg': 'image/jpeg',
'.jpeg': 'image/jpeg',
'.gif': 'image/gif',
'.svg': 'image/svg+xml',
'.ico': 'image/x-icon',
'.webp': 'image/webp',
'.txt': 'text/plain',
'.md': 'text/markdown',
'.xml': 'application/xml',
'.pdf': 'application/pdf',
'.zip': 'application/zip',
'.woff': 'font/woff',
'.woff2': 'font/woff2',
'.ttf': 'font/ttf',
'.eot': 'application/vnd.ms-fontobject'
}
if ext in content_types:
headers['Content-Type'] = content_types[ext]
if Path(file_path).name == 'index.html':
headers['Cache-Control'] = 'no-cache'
return headers
def _upload_single_file(self, file_path: Path, oss_key: str) -> bool:
"""上傳單個文件(線程安全)"""
try:
headers = self.get_file_headers(str(file_path))
with open(file_path, 'rb') as f:
self.bucket.put_object(oss_key, f, headers=headers)
with self.lock:
self.uploaded_count += 1
print(f"? 上傳成功 ({self.uploaded_count}): {file_path.name} -> {oss_key}")
return True
except Exception as e:
with self.lock:
self.failed_count += 1
print(f"? 上傳失敗 ({self.failed_count}): {file_path.name} -> {e}")
return False
def upload(self, src_path: str, dst_path: str, include_root: bool = False) -> bool:
"""
上傳文件或目錄到OSS
Args:
src_path: 本地源路徑(文件或目錄)
dst_path: OSS目標路徑
include_root:
- True: 整個目錄上傳到dst_path目錄中 (類似 cp src dst)
- False: 目錄內(nèi)容上傳到dst_path目錄中 (類似 cp -rf src/* dst)
Returns:
bool: 上傳是否成功
"""
src_path = Path(src_path)
dst_path = dst_path.rstrip('/')
if not src_path.exists():
print(f"? 源路徑不存在: {src_path}")
return False
start_time = time.time()
try:
if src_path.is_file():
# 上傳單個文件
return self._upload_single_file_sync(src_path, dst_path)
elif src_path.is_dir():
# 上傳目錄
return self._upload_directory(src_path, dst_path, include_root)
else:
print(f"? 不支持的路徑類型: {src_path}")
return False
except Exception as e:
print(f"? 上傳過程中發(fā)生錯誤: {e}")
return False
finally:
elapsed_time = time.time() - start_time
print(f"?? 總耗時: {elapsed_time:.2f}秒")
def _upload_single_file_sync(self, file_path: Path, dst_path: str) -> bool:
"""同步上傳單個文件"""
try:
if dst_path is None:
dst_path = ""
if dst_path.endswith('/') or not dst_path:
if dst_path:
oss_key = f"{dst_path}/{file_path.name}"
else:
oss_key = file_path.name
else:
oss_key = dst_path
oss_key = oss_key.lstrip('/')
print(f"?? 上傳文件: {file_path} -> {oss_key}")
return self._upload_single_file(file_path, oss_key)
except Exception as e:
print(f"? 文件上傳失敗 {file_path}: {e}")
return False
def _upload_directory(self, dir_path: Path, dst_path: str, include_root: bool) -> bool:
"""上傳目錄"""
print(f"?? 開始上傳目錄: {dir_path} -> {dst_path}")
print(f"?? 模式: {'包含根目錄' if include_root else '僅內(nèi)容'}")
print(f"?? 并發(fā)上傳 (最大 {self.workers} 線程)")
# 收集所有需要上傳的文件
upload_tasks = []
for file_path in dir_path.rglob('*'):
if file_path.is_file():
if include_root:
relative_path = str(file_path)
else:
relative_path = file_path.relative_to(dir_path)
relative_path = str(relative_path)
oss_key = f"{dst_path}/{relative_path}".replace("\\", "/")
oss_key = oss_key.lstrip('/')
upload_tasks.append((file_path, oss_key))
total_files = len(upload_tasks)
print(f"?? 發(fā)現(xiàn) {total_files} 個文件需要上傳")
if total_files == 0:
print("?? 沒有文件需要上傳")
return True
if total_files == 1:
# 單個文件直接上傳
file_path, oss_key = upload_tasks[0]
return self._upload_single_file(file_path, oss_key)
else:
# 多個文件使用并發(fā)上傳
return self._upload_concurrent(upload_tasks)
def _upload_concurrent(self, upload_tasks: List[Tuple[Path, str]]) -> bool:
"""并發(fā)上傳文件"""
print("? 使用并發(fā)上傳模式")
with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
# 提交所有上傳任務
future_to_task = {
executor.submit(self._upload_single_file, file_path, oss_key): (file_path, oss_key)
for file_path, oss_key in upload_tasks
}
# 等待所有任務完成
for future in concurrent.futures.as_completed(future_to_task):
file_path, oss_key = future_to_task[future]
try:
future.result() # 獲取結(jié)果,如果有異常會在這里拋出
except Exception as e:
print(f"? 任務異常 {file_path}: {e}")
print(f"?? 并發(fā)上傳完成: 成功 {self.uploaded_count} 個文件,失敗 {self.failed_count} 個文件")
return self.failed_count == 0
def upload(src_path: str, dst_path: str, include_root: bool = False, workers: int = 10) -> bool:
"""
便捷的上傳函數(shù)
Args:
src_path: 本地源路徑
dst_path: OSS目標路徑
include_root: 是否包含根目錄
workers: 最大并發(fā)工作線程數(shù)
Returns:
bool: 上傳是否成功
"""
uploader = AsyncOSSUploader(workers=workers)
return uploader.upload(src_path, dst_path, include_root)
if __name__ == "__main__":
upload('node_modules', '/da1', include_root=True, workers=10)