<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [MCP][03]使用FastMCP開發MCP應用

      前言

      之前的示例用的都是MCP的官方SDK(版本 1.14.0),簡單使用還是沒問題的,但對于Sampling、Elicitation這些相對高級的功能,官方沒有提供Demo,而且因為比較新,網上也沒搜到能用的案例。以我自己的水平折騰了一天也沒搗鼓出來。在翻mcp源碼時意外發現了其內置的FastMCP,順藤摸瓜找到了FastMCP的官網,在官方文檔中找到了相關用法。這里我們就用FastMCP來實現之前用mcp官方sdk做的功能,看看它有什么優勢。

      安裝

      截至本文日期的fastmcp版本為 2.12.2

      # uv
      uv add fastmcp
      
      # pip
      python -m pip install fastmcp
      

      MCP Server

      MCP Server的寫法跟之前使用mcp官方sdk差不多,只是導入FastMCP的地方和運行配置不太一樣。

      from fastmcp import FastMCP
      from typing import TypeAlias, Union
      from datetime import datetime
      import asyncio
      import asyncssh
      
      mcp = FastMCP("custom")
      
      Number: TypeAlias = Union[int, float]
      @mcp.tool()
      def add(a: Number, b: Number) -> Number:
          """Add two numbers"""
          return a + b
      
      @mcp.tool()
      def multiply(a: Number, b: Number) -> Number:
          """Multiply two numbers"""
          return a * b
      
      @mcp.tool()
      def is_greater_than(a: Number, b: Number) -> bool:
          """Check if a is greater than b
          
          Args:
              a (Number): The first number
              b (Number): The second number
      
          Returns:
              bool: True if a is greater than b, False otherwise
          """
          return a > b
      
      @mcp.tool()
      async def get_weather(city: str) -> str:  
          """Get weather for a given city."""
          return f"It's always sunny in {city}!"
      
      @mcp.tool()
      async def get_date() -> str:
          """Get today's date."""
          return datetime.now().strftime("%Y-%m-%d")
      
      @mcp.tool()
      async def execute_ssh_command_remote(hostname: str, command: str) -> str:
          """Execute an SSH command on a remote host.
          
          Args:
              hostname (str): The hostname of the remote host.
              command (str): The SSH command to execute.
      
          Returns:
              str: The output of the SSH command.
          """
          try:
              async with asyncssh.connect(hostname, username="rainux", connect_timeout=10) as conn:
                  result = await conn.run(command, timeout=10)
                  stdout = result.stdout
                  stderr = result.stderr
                  content = str(stdout if stdout else stderr)
                  return content
          except Exception as e:
              return f"Error executing command '{command}' on host '{hostname}': {str(e)}"
          
      @mcp.tool()
      async def execute_command_local(command: str, timeout: int = 10) -> str:
          """Execute a shell command locally.
          
          Args:
              command (str): The shell command to execute.
              timeout (int): Timeout in seconds for command execution. default is 10 seconds.
      
          Returns:
              str: The output of the shell command.
          """
          try:
              proc = await asyncio.create_subprocess_shell(
                  command,
                  stdout=asyncio.subprocess.PIPE,
                  stderr=asyncio.subprocess.PIPE
              )
              stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
              stdout_str = stdout.decode().strip()
              stderr_str = stderr.decode().strip()
              # content = stdout.decode() if stdout else stderr.decode()
              if stdout_str:
                  return f"Stdout: {stdout_str}"
              elif stderr_str:
                  return f"Stderr: {stderr_str}"
              else:
                  return "Command executed successfully with no output"
          except asyncio.TimeoutError:
              if proc and not proc.returncode:
                  try:
                      proc.terminate()
                      await proc.wait()
                  except:
                      pass
              return f"Error: Command '{command}' timed out after {timeout} seconds"
          except Exception as e:
              return f"Error executing command '{command}': {str(e)}"
      
      if __name__ == "__main__":
          mcp.run(transport="http", host="localhost", port=8001, show_banner=False)
      

      因為使用http協議,所以運行client前要先運行server,順帶測試下能否正常啟動。

      MCP Client

      FastMCP的client寫法與mcp官方sdk用法大致上也差不多,但在一些細節上更加友好。

      """
      MCP客戶端示例程序
      該程序演示了如何使用MCP協議與服務器進行交互,并通過LLM處理用戶查詢。
      """
      
      import asyncio
      import json
      import readline  # For enhanced input editing
      import traceback
      from typing import cast
      from openai.types.chat import ChatCompletionMessageFunctionToolCall
      from fastmcp import Client
      from openai import AsyncOpenAI
      from pkg.config import cfg
      from pkg.log import logger
      
      
      class MCPHost:
          """MCP主機類,用于管理與MCP服務器的連接和交互"""
          
          def __init__(self, server_uri: str):
              """
              初始化MCP客戶端
              
              Args:
                  server_uri (str): MCP服務器的URI地址
              """
              # 初始化MCP客戶端連接
              self.mcp_client: Client = Client(server_uri)
              # 初始化異步OpenAI客戶端用于與LLM交互
              self.llm = AsyncOpenAI(
                  base_url=cfg.llm_base_url,
                  api_key=cfg.llm_api_key,
              )
              # 存儲對話歷史消息
              self.messages = []
      
          async def close(self):
              """關閉MCP客戶端連接"""
              if self.mcp_client:
                  await self.mcp_client.close()
      
          async def process_query(self, query: str) -> str:
              """Process a user query by interacting with the MCP server and LLM.
              
              Args:
                  query (str): The user query to process.
      
              Returns:
                  str: The response from the MCP server.
              """
              # 將用戶查詢添加到消息歷史中
              self.messages.append({
                  "role": "user",
                  "content": query,
              })
      
              # 使用異步上下文管理器確保MCP客戶端連接正確建立和關閉
              async with self.mcp_client:
                  # 從MCP服務器獲取可用工具列表
                  tools = await self.mcp_client.list_tools()
                  # 構造LLM可以理解的工具格式
                  available_tools = []
      
                  # 將MCP工具轉換為OpenAI格式
                  for tool in tools:
                      available_tools.append({
                          "type": "function",
                          "function": {
                              "name": tool.name,
                              "description": tool.description,
                              "parameters": tool.inputSchema,
                          }
                      })
                  logger.info(f"Available tools: {[tool['function']['name'] for tool in available_tools]}")
      
                  # 調用LLM,傳入對話歷史和可用工具
                  resp = await self.llm.chat.completions.create(
                      model=cfg.llm_model,
                      messages=self.messages,
                      tools=available_tools,
                      temperature=0.3,
                  )
      
                  # 存儲最終響應文本
                  final_text = []
                  # 獲取LLM的首個響應消息
                  message = resp.choices[0].message
                  # 如果響應包含直接內容,則添加到結果中
                  if hasattr(message, "content") and message.content:
                      final_text.append(message.content)
      
                  # 循環處理工具調用,直到沒有更多工具調用為止
                  while message.tool_calls:
                      # 遍歷所有工具調用
                      for tool_call in message.tool_calls:
                          # 確保工具調用有函數信息
                          if not hasattr(tool_call, "function"):
                              continue
      
                          # 類型轉換以獲取函數調用詳情
                          function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)
                          function = function_call.function
                          tool_name = function.name
                          # 解析函數參數
                          tool_args = json.loads(function.arguments)
      
                          # 檢查MCP客戶端是否已連接
                          if not self.mcp_client.is_connected():
                              raise RuntimeError("Session not initialized. Cannot call tool.")
                          
                          # 調用MCP服務器上的指定工具
                          result = await self.mcp_client.call_tool(tool_name, tool_args)
      
                          # 將助手的工具調用添加到消息歷史中
                          self.messages.append({
                              "role": "assistant",
                              "tool_calls": [
                                  {
                                      "id": tool_call.id,
                                      "type": "function",
                                      "function": {
                                          "name": function.name,
                                          "arguments": function.arguments
                                      }
                                  }
                              ]
                          })
      
                          # 將工具調用結果添加到消息歷史中
                          self.messages.append({
                              "role": "tool",
                              "tool_call_id":tool_call.id,
                              "content": str(result.content) if result.content else ""
                          })
                      
                      # 基于工具調用結果再次調用LLM
                      final_resp = await self.llm.chat.completions.create(
                          model=cfg.llm_model,
                          messages=self.messages,
                          tools=available_tools,
                          temperature=0.3,
                      )
                      # 更新消息為最新的LLM響應
                      message = final_resp.choices[0].message
                      # 如果響應包含內容,則添加到最終結果中
                      if message.content:
                          final_text.append(message.content)
                  
                  # 返回連接后的完整響應
                  return "\n".join(final_text)
      
          async def chat_loop(self):
              """主聊天循環,處理用戶輸入并顯示響應"""
              print("Welcome to the MCP chat! Type 'quit' to exit.")
      
              # 持續處理用戶輸入直到用戶退出
              while True:
                  try:
                      # 獲取用戶輸入
                      query = input("You: ").strip()
      
                      # 檢查退出命令
                      if query.lower() == "quit":
                          print("Exiting chat. Goodbye!")
                          break
      
                      # 跳過空輸入
                      if not query:
                          continue
      
                      # 處理用戶查詢并獲取響應
                      resp = await self.process_query(query)
                      print(f"Assistant: {resp}")
                  
                  # 捕獲并記錄聊天循環中的任何異常
                  except Exception as e:
                      logger.error(f"Error in chat loop: {str(e)}")
                      logger.error(traceback.format_exc())
      
      
      async def main():
          """主函數,程序入口點"""
          # 創建MCP主機實例
          client = MCPHost(server_uri="http://localhost:8001/mcp")
          try:
              # 啟動聊天循環
              await client.chat_loop()
          except Exception as e:
              # 記錄主程序中的任何異常
              logger.error(f"Error in main: {str(e)}")
              logger.error(traceback.format_exc())
          finally:
              # 確保客戶端連接被正確關閉
              await client.close()
          
      
      if __name__ == "__main__":
          # 運行主程序
          asyncio.run(main())
      

      FastMCP的客戶端API設計更加直觀,特別是在連接管理和工具調用方面,代碼更簡潔易懂。

      client運行輸出:

      Welcome to the MCP chat! Type 'quit' to exit.
      You: 今天的日期是什么
      Assistant: 今天的日期是2025年9月13日。
      You: 檢查下 tx 服務器和本地的內存占用情況
      Assistant: 以下是 tx 服務器和本地的內存占用情況:
      
      ### tx 服務器
      
                     total        used        free      shared  buff/cache   available
      Mem:           3.7Gi       2.2Gi       207Mi       142Mi       1.7Gi       1.5Gi
      Swap:             0B          0B          0B
      
      
      ### 本地
      
                     total        used        free      shared  buff/cache   available
      Mem:           62Gi        14Gi        38Gi       487Mi        10Gi        48Gi
      Swap:         3.8Gi          0B       3.8Gi
      
      
      從這些信息中可以看出,tx 服務器的內存使用較高,而本地系統仍有較多可用內存。如果需要進一步分析或采取措施,請告訴我!
      You: 再查下硬盤
      Assistant: 已檢查 tx 服務器和本地的內存及硬盤占用情況。以下是總結:
      
      ### tx 服務器
      - **內存占用**:
        - 總內存: 3.7 Gi
        - 已用內存: 2.2 Gi
        - 可用內存: 1.5 Gi
      - **硬盤使用**:
        - 根目錄 `/`: 總大小 69G,已用 17G,可用 53G,使用率 24%
      
      ### 本地
      - **內存占用**:
        - 總內存: 62 Gi
        - 已用內存: 14 Gi
        - 可用內存: 48 Gi
      - **硬盤使用**:
        - 根目錄 `/`: 總大小 234G,已用 30G,使用率 14%
        - `/home`: 總大小 676G,已用 197G,使用率 31%
      
      如果需要進一步操作,請告知!
      You: quit
      Exiting chat. Goodbye!
      

      可以看到,FastMCP的基本運行邏輯是正常的,跟使用MCP官方SDK相差不大,而且還更簡潔一點。

      小結

      使用FastMCP與使用mcp官方sdk相比,整體體驗更加友好。FastMCP不僅保持了與官方SDK的兼容性,還在API設計上做了優化,使得代碼更加簡潔易懂。后續博客中我們會繼續使用FastMCP來介紹Sampling、Elicitation等MCP的高級功能。

      posted @ 2025-09-15 09:10  花酒鋤作田  閱讀(407)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 最近中文字幕国产精选| 国产一区二区不卡91| 激情综合网激情综合网激情| 欧美 亚洲 日韩 在线综合| 日本欧美一区二区三区在线播放| 天天躁夜夜躁天干天干2020| 精品久久久久中文字幕日本| a在线观看视频在线播放| 天天澡日日澡狠狠欧美老妇| 一本精品中文字幕在线| 久久精品视频一二三四区| 无码中文字幕热热久久| 九九热爱视频精品视频| 吉水县| 久久精品熟妇丰满人妻久久| 国产精品中文字幕视频| 无码成人午夜在线观看| 一区二区在线观看成人午夜| 天堂av在线一区二区| 东京热大乱系列无码| 栾城县| 午夜成人性爽爽免费视频| 国产91丝袜在线播放动漫| 激情综合网激情国产av| 亚洲十八禁一区二区三区| 性色欲情网站| 国产精品一区二区性色av| 国产午夜精品福利视频| 国产精品一区在线蜜臀| 日韩av一区二区精品不卡| 欲色欲色天天天www| 成人亚洲一级午夜激情网| 日本福利一区二区精品| 东方四虎在线观看av| 精品视频在线观自拍自拍| 国产11一12周岁女毛片| 亚洲午夜精品国产电影在线观看| 一本一道久久综合狠狠老| 国产对白老熟女正在播放| 亚洲国产日韩a在线播放| 人妻激情偷乱视频一区二区三区|