<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      解密prompt系列61. 手搓代碼沙箱與FastAPI-MCP實戰

      最近Vibe Code在各種技術社區刷屏,不過說實話,在日常工作中,我更多是用LLM來生成文檔、批量修改代碼或者排查問題。畢竟業務需求嘛,很少有能一次性描述清楚的(懂的都懂哈哈~)。但在看了最新的SWE-Bench Pro評測后,我決定嘗試一下端到端的AI編程體驗。

      前兩章我們討論了JupyterAgent,當時用的是E2B的代碼沙箱。這次我決定自己動手,用字節的TRAE從頭構建一個Python代碼沙箱,并加入MCP支持。完整代碼已經開源在simple_sandbox,Star is Welcomed!

      個人體感TRAE在國內Coding IDE里算Very Good,比開源的Cline,Kilo等在Token使用上效率更高,但和cursor還有距離。如何最大化Token使用效率和效果是個系統工程問題~

      Vibe Coding實戰:從零構建代碼沙箱

      本來想完整分享整個Vibe Coding過程的,結果TRAE升級把歷史對話記錄清空了(哭)。那就跟大家分享一下我的操作思路和最終成果吧!

      我在使用AI IDE時通常有兩種策略:

      • 模型主導:讓模型先做整體規劃,我人工調整后讓模型執行,依賴模型的測試文件和執行報錯進行迭代優化
      • 人工主導:我自己拆解任務,讓模型逐步實現,每一步都進行人工校驗

      如何選擇這兩種模式完全取決于我對于整個項目是否有很強的先驗思考,哈哈就是我知道怎么做的我帶著模型做,我不知道的模型帶著我做。

      這次的任務因為有E2B的SDK可以參考,我選擇了人工主導的方式,將任務拆解成幾個明確的步驟:

      1. 沙箱核心功能:沙箱創建、環境初始化、代碼執行
      2. 功能完善:結構化輸出、預裝環境、工作目錄隔離、中文字體支持
      3. 擴展功能:文件上傳、文件獲取、超時關閉
      4. 服務化:搭建FastAPI服務接口
      5. 客戶端Demo:創建請求示例
      6. 文檔撰寫:完善的README

      最終模型實現的沙箱核心類如下,完整代碼請移步Github,在整個編碼過程中,我只在功能設計層面做了調整,沒有發現任何編碼錯誤:

      
      # 沙箱類
      class Sandbox:
          def __init__(self, sandbox_id: str, work_dir: str, venv_dir: str):
              self.sandbox_id = sandbox_id
              self.work_dir = work_dir
              self.venv_dir = venv_dir
      
              # 配置KernelManager使用虛擬環境中的Python解釋器
              self.kernel_manager = KernelManager(
                  kernel_name='python3',
                  kernel_spec_manager=self._create_custom_kernel_spec_manager()
              )
              # 設置環境變量
              env = os.environ.copy()
              env['VIRTUAL_ENV'] = venv_dir
      
              # 設置內核使用虛擬環境中的Python
              self.kernel_manager.start_kernel(
                  cwd=work_dir,
                  env=env,
              )
      
              self.kernel_client = self.kernel_manager.client()
              self.kernel_client.start_channels()
              self.kernel_client.wait_for_ready()
              self.last_execute_id = 0
      
              # 復制字體文件到沙箱工作目錄
              font_source_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'SimHei.ttf')
              font_dest_path = os.path.join(work_dir, 'SimHei.ttf')
              if os.path.exists(font_source_path):
                  try:
                      shutil.copy2(font_source_path, font_dest_path)
                      print(f"字體文件已復制到: {font_dest_path}")
                  except Exception as e:
                      print(f"復制字體文件失敗: {e}")
              else:
                  print(f"未找到字體文件: {font_source_path}")
      
              # 安裝基本包
              self._install_basic_packages()
              
              # 執行字體注冊代碼
              self.execute_code(font_code)
      
          def _create_custom_kernel_spec_manager(self):
              # 創建自定義的內核規范管理器,確保使用正確的Python環境
              from jupyter_client.kernelspec import KernelSpecManager
              ksm = KernelSpecManager()
              return ksm
      
          def _install_basic_packages(self):
              # 在虛擬環境中安裝基本包
              # 如果是從鏡像復制的環境,可能已經包含了基本包,這里可以跳過或僅檢查
              try:
                  # 直接使用Linux路徑設置
                  pip_path = os.path.join(self.venv_dir, 'bin', 'pip')
                  python_exe = os.path.join(self.venv_dir, 'bin', 'python')
      
                  # 檢查ipykernel是否已安裝
                  check_result = subprocess.run(
                      [python_exe, '-c', 'import ipykernel'],
                      capture_output=True,
                      text=True
                  )
      
                  # 如果ipykernel未安裝,則安裝
                  if check_result.returncode != 0:
                      print("ipykernel未安裝,開始安裝...")
                      subprocess.check_call([pip_path, 'install', 'ipykernel'])
              except Exception as e:
                  print(f"安裝基礎包失敗: {e}")
      
          def execute_code(self, code: str) -> Dict:
              # 生成執行ID
              self.last_execute_id += 1
      
              # 執行代碼
              msg_id = self.kernel_client.execute(code)
      
              stdout = []
              stderr = []
              error = None
              results = []
      
              # 收集執行結果
              while True:
                  try:
                      msg = self.kernel_client.get_iopub_msg(timeout=3600)
                      msg_type = msg['header']['msg_type']
                      if msg['parent_header'].get('msg_id') != msg_id:
                          continue 
                      elif msg_type == 'stream':
                          content = msg['content']
                          if content['name'] == 'stdout':
                              stdout.append(content['text'])
                          elif content['name'] == 'stderr':
                              stderr.append(content['text'])
                      elif msg_type == 'error':
                          content = msg['content']
                          # 合并error字段,包含ename、evalue和traceback
                          error = {
                              'name':ansi_escape.sub('',  content['ename']),
                              'value': ansi_escape.sub('',content['evalue']),
                              'traceback': [ansi_escape.sub('', i) for i in content['traceback']]
                          }
                      elif msg_type == 'execute_result':
                          # 處理執行結果,按照{type,data}格式存儲
                          data = msg['content']['data']
                          for data_type, data_value in data.items():
                              results.append({"type": data_type, "data": data_value})
      
                      elif msg_type == 'display_data':
                          # 處理顯示數據,按照{type,data}格式存儲
                          data = msg['content']['data']
                          for data_type, data_value in data.items():
                              results.append({"type": data_type, "data": data_value})
      
                      elif msg_type == 'execute_reply':
                          break
                      elif msg_type == 'status':
                          if msg['content']['execution_state'] == 'idle':
                              break
                  except Exception:
                      break
      
              return {
                  'stdout': [ansi_escape.sub('', i) for i in stdout],
                  'stderr': [ansi_escape.sub('', i) for i in stderr],
                  'error': error,
                  'results': results
              }
      
          def upload_file(self, file: UploadFile, file_path: Optional[str] = None) -> str:
              # 確定文件保存路徑:默認為工作目錄最簡化code
              if file_path:
                  save_path = os.path.join(self.work_dir, file_path)
              else:
                  save_path = os.path.join(self.work_dir, file.filename)
      
              # 確保目標目錄存在
              os.makedirs(os.path.dirname(save_path), exist_ok=True)
      
              # 保存文件
              with open(save_path, "wb") as f:
                  shutil.copyfileobj(file.file, f)
      
              return save_path
      
          def get_files(self) -> List[Dict[str, str]]:
              files = []
              for root, _, filenames in os.walk(self.work_dir):
                  for filename in filenames:
                      file_path = os.path.join(root, filename)
                      relative_path = os.path.relpath(file_path, self.work_dir)
                      files.append({
                          'path': relative_path,
                          'size': os.path.getsize(file_path)
                      })
              return files
      
          def get_file_path(self, file_path: str) -> str:
              full_path = os.path.abspath(os.path.join(self.work_dir, file_path))
              # 安全檢查,確保文件在工作目錄內
              if not full_path.startswith(os.path.abspath(self.work_dir)):
                  raise HTTPException(status_code=403, detail="File access denied")
              return full_path
      
          def shutdown(self):
              try:
                  self.kernel_client.stop_channels()
                  self.kernel_manager.shutdown_kernel()
                  # 清理工作目錄
                  shutil.rmtree(self.work_dir, ignore_errors=True)
                  # 清理虛擬環境目錄
                  shutil.rmtree(self.venv_dir, ignore_errors=True)
              except Exception:
                  pass
      
          def install_package(self, package_name: str) -> Dict:
              """在沙箱的虛擬環境中安裝Python包"""
              try:
                  # 獲取虛擬環境中的pip路徑(Linux環境)
                  pip_path = os.path.join(self.venv_dir, 'bin', 'pip')
      
                  # 執行pip安裝命令
                  result = subprocess.run(
                      [pip_path, 'install', package_name],
                      capture_output=True,
                      text=True,
                      timeout=60  # 設置超時時間
                  )
      
                  # 檢查安裝是否成功
                  if result.returncode == 0:
                      return {
                          'success': True,
                          'stdout': result.stdout,
                          'stderr': result.stderr,
                          'message': f"成功安裝包: {package_name}"
                      }
                  else:
                      return {
                          'success': False,
                          'stdout': result.stdout,
                          'stderr': result.stderr,
                          'message': f"安裝包失敗: {package_name}"
                      }
              except Exception as e:
                  return {
                      'success': False,
                      'stdout': '',
                      'stderr': str(e),
                      'message': f"安裝過程出錯: {str(e)}"
                  }
      

      用FastAPI-MCP打造標準化MCP服務

      沙箱服務跑通后,我決定將其打包成標準的MCP服務。這時候就輪到FastAPI-MCP出場了!就像我們在解密prompt系列58. MCP - 工具演變 & MCP基礎中提到的MCP本身并不是工具,它只是Adapter,而FastAPI-MCP庫完美體現了這一特性——它可以將現有的FastAPI工具直接轉換成標準MCP服務。

      但這里遇到了一個常見問題:大模型對新的library支持不夠好。解決方案是使用上下文管理模塊,將API接口文檔加入上下文。這種方法特別適用于:

      • 這兩年的新Library:MCP etc.
      • 大版本更新的Lirabry: 例如ES 7.XX -> Elastic Search 8.XX

      image

      這樣我們就可以引用對應的API文檔讓TRAE幫我們進一步把服務借助FastAPI-MCP包裝成MCP服務,并使用FastMCP給出請求Demo。而FastAPI-MCP的使用也非常簡單,只需要添加三行代碼就可以完成MCP服務的適配

      from fastapi_mcp import FastApiMCP
      # 創建并掛載MCP服務器 - 移到所有端點定義之后
      mcp = FastApiMCP(app)
      mcp.mount_http()
      

      接下來,我把FastMCP的接口文檔加入上下文,讓模型生成MCP Client來驗證服務。但運行時發現MCP服務可以請求成功,list_tool卻顯示為空。通過觀察輸入輸出來定位問題,TRAE成功找到了問題所在并進行了修復。

      image

      重要提示:使用FastMCP-API時需要先添加端點再掛載MCP!

      不過下一個問題難住了TRAE:調用后發現FastMCP-API默認使用端點名稱+函數名稱作為工具名稱,導致list_tool顯示的工具名很奇怪。我想優化工具命名,但TRAE沒有在接口文檔中找到對應內容。

      image

      其實解決方案很簡單,只需要添加operation_id參數就能提供工具別名:

      # Explicit operation_id (tool will be named "get_user_info")
      @app.get("/users/{user_id}", operation_id="get_user_info")
      async def read_user(user_id: int):
          return {"user_id": user_id}
      

      image

      結合上下文和明確的任務拆解,LLM在Coding任務上確實是當前AI能發揮最大價值的領域可能沒有之一。筆者已經很明顯的感覺在工作中能否最大程度發揮工具的能力已經能很顯著地拉開大家工作效率的差距。哈哈這里肯定有人吐槽當牛馬你要這么高的效率干什么,但咱就說省出來的時間咱鼓搗點新鮮好玩的不香么?下次咱結合開源的Cline或Kilo一邊看上下文的工程設計,一邊看下模型自主對復雜任務的拆解效果。

      posted @ 2025-10-09 08:04  風雨中的小七  閱讀(187)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 日韩一区二区三区不卡片| 国产情侣激情在线对白| 亚洲激情av一区二区三区| 蜜臀av午夜精品福利| 久久亚洲精品日本波多野结衣| 免费AV片在线观看网址| 久久精品国产99久久美女| 九九日本黄色精品视频| 亚洲色拍拍噜噜噜最新网站| 国产日韩久久免费影院| 久久精品人人槡人妻人人玩AV| 国产三级精品福利久久| 青青草无码免费一二三区| 国产精品老熟女露脸视频| 婷婷久久香蕉五月综合加勒比 | 精品无码一区二区三区电影| 好紧好湿太硬了我太爽了视频| 亚洲码亚洲码天堂码三区| 精品亚洲精品日韩精品| 国产成人AV男人的天堂| 亚洲国产精品综合久久网络| 国产一区二区不卡视频在线| 99RE6在线观看国产精品| 亚洲精品久久久久久无码色欲四季| 亚洲av免费成人精品区| 国产va在线观看免费| 中文字幕精品亚洲字幕成| 亚洲高潮喷水无码AV电影 | 尹人香蕉久久99天天拍欧美p7| 色窝窝免费播放视频在线| 精品久久久久久无码人妻蜜桃| 亚洲免费成人av一区| 婷婷四房播播| 国产国产久热这里只有精品| 精品无码国产日韩制服丝袜| 成av人电影在线观看| 九九热在线免费精品视频| 蜜桃视频网站| 精品国产乱码久久久久久浪潮| 东京热人妻无码一区二区av| 国产97色在线 | 免费|