<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      阿里云OSS python3 上傳

      import os
      import oss2
      from oss2.credentials import StaticCredentialsProvider
      from pathlib import Path
      from typing import Optional, Dict, Any, List, Tuple
      import concurrent.futures
      from threading import Lock
      import time
      
      endpoint = "https://oss-cn-beijing.aliyuncs.com"
      region = "cn-beijing"
      bucket_name = "image-browser"
      env_access_key_id = "oss_image_browser_access_key_id"
      env_access_key_secret = "oss_image_browser_access_key_secret"
      
      
      class AsyncOSSUploader:
          def __init__(self, workers: int = 10):
              """
              初始化異步OSS上傳器
              
              Args:
                  workers: 最大并發(fā)工作線程數(shù)
              """
              provider = StaticCredentialsProvider(
                  access_key_id=os.getenv(env_access_key_id),
                  access_key_secret=os.getenv(env_access_key_secret)
              )
              auth = oss2.ProviderAuthV4(provider)
              self.bucket = oss2.Bucket(auth, endpoint, bucket_name, region=region)
              self.workers = workers
              self.lock = Lock()
              self.uploaded_count = 0
              self.failed_count = 0
          
          def get_file_headers(self, file_path: str) -> Dict[str, str]:
              """根據(jù)文件類型獲取合適的HTTP頭"""
              headers = {
                  'x-oss-storage-class': 'Standard'
              }
              
              # 根據(jù)文件擴展名設置Content-Type
              ext = Path(file_path).suffix.lower()
              content_types = {
                  '.html': 'text/html',
                  '.css': 'text/css',
                  '.js': 'application/javascript',
                  '.json': 'application/json',
                  '.png': 'image/png',
                  '.jpg': 'image/jpeg',
                  '.jpeg': 'image/jpeg',
                  '.gif': 'image/gif',
                  '.svg': 'image/svg+xml',
                  '.ico': 'image/x-icon',
                  '.webp': 'image/webp',
                  '.txt': 'text/plain',
                  '.md': 'text/markdown',
                  '.xml': 'application/xml',
                  '.pdf': 'application/pdf',
                  '.zip': 'application/zip',
                  '.woff': 'font/woff',
                  '.woff2': 'font/woff2',
                  '.ttf': 'font/ttf',
                  '.eot': 'application/vnd.ms-fontobject'
              }
              
              if ext in content_types:
                  headers['Content-Type'] = content_types[ext]
              
              if Path(file_path).name == 'index.html':
                  headers['Cache-Control'] = 'no-cache'
              
              return headers
          
          def _upload_single_file(self, file_path: Path, oss_key: str) -> bool:
              """上傳單個文件(線程安全)"""
              try:
                  headers = self.get_file_headers(str(file_path))
                  
                  with open(file_path, 'rb') as f:
                      self.bucket.put_object(oss_key, f, headers=headers)
                  
                  with self.lock:
                      self.uploaded_count += 1
                      print(f"? 上傳成功 ({self.uploaded_count}): {file_path.name} -> {oss_key}")
                  
                  return True
                  
              except Exception as e:
                  with self.lock:
                      self.failed_count += 1
                      print(f"? 上傳失敗 ({self.failed_count}): {file_path.name} -> {e}")
                  return False
          
          def upload(self, src_path: str, dst_path: str, include_root: bool = False) -> bool:
              """
              上傳文件或目錄到OSS
              
              Args:
                  src_path: 本地源路徑(文件或目錄)
                  dst_path: OSS目標路徑
                  include_root: 
                      - True: 整個目錄上傳到dst_path目錄中 (類似 cp src dst)
                      - False: 目錄內(nèi)容上傳到dst_path目錄中 (類似 cp -rf src/* dst)
              
              Returns:
                  bool: 上傳是否成功
              """
              src_path = Path(src_path)
              dst_path = dst_path.rstrip('/')
              
              if not src_path.exists():
                  print(f"? 源路徑不存在: {src_path}")
                  return False
              
              start_time = time.time()
              
              try:
                  if src_path.is_file():
                      # 上傳單個文件
                      return self._upload_single_file_sync(src_path, dst_path)
                  elif src_path.is_dir():
                      # 上傳目錄
                      return self._upload_directory(src_path, dst_path, include_root)
                  else:
                      print(f"? 不支持的路徑類型: {src_path}")
                      return False
                      
              except Exception as e:
                  print(f"? 上傳過程中發(fā)生錯誤: {e}")
                  return False
              finally:
                  elapsed_time = time.time() - start_time
                  print(f"??  總耗時: {elapsed_time:.2f}秒")
          
          def _upload_single_file_sync(self, file_path: Path, dst_path: str) -> bool:
              """同步上傳單個文件"""
              try:
                  if dst_path is None:
                      dst_path = ""
                  
                  if dst_path.endswith('/') or not dst_path:
                      if dst_path:
                          oss_key = f"{dst_path}/{file_path.name}"
                      else:
                          oss_key = file_path.name
                  else:
                      oss_key = dst_path
                  
                  oss_key = oss_key.lstrip('/')
                  
                  print(f"?? 上傳文件: {file_path} -> {oss_key}")
                  return self._upload_single_file(file_path, oss_key)
                  
              except Exception as e:
                  print(f"? 文件上傳失敗 {file_path}: {e}")
                  return False
          
          def _upload_directory(self, dir_path: Path, dst_path: str, include_root: bool) -> bool:
              """上傳目錄"""
              print(f"?? 開始上傳目錄: {dir_path} -> {dst_path}")
              print(f"?? 模式: {'包含根目錄' if include_root else '僅內(nèi)容'}")
              print(f"?? 并發(fā)上傳 (最大 {self.workers} 線程)")
              
              # 收集所有需要上傳的文件
              upload_tasks = []
              
              for file_path in dir_path.rglob('*'):
                  if file_path.is_file():
                      if include_root:
                          relative_path = str(file_path)
                      else:
                          relative_path = file_path.relative_to(dir_path)
                      
                      relative_path = str(relative_path)
                      oss_key = f"{dst_path}/{relative_path}".replace("\\", "/")
                      oss_key = oss_key.lstrip('/')
                      
                      upload_tasks.append((file_path, oss_key))
              
              total_files = len(upload_tasks)
              print(f"?? 發(fā)現(xiàn) {total_files} 個文件需要上傳")
              
              if total_files == 0:
                  print("??  沒有文件需要上傳")
                  return True
              
              if total_files == 1:
                  # 單個文件直接上傳
                  file_path, oss_key = upload_tasks[0]
                  return self._upload_single_file(file_path, oss_key)
              else:
                  # 多個文件使用并發(fā)上傳
                  return self._upload_concurrent(upload_tasks)
          
      
          
          def _upload_concurrent(self, upload_tasks: List[Tuple[Path, str]]) -> bool:
              """并發(fā)上傳文件"""
              print("? 使用并發(fā)上傳模式")
              
              with concurrent.futures.ThreadPoolExecutor(max_workers=self.workers) as executor:
                  # 提交所有上傳任務
                  future_to_task = {
                      executor.submit(self._upload_single_file, file_path, oss_key): (file_path, oss_key)
                      for file_path, oss_key in upload_tasks
                  }
                  
                  # 等待所有任務完成
                  for future in concurrent.futures.as_completed(future_to_task):
                      file_path, oss_key = future_to_task[future]
                      try:
                          future.result()  # 獲取結(jié)果,如果有異常會在這里拋出
                      except Exception as e:
                          print(f"? 任務異常 {file_path}: {e}")
              
              print(f"?? 并發(fā)上傳完成: 成功 {self.uploaded_count} 個文件,失敗 {self.failed_count} 個文件")
              return self.failed_count == 0
      
      
      def upload(src_path: str, dst_path: str, include_root: bool = False, workers: int = 10) -> bool:
          """
          便捷的上傳函數(shù)
          
          Args:
              src_path: 本地源路徑
              dst_path: OSS目標路徑
              include_root: 是否包含根目錄
              workers: 最大并發(fā)工作線程數(shù)
          
          Returns:
              bool: 上傳是否成功
          """
          uploader = AsyncOSSUploader(workers=workers)
          return uploader.upload(src_path, dst_path, include_root)
      
      if __name__ == "__main__":
         upload('node_modules', '/da1', include_root=True, workers=10)
      

        

      posted @ 2025-07-19 13:10  浪浪辛  閱讀(14)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 人妻影音先锋啪啪AV资源| 亚洲天堂成人网在线观看| 美女内射福利大全在线看| 色老板精品无码免费视频| 夜色福利站WWW国产在线视频| 亚洲色最新高清AV网站| 久9re热视频这里只有精品免费| 日韩伦理片一区二区三区| 亚洲欧美成人久久综合中文网| 中文午夜乱理片无码| 尹人香蕉久久99天天拍| 亚洲成在人线av无码| 国产高清自产拍av在线| 久久综合给合久久狠狠狠88| 在线亚洲午夜理论av大片| 99国精品午夜福利视频不卡99| 国产福利午夜十八禁久久| 国产熟睡乱子伦午夜视频| 苏尼特右旗| 亚洲AV无码东方伊甸园| 九九电影网午夜理论片| 人妻少妇偷人精品一区| 福利一区二区在线视频| 国产一区二区三区黄色片 | 久久精品国产亚洲av麻豆软件| 爱性久久久久久久久| 国产蜜臀av在线一区二区| 久久天天躁狠狠躁夜夜躁2012| 久久精品女人天堂av免费观看| 平江县| 一区二区三区国产不卡| 无码国内精品人妻少妇| 日本一道一区二区视频| 精品素人AV无码不卡在线观看| 欧美性色黄大片| 国产精品十八禁一区二区| 苍井空毛片精品久久久| 亚洲熟妇自偷自拍另类| 国产精品小粉嫩在线观看| 又黄又爽又色的少妇毛片| 国产精品一线二线三线区|