<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      [MCP][06]持久化記憶

      前言

      在之前的案例中,會話歷史都保存在內存中,一旦程序重啟,會話歷史就會丟失,導致AI沒法根據會話歷史推測用戶當前意圖。這就像一個人記性特別差,每次見面都忘了之前聊過什么,讓人感覺很不專業。

      如果你之前使用過LangGraph,那應該知道LangGraph提供了checkpointer功能來管理歷史會話,可以把歷史會話保存到關系型數據庫或內存中。雖然LangGraph有這個功能,但是MCP官方文檔中并沒有提類似的實現。不過沒關系,加載會話的邏輯其實很簡單,我們也可以實現一個簡單的持久化記憶功能。

      我這里設計的是把歷史會話保存到sqlite中,跟LangGraph一樣,根據thread_id查找歷史會話。我這里只是把會話當字符串來保存,而LangGraph則做了特殊處理,所以功能和性能方面相對差點,但基本功能都已經實現了,而且還另外加了時間字段。如果數據庫更換為MySQL或Postgres,可以按時間字段來分區,這樣就能更好地管理歷史數據了。

      MCP Server

      有的持久化記憶的設計是把歷史會話管理功能集成到MCP Server上,但我認為歷史會話跟Client是強相關的,所以MCP Server端我沒做任何改動,和前面的示例基本一致。這樣設計的好處是保持了Server的簡潔性,讓每個組件各司其職。

      import asyncio
      from dataclasses import dataclass
      from datetime import datetime
      
      from fastmcp import Context, FastMCP
      
      
      @dataclass
      class UserDecision:
          decision: str = "decline"
      
      
      mcp = FastMCP("Elicitation Server")
      
      @mcp.tool()
      async def get_weather(city: str) -> str:  
          """Get weather for a given city."""
          return f"It's always sunny in {city}!"
      
      @mcp.tool()
      async def get_date() -> str:
          """Get today's date."""
          return datetime.now().strftime("%Y-%m-%d")
      
      @mcp.tool()
      async def execute_command_local(command: str, ctx: Context, is_need_user_check: bool = False, timeout: int = 10) -> str:
          """Execute a shell command locally.
          
          Args:
              command (str): The shell command to execute.
              is_need_user_check (bool): Set to True when performing create, delete, or modify operations on the host, indicating that user confirmation is required.
              timeout (int): Timeout in seconds for command execution. Default is 10 seconds.
      
          Returns:
              str: The output of the shell command.
          """
          if is_need_user_check:
              user_check_result = await ctx.elicit(
                  message=f"Do you want to execute this command(yes or no): {command}",
                  response_type=UserDecision,  # response_type 必須是符合 JSON Schema
              )
              if user_check_result.action != "accept":
                  return "User denied command execution."
          try:
              proc = await asyncio.create_subprocess_shell(
                  command,
                  stdout=asyncio.subprocess.PIPE,
                  stderr=asyncio.subprocess.PIPE
              )
              stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
              stdout_str = stdout.decode().strip()
              stderr_str = stderr.decode().strip()
              if stdout_str:
                  return f"Stdout: {stdout_str}"
              elif stderr_str:
                  return f"Stderr: {stderr_str}"
              else:
                  return "Command executed successfully with no output"
          except asyncio.TimeoutError:
              if proc and not proc.returncode:
                  try:
                      proc.terminate()
                      await proc.wait()
                  except:
                      pass
              return f"Error: Command '{command}' timed out after {timeout} seconds"
          except Exception as e:
              return f"Error executing command '{command}': {str(e)}"
          
      if __name__ == "__main__":
          mcp.run(transport="streamable-http", host="localhost", port=8001, show_banner=False)
      

      client使用http通信,所以server記得要先運行。

      MCP Client

      在客戶端,我們增加了對sqlite的異步使用支持,將每次對話按thread_id保存到sqlite中。這就像是給AI裝上了"記憶硬盤",讓它能夠記住之前的對話內容。

      import asyncio
      import json
      import readline  # For enhanced input editing
      import traceback
      from collections.abc import Sequence
      from datetime import datetime
      from typing import Dict, List, Optional, cast
      
      from fastmcp import Client
      from fastmcp.client.elicitation import ElicitResult
      from mcp.shared.context import RequestContext
      from mcp.types import ElicitRequestParams
      from openai import AsyncOpenAI
      from openai.types.chat import ChatCompletionMessageFunctionToolCall
      from sqlalchemy import (Boolean, Column, DateTime, Integer, String, Text, func,
                              select)
      from sqlalchemy.ext.asyncio import (AsyncSession, async_sessionmaker,
                                          create_async_engine)
      from sqlalchemy.orm import declarative_base, sessionmaker
      
      from pkg.config import cfg
      from pkg.log import logger
      
      db_engine = create_async_engine("sqlite+aiosqlite:///aiodemo.sqlite")
      Base = declarative_base()
      session_local = async_sessionmaker(bind=db_engine, expire_on_commit=False, class_=AsyncSession)
      
      class MCPMemory(Base):
          __tablename__ = "mcp_memory"
          id = Column(Integer, primary_key=True, autoincrement=True)
          thread_id = Column(String(20), index=True, comment="client gets memories according to thread_id")
          role = Column(String(20), comment="system/user/assistant/tool")
          tool_calls = Column(Text)
          tool_call_id = Column(String(50))
          content = Column(Text)
          is_deleted = Column(Boolean, default=False, comment="logical deletion")
          created_at = Column(DateTime, default=func.now())
          updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
          deleted_at = Column(DateTime, default=None)
      
          def __repr__(self):
              return f"<MCPMemory(id={self.id}, thread_id={self.thread_id}, role={self.role}, content={self.content}, is_deleted={self.is_deleted}, created_at={self.created_at}, updated_at={self.updated_at})>"
      
      class MCPMemoryService:
          @classmethod
          async def get_memories(cls, thread_id: str) -> Sequence[MCPMemory]:
              async with session_local() as session:
                  results = await session.execute(select(MCPMemory).where(MCPMemory.thread_id == thread_id, MCPMemory.is_deleted == False).order_by(MCPMemory.created_at))
                  return results.scalars().fetchall()
              
          @classmethod
          async def add_memory(cls, thread_id: str, role: str, content: Optional[str] = None, tool_calls: Optional[str] = None, tool_call_id: Optional[str] = None):
              async with session_local() as session:
                  memory = MCPMemory(thread_id=thread_id, role=role, content=content, tool_calls=tool_calls, tool_call_id=tool_call_id)
                  session.add(memory)
                  await session.commit()
                  await session.refresh(memory)
                  return memory
      
      async def init_db():
          async with db_engine.begin() as conn:
              await conn.run_sync(Base.metadata.create_all)
      
      
      
      class MCPHost:
          """MCP主機類,用于管理與MCP服務器的連接和交互"""
          
          def __init__(self, server_uri: str, thread_id: str):
              """
              初始化MCP客戶端
              
              Args:
                  server_uri (str): MCP服務器的URI地址
              """
              # 初始化MCP客戶端連接
              self.mcp_client: Client = Client(server_uri, elicitation_handler=self.elicitation_handler)
              # 初始化異步OpenAI客戶端用于與LLM交互
              self.llm = AsyncOpenAI(
                  base_url=cfg.llm_base_url,
                  api_key=cfg.llm_api_key,
              )
              self.thread_id = thread_id
              # 存儲對話歷史消息
              self.messages = []
      
          async def close(self):
              """關閉MCP客戶端連接"""
              if self.mcp_client:
                  await self.mcp_client.close()
      
          async def load_memory(self):
              """從數據庫加載歷史對話記錄"""
              results = await MCPMemoryService.get_memories(self.thread_id)
              if results:
                  for rst in results:
                      # 處理工具調用消息
                      if rst.tool_calls is not None:
                          try:
                              tool_calls = json.loads(str(rst.tool_calls))
                              self.messages.append({
                                  "role": rst.role,
                                  "tool_calls": tool_calls
                              })
                          except json.JSONDecodeError:
                              logger.error(f"Failed to decode tool_calls JSON: {rst.tool_calls}")
                      # 處理工具調用結果消息
                      elif rst.tool_call_id is not None:
                          self.messages.append({
                              "role": "tool",
                              "tool_call_id": str(rst.tool_call_id),
                              "content": str(rst.content) if rst.content is not None else ""
                          })
                      # 處理普通消息
                      else:
                          self.messages.append({
                              "role": rst.role, 
                              "content": str(rst.content) if rst.content is not None else ""
                          })
              logger.info(f"Loaded {len(results)} messages from memory")
      
          async def elicitation_handler(self, message: str, response_type: type, params: ElicitRequestParams, context: RequestContext):
              print(f"MCP Server asks: {message}")
      
              user_decision = input("Please check(yes or no): ").strip()
      
              if not user_decision or user_decision != "yes":
                  return ElicitResult(action="decline")
              
              response_data = response_type(decision="accept")
              return response_data
      
          async def process_query(self, query: str) -> str:
              """Process a user query by interacting with the MCP server and LLM.
              
              Args:
                  query (str): The user query to process.
      
              Returns:
                  str: The response from the MCP server.
              """
              # 將用戶查詢添加到消息歷史中
              self.messages.append({
                  "role": "user",
                  "content": query,
              })
              await MCPMemoryService.add_memory(thread_id=self.thread_id, role="user", content=query)
      
              # 使用異步上下文管理器確保MCP客戶端連接正確建立和關閉
              async with self.mcp_client:
                  # 從MCP服務器獲取可用工具列表
                  tools = await self.mcp_client.list_tools()
                  # 構造LLM可以理解的工具格式
                  available_tools = []
      
                  # 將MCP工具轉換為OpenAI格式
                  for tool in tools:
                      available_tools.append({
                          "type": "function",
                          "function": {
                              "name": tool.name,
                              "description": tool.description,
                              "parameters": tool.inputSchema,
                          }
                      })
                  logger.info(f"Available tools: {[tool['function']['name'] for tool in available_tools]}")
      
                  # 調用LLM,傳入對話歷史和可用工具
                  resp = await self.llm.chat.completions.create(
                      model=cfg.llm_model,
                      messages=self.messages,
                      tools=available_tools,
                      temperature=0.3,
                  )
      
                  # 存儲最終響應文本
                  final_text = []
                  # 獲取LLM的首個響應消息
                  message = resp.choices[0].message
                  # 如果響應包含直接內容,則添加到結果中
                  if hasattr(message, "content") and message.content:
                      final_text.append(message.content)
      
                  # 循環處理工具調用,直到沒有更多工具調用為止
                  while message.tool_calls:
                      # 遍歷所有工具調用
                      for tool_call in message.tool_calls:
                          # 確保工具調用有函數信息
                          if not hasattr(tool_call, "function"):
                              continue
      
                          # 類型轉換以獲取函數調用詳情
                          function_call = cast(ChatCompletionMessageFunctionToolCall, tool_call)
                          function = function_call.function
                          tool_name = function.name
                          # 解析函數參數
                          tool_args = json.loads(function.arguments)
      
                          # 檢查MCP客戶端是否已連接
                          if not self.mcp_client.is_connected():
                              raise RuntimeError("Session not initialized. Cannot call tool.")
                          
                          # 調用MCP服務器上的指定工具
                          result = await self.mcp_client.call_tool(tool_name, tool_args)
      
                          # 將助手的工具調用添加到消息歷史中
                          self.messages.append({
                              "role": "assistant",
                              "tool_calls": [
                                  {
                                      "id": tool_call.id,
                                      "type": "function",
                                      "function": {
                                          "name": function.name,
                                          "arguments": function.arguments
                                      }
                                  }
                              ]
                          })
                          await MCPMemoryService.add_memory(
                              thread_id=self.thread_id,
                              role="assistant",
                              tool_calls=json.dumps([
                                  {
                                      "id": tool_call.id,
                                      "type": "function",
                                      "function": {
                                          "name": function.name,
                                          "arguments": function.arguments
                                      }
                                  }
                              ])
                          )
      
                          # 將工具調用結果添加到消息歷史中
                          self.messages.append({
                              "role": "tool",
                              "tool_call_id":tool_call.id,
                              "content": str(result.content) if result.content else ""
                          })
                          await MCPMemoryService.add_memory(
                              thread_id=self.thread_id,
                              role="tool",
                              tool_call_id=tool_call.id,
                              content=str(result.content) if result.content else "",
                          )
                      
                      # 基于工具調用結果再次調用LLM
                      final_resp = await self.llm.chat.completions.create(
                          model=cfg.llm_model,
                          messages=self.messages,
                          tools=available_tools,
                          temperature=0.3,
                      )
                      # 更新消息為最新的LLM響應
                      message = final_resp.choices[0].message
                      # 如果響應包含內容,則添加到最終結果中
                      if message.content:
                          final_text.append(message.content)
                  
                  # 將最終響應添加到消息歷史中
                  if final_text:
                      final_content = "\n".join(final_text)
                      self.messages.append({
                          "role": "assistant",
                          "content": final_content
                      })
                      await MCPMemoryService.add_memory(
                          thread_id=self.thread_id,
                          role="assistant",
                          content=final_content
                      )
                  
                  # 返回連接后的完整響應
                  return "\n".join(final_text)
      
          async def chat_loop(self):
              """主聊天循環,處理用戶輸入并顯示響應"""
              print("Welcome to the MCP chat! Type 'quit' to exit.")
      
              # 加載歷史記憶
              await self.load_memory()
              if self.messages:
                  print(f"Loaded {len(self.messages)} historical messages")
              
              # 顯示歷史對話
              for msg in self.messages:
                  if msg["role"] == "user":
                      print(f"You (history): {msg['content']}")
                  elif msg["role"] == "assistant":
                      if "content" in msg:
                          print(f"Assistant (history): {msg['content']}")
      
              # 持續處理用戶輸入直到用戶退出
              while True:
                  try:
                      # 獲取用戶輸入
                      query = input("You: ").strip()
      
                      # 檢查退出命令
                      if query.lower() == "quit":
                          print("Exiting chat. Goodbye!")
                          break
      
                      # 跳過空輸入
                      if not query:
                          continue
      
                      # 處理用戶查詢并獲取響應
                      resp = await self.process_query(query)
                      print(f"Assistant: {resp}")
                  
                  # 捕獲并記錄聊天循環中的任何異常
                  except Exception as e:
                      logger.error(f"Error in chat loop: {str(e)}")
                      logger.error(traceback.format_exc())
      
      
      async def main():
          """主函數,程序入口點"""
          # 創建MCP主機實例
          await init_db()
          client = MCPHost(server_uri="http://localhost:8001/mcp", thread_id="1234")
          try:
              # 啟動聊天循環
              await client.chat_loop()
          except Exception as e:
              # 記錄主程序中的任何異常
              logger.error(f"Error in main: {str(e)}")
              logger.error(traceback.format_exc())
          finally:
              # 確保客戶端連接被正確關閉
              await client.close()
          
      
      if __name__ == "__main__":
          # 運行主程序
          asyncio.run(main())
      

      Client 運行輸出示例,第一次運行

      python demo08-client.py
      Welcome to the MCP chat! Type 'quit' to exit.
      You: 今天的日期是什么
      Assistant: 今天的日期是2025年9月14日。
      You: 明天呢
      Assistant: 今天是2025年9月14日,明天是2025年9月15日。
      You: 合肥的天氣怎么樣
      Assistant: 合肥的天氣總是陽光明媚!
      You: quit
      Exiting chat. Goodbye!
      

      第二次運行輸出示例,可以看到AI連接上了歷史會話。

      Welcome to the MCP chat! Type 'quit' to exit.
      Loaded 12 historical messages
      You (history): 今天的日期是什么
      Assistant (history): 今天的日期是2025年9月14日。
      You (history): 明天呢
      Assistant (history): 今天是2025年9月14日,明天是2025年9月15日。
      You (history): 合肥的天氣怎么樣
      Assistant (history): 合肥的天氣總是陽光明媚!
      You: 南京呢?
      Assistant: 南京的天氣也總是陽光明媚!
      You: quit
      Exiting chat. Goodbye!
      

      第三次運行輸出示例,使用tool、elicitation也是正常的。

      Welcome to the MCP chat! Type 'quit' to exit.
      Loaded 16 historical messages
      You (history): 今天的日期是什么
      Assistant (history): 今天的日期是2025年9月14日。
      You (history): 明天呢
      Assistant (history): 今天是2025年9月14日,明天是2025年9月15日。
      You (history): 合肥的天氣怎么樣
      Assistant (history): 合肥的天氣總是陽光明媚!
      You (history): 南京呢?
      Assistant (history): 南京的天氣也總是陽光明媚!
      You: 查詢下內存使用情況
      Assistant: 當前內存使用情況如下:
      
      - **總內存**: 62Gi
      - **已使用內存**: 16Gi
      - **空閑內存**: 38Gi
      - **共享內存**: 161Mi
      - **緩存/緩沖區**: 8.7Gi
      - **可用內存**: 46Gi
      
      交換分區使用情況:
      
      - **總交換空間**: 3.8Gi
      - **已使用交換空間**: 0B
      - **空閑交換空間**: 3.8Gi
      
      整體來看,內存使用情況良好,仍有大量可用內存。
      You: 在家目錄生成一個txt文件,文件名為當前日期,內容為當前內存使用情況和合肥今日的天氣情況
      MCP Server asks: Do you want to execute this command(yes or no): echo "內存使用情況:\ntotal        used        free      shared  buff/cache   available\n內存:          62Gi        16Gi        38Gi       161Mi       8.7Gi        46Gi\n交換:         3.8Gi          0B       3.8Gi\n\n合肥的天氣:\nIt's always sunny in 合肥!" > ~/2025-09-14.txt
      Please check(yes or no): yes
      Assistant: 已在家目錄生成文件 `2025-09-14.txt`,文件內容包含當前內存使用情況和合肥今日的天氣情況。
      You: quit
      Exiting chat. Goodbye!
      

      小結

      通過這個示例,我們實現了一個完整的MCP持久化記憶系統,讓AI具備了"記住"對話歷史的能力。這個系統的核心優勢包括:

      1. 記憶持久化:對話歷史保存在SQLite數據庫中,即使程序重啟也不會丟失,就像給AI安裝了一個可靠的"記憶硬盤"。
      2. 線程隔離:通過thread_id區分不同用戶的對話歷史,確保每個用戶的記憶都是獨立的,互不干擾。
      3. 完整記錄:不僅記錄普通對話,還完整保存了工具調用和執行結果,確保AI能夠完整理解上下文。
      4. 時間追蹤:每條記憶都帶有時間戳,方便后續按時間維度進行分析和管理。
      5. 靈活擴展:數據庫設計支持邏輯刪除和時間分區,為后續擴展更多功能奠定了基礎。

      這個持久化記憶系統的實現,使得MCP應用能夠提供更加連貫和智能的用戶體驗。用戶不再需要每次都重復之前的上下文信息,AI也能基于歷史對話提供更加個性化和精準的服務。

      在實際應用中,這種持久化記憶機制特別適用于以下場景:

      • 客服系統:記住用戶的歷史問題和服務記錄
      • 個人助手:記住用戶的偏好和習慣
      • 教育應用:跟蹤學習進度和歷史問答
      • 商業分析:保存分析過程和結果供后續參考

      總的來說,持久化記憶機制是構建專業級AI應用的重要組成部分,它讓AI從"一次性對話工具"升級為"有記憶的智能伙伴"。

      posted @ 2025-09-16 09:09  花酒鋤作田  閱讀(223)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 精品人妻日韩中文字幕| 久久精品一本到99热免费| 开心婷婷五月激情综合社区| 亚洲国产精品线观看不卡| 五月丁香六月综合缴清无码| 热久在线免费观看视频| 91精品国产自产91精品| 极品无码国模国产在线观看| 国产睡熟迷奷系列网站| 国产精品女同性一区二区| 一个色的导航| 青青草无码免费一二三区| 国产精品亚洲综合一区二区| 国产自在自线午夜精品 | 成人网站免费观看永久视频下载 | 国产成人高清亚洲综合| 精品国产粉嫩一区二区三区| 国产精品天干天干综合网| 吉川爱美一区二区三区视频| 亚洲人妻系列中文字幕| 波多野结衣av高清一区二区三区 | 又粗又硬又黄a级毛片| 最近中文字幕完整版hd| 国产成AV人片久青草影院| 星子县| 国产无遮挡真人免费视频| 男女激情一区二区三区| 无遮挡又黄又刺激的视频| 国产成人高清亚洲综合| 亚洲AV天天做在线观看| 国产av不卡一区二区| 亚洲av色香蕉一区二区三| аⅴ天堂中文在线网| av无码精品一区二区三区| 国产精品福利自产拍久久 | 日韩乱码卡一卡2卡三卡四| 日韩精品人妻av一区二区三区| 污污内射在线观看一区二区少妇| 免费一区二三区三区蜜桃| 国产成人亚洲综合图区| 日韩精品国产二区三区|