【零基础计网入门笔记 01】计算机网络概述
2026/6/30 5:58:11
AIAgent 是一个 AI 代理类,用于处理企业微信消息。
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from deepagents import create_deep_agent
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamable_http_client

from pkg.config import cfg
from pkg.log import get_logger


class AIAgent:
    """Chat agent backed by a ``ChatOpenAI`` model and tools loaded over MCP.

    The agent connects to the MCP endpoint served by this same service
    (``http://127.0.0.1:<cfg.service_port>/mcp/``) on every request, loads
    the tools exposed there and builds a fresh deep agent around them.
    """

    logger = get_logger("ai_agent")

    def __init__(self):
        # Both are populated lazily; ``model`` is created by ``start()``.
        self.model: ChatOpenAI = None  # type: ignore
        self._mcp_session: ClientSession = None  # type: ignore
        self._mcp_server_url = f"http://127.0.0.1:{cfg.service_port}/mcp/"

    async def start(self):
        """Create the chat model from configuration if it does not exist yet."""
        if self.model is None:
            self.model = ChatOpenAI(
                base_url=cfg.agent_base_url,
                api_key=cfg.agent_api_key,  # type: ignore
                model=cfg.agent_model,
            )

    async def shutdown(self):
        """Drop the chat model so that a later ``start()`` rebuilds it."""
        self.model = None  # type: ignore

    async def _create_root_agent(self, session: ClientSession):
        """Build a deep agent using the tools exposed by *session*.

        Args:
            session: An initialised MCP client session.

        Returns:
            The agent object returned by ``create_deep_agent``.
        """
        tools = await load_mcp_tools(session)
        root_agent = create_deep_agent(
            model=self.model,
            tools=tools,
            system_prompt=f"你是一个智能助手,名字叫{cfg.qywx_bot_name}, 可以协助用户处理各种问题,并用温和积极的语气回答问题。回答的格式应该符合markdown规范。",
        )
        return root_agent

    @asynccontextmanager
    async def _connected_agent(self) -> AsyncIterator:
        """Open an MCP session and yield a root agent bound to it.

        The HTTP transport and the client session stay open for as long as
        the ``async with`` body runs and are closed when it exits.
        """
        self.logger.debug(f"Connecting to mcp server: {self._mcp_server_url}")
        async with streamable_http_client(self._mcp_server_url) as (
            read,
            write,
            _get_session_id,
        ):
            async with ClientSession(read, write) as session:
                await session.initialize()
                yield await self._create_root_agent(session)

    async def astream(self, input: str, thread_id: str = "") -> AsyncIterator[str]:
        """Stream the agent's answer to *input* as text fragments.

        Args:
            input: The user message.
            thread_id: Accepted for interface compatibility; currently unused.

        Yields:
            The non-empty ``content`` of each message found under the
            ``"model"`` key of a dict chunk, or the string form of any
            non-dict chunk.
        """
        async with self._connected_agent() as root_agent:
            async for chunk in root_agent.astream(
                input={"messages": [HumanMessage(content=input)]}
            ):
                if isinstance(chunk, dict):
                    # Extract the AIMessage content from the chunk dict.
                    # ``or {}`` guards against a "model" key whose value is None.
                    messages = (chunk.get("model") or {}).get("messages", [])
                    for msg in messages:
                        if hasattr(msg, "content") and msg.content:
                            yield str(msg.content)
                elif hasattr(chunk, "content"):
                    yield str(chunk.content)  # type: ignore
                else:
                    yield str(chunk)

    async def ainvoke(self, input: str, thread_id: str = ""):
        """Run the agent once on *input* and return the final message content.

        Args:
            input: The user message.
            thread_id: Accepted for interface compatibility; currently unused.

        Returns:
            The ``content`` of the last entry in the response's ``"messages"``.
        """
        async with self._connected_agent() as root_agent:
            resp = await root_agent.ainvoke(
                input={"messages": [HumanMessage(content=input)]}
            )
            return resp["messages"][-1].content


# main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import uvicorn
from contextlib import asynccontextmanager

from pkg.qywx import qywx_client
from pkg.config import cfg
from ai_agent.mcp_servers.datetime_server import mcp as datetime_mcp
from ai_agent import aiops


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the agent, the WeCom client and the MCP server for the app's lifetime."""
    await aiops.start()
    await qywx_client.start()
    try:
        # Build the MCP app first (this creates session_manager).
        mcp_app = datetime_mcp.streamable_http_app()
        # Run the MCP session manager inside the FastAPI lifespan.
        async with datetime_mcp.session_manager.run():
            # Mount the MCP app.
            app.mount("/mcp", mcp_app)
            yield
    finally:
        # Run the shutdown hooks even if the application exits with an error.
        await aiops.shutdown()
        await qywx_client.shutdown()


app = FastAPI(
    lifespan=lifespan
)


if __name__ == "__main__":
    uvicorn.run("main:app", host=cfg.service_host, port=cfg.service_port)