dameng-mcp-server达梦MCP服务
达梦数据库手写MCP服务
文件名称
server.py
源代码
参考mysql-mcp-server
写的dameng数据库版本的
点击访问mysql-mcp-server的github仓库
mcp服务端
import asyncio
import logging
import os
import sys
from dmPython import connect
from mcp.server import Server
from mcp.types import Resource, Tool, TextContent
from pydantic import AnyUrl# Configure logging
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("dameng_mcp_server")def get_db_config():"""Get database configuration from environment variables."""config = {"user": os.getenv("DAMENG_USER","SYSDBA"),"password": os.getenv("DAMENG_PASSWORD","SYSDBA"),"server": os.getenv("DAMENG_HOST", "localhost"),"port": 5236,"schema": os.getenv("DAMENG_DATABASE","")}if not all([config["user"], config["password"]]):logger.error("Missing required database configuration. Please check environment variables:")logger.error("DAMENG_USER and DAMENG_PASSWORD are required")raise ValueError("Missing required database configuration")return config# Initialize server
app = Server("dameng_mcp_server")@app.list_resources()
async def list_resources() -> list[Resource]:"""List Dameng tables as resources."""config = get_db_config()try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute("SELECT TABLE_NAME FROM ALL_TABLES")tables = cursor.fetchall()logger.info(f"Found tables: {tables}")resources = []for table in tables:resources.append(Resource(uri=f"dameng://{table[0]}/data",name=f"Table: {table[0]}",mimeType="text/plain",description=f"Data in table: {table[0]}"))return resourcesexcept Exception as e:logger.error(f"Failed to list resources: {str(e)}")return []@app.read_resource()
async def read_resource(uri: AnyUrl) -> str:"""Read table contents."""config = get_db_config()uri_str = str(uri)logger.info(f"Reading resource: {uri_str}")if not uri_str.startswith("dm://"):raise ValueError(f"Invalid URI scheme: {uri_str}")parts = uri_str[9:].split('/')table = parts[0]try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(f"SELECT * FROM {table} LIMIT 100")columns = [desc[0] for desc in cursor.description]rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return "\n".join([",".join(columns)] + result)except Exception as e:logger.error(f"Schema error reading resource {uri}: {str(e)}")raise RuntimeError(f"Schema error: {str(e)}")@app.list_tools()
async def list_tools() -> list[Tool]:"""List available Dameng tools."""logger.info("Listing tools...")return [Tool(name="execute_sql",description="Execute an SQL query on the Dameng server",inputSchema={"type": "object","properties": {"query": {"type": "string","description": "The SQL query to execute"}},"required": ["query"]})]@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:"""Execute SQL commands."""config = get_db_config()logger.info(f"Calling tool: {name} with arguments: {arguments}")if name != "execute_sql":raise ValueError(f"Unknown tool: {name}")query = arguments.get("query")if not query:raise ValueError("Query is required")try:with connect(**config) as conn:with conn.cursor() as cursor:cursor.execute(query)# Handle all other queries that return result sets (SELECT, SHOW, DESCRIBE etc.)if cursor.description is not None:columns = [desc[0] for desc in cursor.description]try:rows = cursor.fetchall()result = [",".join(map(str, row)) for row in rows]return [TextContent(type="text", text="\n".join([",".join(columns)] + result))]except Exception as e:logger.warning(f"Error fetching results: {str(e)}")return [TextContent(type="text", text=f"Query executed but error fetching results: {str(e)}")]# Non-SELECT querieselse:conn.commit()return [TextContent(type="text", text=f"Query executed successfully. Rows affected: {cursor.rowcount}")]except Exception as e:logger.error(f"Error executing SQL '{query}': {e}")return [TextContent(type="text", text=f"Error executing query: {str(e)}")]async def main():"""Main entry point to run the MCP server."""from mcp.server.stdio import stdio_server# Add additional debug outputprint("Starting Dameng MCP server with config:", file=sys.stderr)config = get_db_config()print(f"Server: {config['server']}", file=sys.stderr)print(f"Port: {config['port']}", file=sys.stderr)print(f"User: {config['user']}", file=sys.stderr)print(f"Schema: {config['schema']}", file=sys.stderr)logger.info("Starting Dameng MCP server...")logger.info(f"Schema config: {config['server']}/{config['schema']} as {config['user']}")async with stdio_server() as (read_stream, write_stream):try:await app.run(read_stream,write_stream,app.create_initialization_options())except Exception as e:logger.error(f"Server error: {str(e)}", exc_info=True)raiseif __name__ == "__main__":asyncio.run(main())
客户端
这段引用尚硅谷宋红康老师手写的客户端
import asyncio
import os
import json
from typing import Optional, List
from contextlib import AsyncExitStack
from datetime import datetime
import re
from openai import OpenAI
from dotenv import load_dotenv
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_clientload_dotenv()class MCPClient:def __init__(self):self.exit_stack = AsyncExitStack()self.openai_api_key = os.getenv("DASHSCOPE_API_KEY")self.base_url = os.getenv("BASE_URL")self.model = os.getenv("MODEL")if not self.openai_api_key:raise ValueError("❌ 未找到 OpenAI API Key,请在 .env 文件中设置 DASHSCOPE_API_KEY")self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)self.session: Optional[ClientSession] = Noneasync def connect_to_server(self, server_script_path: str):# 对服务器脚本进行判断,只允许是 .py 或 .jsis_python = server_script_path.endswith('.py')is_js = server_script_path.endswith('.js')if not (is_python or is_js):raise ValueError("服务器脚本必须是 .py 或 .js 文件")# 确定启动命令,.py 用 python,.js 用 nodecommand = "python" if is_python else "node"# 构造 MCP 所需的服务器参数,包含启动命令、脚本路径参数、环境变量(为 None 表示默认)server_params = StdioServerParameters(command=command, args=[server_script_path], env=None)# 启动 MCP 工具服务进程(并建立 stdio 通信)stdio_transport = await self.exit_stack.enter_async_context(stdio_client(server_params))# 拆包通信通道,读取服务端返回的数据,并向服务端发送请求self.stdio, self.write = stdio_transport# 创建 MCP 客户端会话对象self.session = await self.exit_stack.enter_async_context(ClientSession(self.stdio, self.write))# 初始化会话await self.session.initialize()# 获取工具列表并打印response = await self.session.list_tools()tools = response.toolsprint("\n已连接到服务器,支持以下工具:", [tool.name for tool in tools])async def process_query(self, query: str) -> str:# 准备初始消息和获取工具列表messages = [{"role": "user", "content": query}]response = await self.session.list_tools()available_tools = [{"type": "function","function": {"name": tool.name,"description": tool.description,"input_schema": tool.inputSchema}} for tool in response.tools]# 提取问题的关键词,对文件名进行生成。# 在接收到用户提问后就应该生成出最后输出的 md 文档的文件名,# 因为导出时若再生成文件名会导致部分组件无法识别该名称。keyword_match = re.search(r'(关于|分析|查询|搜索|查看)([^的\s,。、?\n]+)', query)keyword = keyword_match.group(2) if keyword_match else "分析对象"safe_keyword = re.sub(r'[\\/:*?"<>|]', '', keyword)[:20]timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')md_filename = f"sentiment_{safe_keyword}_{timestamp}.md"md_path = os.path.join("./sentiment_reports", md_filename)# 更新查询,将文件名添加到原始查询中,使大模型在调用工具链时可以识别到该信息# 然后调用 plan_tool_usage 获取工具调用计划query = query.strip() + f" [md_filename={md_filename}] [md_path={md_path}]"messages = [{"role": "user", "content": query}]tool_plan = await self.plan_tool_usage(query, available_tools)tool_outputs = {}messages = [{"role": "user", "content": query}]# 依次执行工具调用,并收集结果for step in tool_plan:tool_name = step["name"]tool_args = step["arguments"]for key, val in tool_args.items():if isinstance(val, str) and val.startswith("{{") and val.endswith("}}"):ref_key = val.strip("{} ")resolved_val = tool_outputs.get(ref_key, val)tool_args[key] = resolved_val# 注入统一的文件名或路径(用于分析和邮件)if tool_name == "analyze_sentiment" and "filename" not in tool_args:tool_args["filename"] = md_filenameif tool_name == "send_email_with_attachment" and "attachment_path" not in tool_args:tool_args["attachment_path"] = md_pathresult = await self.session.call_tool(tool_name, tool_args)tool_outputs[tool_name] = result.content[0].textmessages.append({"role": "tool","tool_call_id": tool_name,"content": result.content[0].text})# 调用大模型生成回复信息,并输出保存结果final_response = self.client.chat.completions.create(model=self.model,messages=messages)final_output = final_response.choices[0].message.content# 对辅助函数进行定义,目的是把文本清理成合法的文件名def clean_filename(text: str) -> str:text = text.strip()text = re.sub(r'[\\/:*?\"<>|]', '', text)return text[:50]# 使用清理函数处理用户查询,生成用于文件命名的前缀,并添加时间戳、设置输出目录# 最后构建出完整的文件路径用于保存记录safe_filename = clean_filename(query)timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')filename = f"{safe_filename}_{timestamp}.txt"output_dir = "./llm_outputs"os.makedirs(output_dir, exist_ok=True)file_path = os.path.join(output_dir, filename)# 将对话内容写入 md 文档,其中包含用户的原始提问以及模型的最终回复结果with open(file_path, "w", encoding="utf-8") as f:f.write(f"🗣 用户提问:{query}\n\n")f.write(f"🤖 模型回复:\n{final_output}\n")print(f"📄 对话记录已保存为:{file_path}")return final_outputasync def chat_loop(self):# 初始化提示信息print("\n🤖 MCP 客户端已启动!输入 'quit' 退出")# 进入主循环中等待用户输入while True:try:query = input("\n你: ").strip()if query.lower() == 'quit':break# 处理用户的提问,并返回结果response = await self.process_query(query)print(f"\n🤖 AI: {response}")except Exception as e:print(f"\n⚠️ 发生错误: {str(e)}")async def plan_tool_usage(self, query: str, tools: List[dict]) -> List[dict]:# 构造系统提示词 system_prompt。# 将所有可用工具组织为文本列表插入提示中,并明确指出工具名,# 限定返回格式是 JSON,防止其输出错误格式的数据。print("\n📤 提交给大模型的工具定义:")print(json.dumps(tools, ensure_ascii=False, indent=2))tool_list_text = "\n".join([f"- {tool['function']['name']}: {tool['function']['description']}"for tool in tools])system_prompt = {"role": "system","content": ("你是一个智能任务规划助手,用户会给出一句自然语言请求。\n""你只能从以下工具中选择(严格使用工具名称):\n"f"{tool_list_text}\n""如果多个工具需要串联,后续步骤中可以使用 {{上一步工具名}} 占位。\n""返回格式:JSON 数组,每个对象包含 name 和 arguments 字段。\n""不要返回自然语言,不要使用未列出的工具名。")}# 构造对话上下文并调用模型。# 将系统提示和用户的自然语言一起作为消息输入,并选用当前的模型。planning_messages = [system_prompt,{"role": "user", "content": query}]response = self.client.chat.completions.create(model=self.model,messages=planning_messages,tools=tools,tool_choice="none")# 提取出模型返回的 JSON 内容content = response.choices[0].message.content.strip()match = re.search(r"```(?:json)?\\s*([\s\S]+?)\\s*```", content)if match:json_text = match.group(1)else:json_text = content# 在解析 JSON 之后返回调用计划try:plan = json.loads(json_text)return plan if isinstance(plan, list) else []except Exception as e:print(f"❌ 工具调用链规划失败: {e}\n原始返回: {content}")return []async def cleanup(self):await self.exit_stack.aclose()async def main():server_script_path = "D:\\soft\\PythonProject\\mcp-project\\dmserver.py"# server_script_path = "D:\\soft\\SoftProject\\mysql_mcp_server\\src\mysql_mcp_server\\server.py"# server_script_path = "D:\\soft\\SoftProject\\mysql_mcp_server\\src\\dameng\\server.py"client = MCPClient()try:await client.connect_to_server(server_script_path)await client.chat_loop()finally:await client.cleanup()if __name__ == "__main__":asyncio.run(main())