# 基础设施设计 — MCP 协议集成

## 文档元数据

- 项目名称: free-code
- 文档类型: 基础设施设计
- 原始代码来源: `../../src/services/mcp/`(22个文件)
- 原始设计意图: 将 MCP 服务器、工具、资源与认证统一抽象为可管理的 .NET 协议层,并支持多传输、多作用域与自动适配
- 交叉引用: [基础设施设计总览](基础设施设计.md) | [核心模块设计-工具系统](../核心模块设计/核心模块设计-工具系统.md) | [原始代码映射](reference/原始代码映射-基础设施.md)

## 设计目标

MCP 协议层负责把外部工具、远程服务、认证流程与本地执行环境统一起来,向上提供稳定的 .NET 抽象。该层不仅要兼容多种 transport,还要把 MCP 工具/命令/资源适配为上层工具系统可消费的接口。

## 10.1 IMcpClientManager 接口定义

```csharp
// NOTE(review): generic type arguments in this document were lost during extraction and have been
// reconstructed from usage. Type names that appear nowhere else in this document
// (McpResource, ReadResourceResult) are placeholders — confirm against the real declarations.

/// <summary>
/// MCP client manager — manages the connections to multiple MCP servers.
/// Corresponds to the original useManageMCPConnections.ts.
/// </summary>
public interface IMcpClientManager
{
    /// <summary>Connects to every configured MCP server.</summary>
    Task ConnectServersAsync(CancellationToken ct = default);

    /// <summary>Returns the tools of all connected servers, adapted to <see cref="ITool"/>.</summary>
    Task<IReadOnlyList<ITool>> GetToolsAsync();

    /// <summary>Returns the commands of all connected servers, adapted to <see cref="ICommand"/>.</summary>
    Task<IReadOnlyList<ICommand>> GetCommandsAsync();

    /// <summary>Lists the resources of the specified server.</summary>
    Task<IReadOnlyList<McpResource>> ListResourcesAsync(
        string? serverName = null,
        CancellationToken ct = default);

    /// <summary>Reads the specified resource.</summary>
    Task<ReadResourceResult> ReadResourceAsync(
        string serverName,
        string resourceUri,
        CancellationToken ct = default);

    /// <summary>Disconnects the specified server.</summary>
    Task DisconnectServerAsync(string serverName);

    /// <summary>Reconnects the specified server (used to recover from a dropped connection).</summary>
    Task ReconnectServerAsync(string serverName);

    /// <summary>Returns the connection state of every server.</summary>
    IReadOnlyList<MCPServerConnection> GetConnections();

    /// <summary>Starts the authentication flow (OAuth) for the specified server.</summary>
    Task AuthenticateServerAsync(string serverName);

    /// <summary>Reloads all configuration and reconnects.</summary>
    Task ReloadAsync();
}
```

## 10.2 MCPServerConnection 抽象 record

对应原始 `types.ts` 中的 union type。

```csharp
/// <summary>
/// Connection state of one MCP server — replaces the original TypeScript union type.
/// Original: ConnectedMCPServer | FailedMCPServer | NeedsAuthMCPServer | PendingMCPServer | DisabledMCPServer
/// </summary>
/// <param name="Name">Server name.</param>
/// <param name="Config">Configuration the connection was created from.</param>
public abstract record MCPServerConnection(string Name, ScopedMcpServerConfig Config)
{
    /// <summary>String tag of the concrete state ("connected", "failed", ...).</summary>
    // Abstract + overridden per case: a record body cannot contain a bare assignment
    // such as `{ ConnectionType = "connected"; }`.
    public abstract string ConnectionType { get; }

    // Type discrimination (replaces the TypeScript discriminated union).
    public bool IsConnected => this is Connected;
    public bool IsFailed => this is Failed;
    public bool NeedsAuth => this is NeedsAuthentication;
    public bool IsPending => this is Pending;
    public bool IsDisabled => this is Disabled;

    /// <summary>The server is connected; <paramref name="Cleanup"/> releases the connection.</summary>
    public sealed record Connected(
        string Name,
        ScopedMcpServerConfig Config,
        McpClient Client,
        ServerCapabilities Capabilities,
        ServerInfo? ServerInfo,
        string? Instructions,
        Func<Task> Cleanup
    ) : MCPServerConnection(Name, Config)
    {
        public override string ConnectionType => "connected";
    }

    /// <summary>The connection attempt failed; <paramref name="Error"/> carries the message, if any.</summary>
    public sealed record Failed(
        string Name,
        ScopedMcpServerConfig Config,
        string? Error
    ) : MCPServerConnection(Name, Config)
    {
        public override string ConnectionType => "failed";
    }

    /// <summary>The server requires authentication before it can be used.</summary>
    public sealed record NeedsAuthentication(
        string Name,
        ScopedMcpServerConfig Config
    ) : MCPServerConnection(Name, Config)
    {
        public override string ConnectionType => "needs-auth";
    }

    /// <summary>The connection is in progress, optionally as part of a reconnect sequence.</summary>
    public sealed record Pending(
        string Name,
        ScopedMcpServerConfig Config,
        int? ReconnectAttempt = null,
        int? MaxReconnectAttempts = null
    ) : MCPServerConnection(Name, Config)
    {
        public override string ConnectionType => "pending";
    }

    /// <summary>The server is disabled.</summary>
    public sealed record Disabled(
        string Name,
        ScopedMcpServerConfig Config
    ) : MCPServerConnection(Name, Config)
    {
        public override string ConnectionType => "disabled";
    }
}
```

## 10.3 ScopedMcpServerConfig 配置层级

对应原始 `types.ts` 中的 Zod schema 定义。

```csharp
/// <summary>
/// MCP server configuration — replaces the 8 original Zod schemas.
/// Validated with FluentValidation.
/// </summary>
public abstract record ScopedMcpServerConfig
{
    public required ConfigScope Scope { get; init; }
    public string? PluginSource { get; init; }
}

/// <summary>Server launched as a child process and spoken to over stdin/stdout.</summary>
public record StdioServerConfig : ScopedMcpServerConfig
{
    public string Command { get; init; } = "";
    public IReadOnlyList<string> Args { get; init; } = [];
    public IReadOnlyDictionary<string, string>? Env { get; init; }
}

/// <summary>Server reached over HTTP Server-Sent Events.</summary>
public record SseServerConfig : ScopedMcpServerConfig
{
    public required string Url { get; init; }
    public IReadOnlyDictionary<string, string>? Headers { get; init; }
    public string? HeadersHelper { get; init; }
    public McpOAuthConfig? OAuth { get; init; }
}

/// <summary>IDE-hosted server reached over SSE.</summary>
public record SseIdeServerConfig : ScopedMcpServerConfig
{
    public required string Url { get; init; }
    public required string IdeName { get; init; }
    public bool IdeRunningInWindows { get; init; }
}

/// <summary>IDE-hosted server reached over WebSocket.</summary>
public record WsIdeServerConfig : ScopedMcpServerConfig
{
    public required string Url { get; init; }
    public required string IdeName { get; init; }
    public string? AuthToken { get; init; }
    public bool IdeRunningInWindows { get; init; }
}

/// <summary>Server reached over Streamable HTTP.</summary>
public record HttpServerConfig : ScopedMcpServerConfig
{
    public required string Url { get; init; }
    public IReadOnlyDictionary<string, string>? Headers { get; init; }
    public string? HeadersHelper { get; init; }
    public McpOAuthConfig? OAuth { get; init; }
}

/// <summary>Server reached over WebSocket.</summary>
public record WebSocketServerConfig : ScopedMcpServerConfig
{
    public required string Url { get; init; }
    public IReadOnlyDictionary<string, string>? Headers { get; init; }
    public string? HeadersHelper { get; init; }
}

/// <summary>Server provided through the SDK and identified by name.</summary>
public record SdkServerConfig : ScopedMcpServerConfig
{
    public required string ServerName { get; init; }
}

/// <summary>Server reached through the claude.ai proxy.</summary>
public record ClaudeAiProxyServerConfig : ScopedMcpServerConfig
{
    public required string Url { get; init; }
    public required string Id { get; init; }
}

/// <summary>Scope a server configuration was loaded from.</summary>
public enum ConfigScope
{
    Local,
    User,
    Project,
    Dynamic,
    Enterprise,
    ClaudeAi,
    Managed
}

/// <summary>OAuth settings attached to an SSE/HTTP server configuration.</summary>
public record McpOAuthConfig
{
    public string? ClientId { get; init; }
    public int? CallbackPort { get; init; }
    public string? AuthServerMetadataUrl { get; init; }
    public bool Xaa { get; init; }
}
```

## 10.4 传输层实现

对应原始 `@modelcontextprotocol/sdk` 中的多种传输 + 自定义 WebSocketTransport。

```csharp
/// <summary>
/// MCP transport abstraction — JSON-RPC 2.0 over various transports.
/// </summary>
public interface IMcpTransport : IAsyncDisposable
{
    /// <summary>Establishes the underlying connection.</summary>
    Task ConnectAsync(CancellationToken ct = default);

    /// <summary>Sends one JSON-RPC message to the server.</summary>
    Task SendAsync(JsonRpcMessage message, CancellationToken ct = default);

    /// <summary>Streams the messages received from the server.</summary>
    IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default);

    /// <summary>Closes the underlying connection.</summary>
    Task CloseAsync();

    bool IsConnected { get; }
}

/// <summary>
/// Minimal Server-Sent Events reader shared by the SSE and Streamable HTTP transports.
/// </summary>
internal static class SseReader
{
    /// <summary>
    /// Reads <paramref name="stream"/> line by line and yields one (event, data) pair per
    /// blank-line-terminated event. Multiple <c>data:</c> lines are joined with '\n';
    /// events without data are skipped; fields other than <c>event</c>/<c>data</c> are ignored.
    /// The stream is disposed when enumeration ends.
    /// </summary>
    public static async IAsyncEnumerable<(string? Event, string Data)> ReadEventsAsync(
        Stream stream,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
    {
        using var reader = new StreamReader(stream);
        string? eventName = null;
        var data = new System.Text.StringBuilder();
        var hasData = false;

        // ReadLineAsync returns null at end of stream, so no blocking EndOfStream probe is needed.
        while (await reader.ReadLineAsync(ct) is { } line)
        {
            if (line.Length == 0)
            {
                if (data.Length > 0)
                    yield return (eventName, data.ToString());

                eventName = null;
                data.Clear();
                hasData = false;
            }
            else if (line.StartsWith("event:", StringComparison.Ordinal))
            {
                eventName = FieldValue(line, "event:".Length);
            }
            else if (line.StartsWith("data:", StringComparison.Ordinal))
            {
                if (hasData)
                    data.Append('\n');

                data.Append(FieldValue(line, "data:".Length));
                hasData = true;
            }
        }
    }

    // Returns the field value with the single optional space after the colon removed.
    private static string FieldValue(string line, int prefixLength)
    {
        var value = line[prefixLength..];
        return value.StartsWith(' ') ? value[1..] : value;
    }
}

/// <summary>
/// Stdio transport — child process stdin/stdout.
/// Corresponds to the original StdioClientTransport.
/// </summary>
public sealed class StdioTransport : IMcpTransport
{
    private static readonly TimeSpan s_exitGracePeriod = TimeSpan.FromSeconds(5);

    private readonly Process _process;
    private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();

    // The read loop lives as long as the transport, not as long as the ConnectAsync token.
    private readonly CancellationTokenSource _lifetime = new();

    // StreamWriter does not support overlapping writes; serialize concurrent SendAsync calls.
    private readonly SemaphoreSlim _writeLock = new(1, 1);

    private volatile bool _started;
    private int _disposed;

    /// <summary>Prepares (but does not start) the child process described by <paramref name="config"/>.</summary>
    public StdioTransport(StdioServerConfig config, string workingDirectory)
    {
        ArgumentNullException.ThrowIfNull(config);

        var psi = new ProcessStartInfo
        {
            FileName = config.Command,
            RedirectStandardInput = true,
            RedirectStandardOutput = true,
            RedirectStandardError = true,
            UseShellExecute = false,
            CreateNoWindow = true,
            WorkingDirectory = workingDirectory,
        };

        foreach (var arg in config.Args)
            psi.ArgumentList.Add(arg);

        // Merge the configured environment variables over the inherited ones.
        if (config.Env is not null)
        {
            foreach (var (key, value) in config.Env)
                psi.Environment[key] = value;
        }

        _process = new Process { StartInfo = psi };
    }

    /// <summary>Starts the child process and the background stdout reader.</summary>
    public Task ConnectAsync(CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();

        // stderr is redirected, so it must be drained or the child blocks once the pipe fills.
        _process.ErrorDataReceived += static (_, _) => { };
        _process.Start();
        _started = true;
        _process.BeginErrorReadLine();

        // Background: read stdout → parse JSON-RPC messages → write to the channel.
        _ = Task.Run(() => ReadLoopAsync(_lifetime.Token));
        return Task.CompletedTask;
    }

    private async Task ReadLoopAsync(CancellationToken ct)
    {
        Exception? failure = null;
        try
        {
            using var reader = new StreamReader(
                _process.StandardOutput.BaseStream, System.Text.Encoding.UTF8);

            while (await reader.ReadLineAsync(ct) is { } line)
            {
                if (line.Length == 0)
                    continue;

                var message = JsonSerializer.Deserialize<JsonRpcMessage>(line);
                if (message is not null)
                    await _incoming.Writer.WriteAsync(message, ct);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            failure = ex;
        }
        finally
        {
            // Complete the channel so ListenAsync consumers stop instead of waiting forever.
            _incoming.Writer.TryComplete(failure);
        }
    }

    /// <summary>Writes one message as a single JSON line to the child's stdin.</summary>
    public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
    {
        // Serialize by runtime type: System.Text.Json otherwise uses the declared type and
        // drops members of the derived request/notification classes.
        // NOTE(review): assumes JsonRpcMessage has no custom base-type converter — confirm.
        var json = JsonSerializer.Serialize(message, message.GetType());

        await _writeLock.WaitAsync(ct);
        try
        {
            await _process.StandardInput.WriteLineAsync(json.AsMemory(), ct);
            await _process.StandardInput.FlushAsync();
        }
        finally
        {
            _writeLock.Release();
        }
    }

    public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
        => _incoming.Reader.ReadAllAsync(ct);

    public Task CloseAsync() => DisposeAsync().AsTask();

    // HasExited throws before Start() and after Dispose(), hence the guards.
    public bool IsConnected
        => _started && Volatile.Read(ref _disposed) == 0 && !_process.HasExited;

    /// <summary>
    /// Closes stdin, gives the child 5 seconds to exit, then kills the whole process tree.
    /// Safe to call more than once.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        _lifetime.Cancel();

        if (_started && !_process.HasExited)
        {
            _process.StandardInput.Close();

            using var grace = new CancellationTokenSource(s_exitGracePeriod);
            try
            {
                await _process.WaitForExitAsync(grace.Token);
            }
            catch (OperationCanceledException)
            {
                _process.Kill(entireProcessTree: true);
            }
        }

        _incoming.Writer.TryComplete();
        _process.Dispose();
    }
}

/// <summary>
/// SSE transport — HTTP Server-Sent Events.
/// Corresponds to the original SSEClientTransport.
/// </summary>
public sealed class SseTransport : IMcpTransport
{
    private readonly HttpClient _httpClient;
    private readonly string _url;
    private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
    private readonly CancellationTokenSource _lifetime = new();
    private readonly TaskCompletionSource<Uri> _endpointReady =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    private volatile Uri? _messageEndpoint; // POST endpoint announced by the server
    private int _disposed;

    // NOTE(review): the transport does not dispose httpClient; ownership is not visible here.
    public SseTransport(HttpClient httpClient, string url)
    {
        _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
        _url = url ?? throw new ArgumentNullException(nameof(url));
    }

    /// <summary>
    /// Opens the SSE stream, starts the background listener and returns once the server
    /// has announced its POST endpoint.
    /// </summary>
    /// <exception cref="HttpRequestException">The GET request returned a non-success status.</exception>
    public async Task ConnectAsync(CancellationToken ct = default)
    {
        using var request = new HttpRequestMessage(HttpMethod.Get, _url);
        request.Headers.Accept.Add(new("text/event-stream"));

        var response = await _httpClient.SendAsync(
            request, HttpCompletionOption.ResponseHeadersRead, ct);
        try
        {
            response.EnsureSuccessStatusCode();
        }
        catch
        {
            response.Dispose();
            throw;
        }

        // The stream stays open for the lifetime of the connection, so it is read in the
        // background; reading it inline would keep ConnectAsync from ever returning.
        _ = Task.Run(() => ReceiveLoopAsync(response, _lifetime.Token));

        _messageEndpoint = await _endpointReady.Task.WaitAsync(ct);
    }

    private async Task ReceiveLoopAsync(HttpResponseMessage response, CancellationToken ct)
    {
        Exception? failure = null;
        try
        {
            using (response)
            {
                var stream = await response.Content.ReadAsStreamAsync(ct);
                await foreach (var (eventName, data) in SseReader.ReadEventsAsync(stream, ct))
                {
                    if (eventName == "endpoint")
                    {
                        _endpointReady.TrySetResult(ResolveEndpoint(data));
                        continue;
                    }

                    var message = JsonSerializer.Deserialize<JsonRpcMessage>(data);
                    if (message is not null)
                        await _incoming.Writer.WriteAsync(message, ct);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            failure = ex;
        }
        finally
        {
            _incoming.Writer.TryComplete(failure);
            _endpointReady.TrySetException(failure
                ?? new InvalidOperationException("SSE stream closed before the endpoint event was received"));
        }
    }

    // The endpoint may be relative; resolve it against the SSE URL and refuse to post
    // (possibly authenticated) requests to a different origin than the one connected to.
    private Uri ResolveEndpoint(string endpoint)
    {
        var baseUri = new Uri(_url);
        var resolved = new Uri(baseUri, endpoint);

        if (!string.Equals(resolved.Scheme, baseUri.Scheme, StringComparison.OrdinalIgnoreCase)
            || !string.Equals(resolved.Authority, baseUri.Authority, StringComparison.OrdinalIgnoreCase))
        {
            throw new InvalidOperationException(
                $"SSE endpoint origin does not match connection origin: {resolved.GetLeftPart(UriPartial.Authority)}");
        }

        return resolved;
    }

    /// <summary>POSTs one message to the endpoint announced by the server.</summary>
    /// <exception cref="InvalidOperationException">The transport is not connected.</exception>
    public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
    {
        var endpoint = _messageEndpoint
            ?? throw new InvalidOperationException("SSE transport not connected");

        // Runtime type so that derived request/notification members are serialized.
        using var content = JsonContent.Create(message, message.GetType());
        using var response = await _httpClient.PostAsync(endpoint, content, ct);
        response.EnsureSuccessStatusCode();
    }

    public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
        => _incoming.Reader.ReadAllAsync(ct);

    public bool IsConnected
        => _messageEndpoint is not null && !_incoming.Reader.Completion.IsCompleted;

    public Task CloseAsync() => DisposeAsync().AsTask();

    public ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
        {
            _lifetime.Cancel();
            _incoming.Writer.TryComplete();
        }

        return ValueTask.CompletedTask;
    }
}

/// <summary>
/// Streamable HTTP transport — MCP 2025-03-26 specification.
/// Corresponds to the original StreamableHTTPClientTransport.
/// Messages are sent with POST; the response may be JSON or SSE.
/// </summary>
public sealed class StreamableHttpTransport : IMcpTransport
{
    private const string SessionHeader = "Mcp-Session-Id";

    private readonly HttpClient _httpClient;
    private readonly string _url;
    private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
    private readonly CancellationTokenSource _lifetime = new();

    private volatile string? _sessionId; // Mcp-Session-Id assigned by the server
    private volatile bool _connected;
    private int _disposed;

    // NOTE(review): the transport does not dispose httpClient; ownership is not visible here.
    public StreamableHttpTransport(HttpClient httpClient, string url)
    {
        _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient));
        _url = url ?? throw new ArgumentNullException(nameof(url));
    }

    /// <summary>
    /// Marks the transport usable. No request is sent here: McpClient.ConnectAsync sends the
    /// "initialize" request through <see cref="SendAsync"/>, which also captures the session id.
    /// Sending a second "initialize" from the transport would duplicate the handshake.
    /// </summary>
    public Task ConnectAsync(CancellationToken ct = default)
    {
        ct.ThrowIfCancellationRequested();
        _connected = true;
        return Task.CompletedTask;
    }

    /// <summary>
    /// POSTs one message and forwards whatever the server answers (a single JSON body or an
    /// SSE stream) to <see cref="ListenAsync"/> consumers.
    /// </summary>
    /// <exception cref="HttpRequestException">The POST returned a non-success status.</exception>
    public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
    {
        var response = await SendInternalAsync(message, ct);
        string? mediaType;
        try
        {
            response.EnsureSuccessStatusCode();

            // Extract the session id from the response headers.
            if (response.Headers.TryGetValues(SessionHeader, out var values))
                _sessionId = values.FirstOrDefault() ?? _sessionId;

            mediaType = response.Content.Headers.ContentType?.MediaType;
        }
        catch
        {
            response.Dispose();
            throw;
        }

        if (string.Equals(mediaType, "text/event-stream", StringComparison.OrdinalIgnoreCase))
        {
            // The stream may outlive this call; pump it in the background (it owns the response).
            _ = Task.Run(() => PumpEventStreamAsync(response, _lifetime.Token));
            return;
        }

        using (response)
        {
            if (string.Equals(mediaType, "application/json", StringComparison.OrdinalIgnoreCase))
            {
                var reply = await response.Content.ReadFromJsonAsync<JsonRpcMessage>(ct);
                if (reply is not null)
                    await _incoming.Writer.WriteAsync(reply, ct);
            }
            // Any other success response (e.g. an empty acknowledgement) carries no message.
        }
    }

    private async Task<HttpResponseMessage> SendInternalAsync(
        JsonRpcMessage message, CancellationToken ct)
    {
        using var request = new HttpRequestMessage(HttpMethod.Post, _url)
        {
            // Runtime type so that derived request/notification members are serialized.
            Content = JsonContent.Create(message, message.GetType()),
        };

        // MCP Streamable HTTP: Accept must list both application/json and text/event-stream.
        request.Headers.Accept.Add(new("application/json"));
        request.Headers.Accept.Add(new("text/event-stream"));

        if (_sessionId is { } sessionId)
            request.Headers.Add(SessionHeader, sessionId);

        return await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, ct);
    }

    private async Task PumpEventStreamAsync(HttpResponseMessage response, CancellationToken ct)
    {
        try
        {
            using (response)
            {
                var stream = await response.Content.ReadAsStreamAsync(ct);
                await foreach (var (_, data) in SseReader.ReadEventsAsync(stream, ct))
                {
                    var message = JsonSerializer.Deserialize<JsonRpcMessage>(data);
                    if (message is not null)
                        await _incoming.Writer.WriteAsync(message, ct);
                }
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            // Surface the failure to listeners instead of losing it on a background task.
            _incoming.Writer.TryComplete(ex);
        }
    }

    public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
        => _incoming.Reader.ReadAllAsync(ct);

    public bool IsConnected => _connected && Volatile.Read(ref _disposed) == 0;

    public Task CloseAsync() => DisposeAsync().AsTask();

    public ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) == 0)
        {
            _lifetime.Cancel();
            _incoming.Writer.TryComplete();
        }

        return ValueTask.CompletedTask;
    }
}

/// <summary>
/// WebSocket transport — corresponds to the original WebSocketTransport (custom implementation).
/// </summary>
public sealed class WebSocketTransport : IMcpTransport
{
    private readonly ClientWebSocket _webSocket;
    private readonly string _url;
    private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
    private readonly CancellationTokenSource _lifetime = new();

    // ClientWebSocket allows only one outstanding send at a time.
    private readonly SemaphoreSlim _sendLock = new(1, 1);
    private int _disposed;

    // Constructor not shown in this document; McpClientManager.CreateTransport calls it as
    // (url, authToken) and (url, null, headers).

    /// <summary>Connects the socket and starts the background receive loop.</summary>
    public async Task ConnectAsync(CancellationToken ct = default)
    {
        await _webSocket.ConnectAsync(new Uri(_url), ct);
        _ = Task.Run(() => ReceiveLoopAsync(_lifetime.Token));
    }

    private async Task ReceiveLoopAsync(CancellationToken ct)
    {
        Exception? failure = null;
        var buffer = new byte[8192];
        try
        {
            while (_webSocket.State == WebSocketState.Open)
            {
                using var frame = new MemoryStream();
                WebSocketReceiveResult result;
                do
                {
                    result = await _webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), ct);

                    // A Close frame has no JSON payload; stop instead of trying to parse it.
                    if (result.MessageType == WebSocketMessageType.Close)
                        return;

                    frame.Write(buffer, 0, result.Count);
                } while (!result.EndOfMessage);

                var json = System.Text.Encoding.UTF8.GetString(frame.ToArray());
                var message = JsonSerializer.Deserialize<JsonRpcMessage>(json);
                if (message is not null)
                    await _incoming.Writer.WriteAsync(message, ct);
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Normal shutdown.
        }
        catch (Exception ex)
        {
            failure = ex;
        }
        finally
        {
            _incoming.Writer.TryComplete(failure);
        }
    }

    /// <summary>Sends one message as a single text frame.</summary>
    public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
    {
        // Runtime type so that derived request/notification members are serialized.
        var payload = JsonSerializer.SerializeToUtf8Bytes(message, message.GetType());

        await _sendLock.WaitAsync(ct);
        try
        {
            await _webSocket.SendAsync(
                new ArraySegment<byte>(payload), WebSocketMessageType.Text, endOfMessage: true, ct);
        }
        finally
        {
            _sendLock.Release();
        }
    }

    public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
        => _incoming.Reader.ReadAllAsync(ct);

    public bool IsConnected => _webSocket.State == WebSocketState.Open;

    public Task CloseAsync() => DisposeAsync().AsTask();

    /// <summary>Attempts a graceful close (5 s budget), then releases the socket.</summary>
    public async ValueTask DisposeAsync()
    {
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
            return;

        // Close before cancelling: cancelling a pending receive aborts the socket.
        if (_webSocket.State == WebSocketState.Open)
        {
            try
            {
                using var closeBudget = new CancellationTokenSource(TimeSpan.FromSeconds(5));
                await _webSocket.CloseOutputAsync(
                    WebSocketCloseStatus.NormalClosure, null, closeBudget.Token);
            }
            catch (Exception ex) when (ex is WebSocketException or OperationCanceledException)
            {
                // Best effort: the socket is disposed below either way.
            }
        }

        _lifetime.Cancel();
        _webSocket.Dispose();
        _incoming.Writer.TryComplete();
    }
}

/// <summary>
/// In-process transport — corresponds to the original InProcessTransport (linked transport pair).
/// Used for the Chrome / Computer Use MCP servers that run inside the same process.
/// </summary>
public sealed class InProcessTransport : IMcpTransport
{
    // Messages addressed to THIS endpoint; the peer's _outgoing writes here.
    private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();

    // Writer of the peer's inbox; set by CreateLinkedPair.
    private ChannelWriter<JsonRpcMessage>? _outgoing;

    /// <summary>Creates a connected pair of transports (one for the client, one for the server).</summary>
    public static (InProcessTransport client, InProcessTransport server) CreateLinkedPair()
    {
        var client = new InProcessTransport();
        var server = new InProcessTransport();

        // Cross-connect: whatever one side sends lands in the inbox the other side listens on.
        client._outgoing = server._incoming.Writer;
        server._outgoing = client._incoming.Writer;
        return (client, server);
    }

    public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
        => _incoming.Reader.ReadAllAsync(ct);

    // ... (remaining IMcpTransport members are not shown in this document)
}
```

## 10.5 McpClient 核心

```csharp
// NOTE(review): the result DTO names (ListToolsResult, CallToolResult, ListResourcesResult,
// ReadResourceResult, ListPromptsResult) and the JsonElement payload type were lost in
// extraction and are reconstructed — confirm against the real declarations.

/// <summary>
/// JSON-RPC 2.0 MCP client.
/// Corresponds to the original @modelcontextprotocol/sdk Client.
/// </summary>
public sealed class McpClient
{
    private readonly IMcpTransport _transport;
    private int _requestId;
    private readonly ConcurrentDictionary<string, TaskCompletionSource<JsonRpcResponse>> _pending = new();

    public ServerCapabilities? Capabilities { get; private set; }
    public ServerInfo? ServerInfo { get; private set; }
    public string? Instructions { get; private set; }

    public McpClient(IMcpTransport transport)
        => _transport = transport ?? throw new ArgumentNullException(nameof(transport));

    // NOTE(review): the loop that reads _transport.ListenAsync() and completes the matching
    // _pending entry is not shown in this document; without it no request ever completes.

    /// <summary>Connects the transport and performs the MCP initialize handshake.</summary>
    public async Task ConnectAsync(CancellationToken ct = default)
    {
        await _transport.ConnectAsync(ct);

        // Send initialize → receive the response.
        var response = await SendRequestAsync("initialize", new
        {
            protocolVersion = "2025-03-26",
            capabilities = new { roots = new { }, elicitation = new { } },
            clientInfo = new { name = "free-code", version = "1.0" }
        }, ct);

        Capabilities = DeserializeCapabilities(response);
        ServerInfo = DeserializeServerInfo(response);
        Instructions = DeserializeInstructions(response);

        // Send the initialized notification.
        await SendNotificationAsync("notifications/initialized", null, ct);
    }

    /// <summary>Sends <c>tools/list</c>.</summary>
    public async Task<ListToolsResult> ListToolsAsync(CancellationToken ct = default)
    {
        var response = await SendRequestAsync("tools/list", new { }, ct);
        return Deserialize<ListToolsResult>(response);
    }

    /// <summary>Sends <c>tools/call</c> for <paramref name="toolName"/>.</summary>
    public async Task<CallToolResult> CallToolAsync(
        string toolName,
        JsonElement? arguments = null,
        CancellationToken ct = default)
    {
        var response = await SendRequestAsync("tools/call", new { name = toolName, arguments }, ct);
        return Deserialize<CallToolResult>(response);
    }

    /// <summary>Sends <c>resources/list</c>.</summary>
    public async Task<ListResourcesResult> ListResourcesAsync(CancellationToken ct = default)
    {
        var response = await SendRequestAsync("resources/list", new { }, ct);
        return Deserialize<ListResourcesResult>(response);
    }

    /// <summary>Sends <c>resources/read</c> for <paramref name="uri"/>.</summary>
    public async Task<ReadResourceResult> ReadResourceAsync(
        string uri, CancellationToken ct = default)
    {
        var response = await SendRequestAsync("resources/read", new { uri }, ct);
        return Deserialize<ReadResourceResult>(response);
    }

    /// <summary>Sends <c>prompts/list</c>.</summary>
    public async Task<ListPromptsResult> ListPromptsAsync(CancellationToken ct = default)
    {
        var response = await SendRequestAsync("prompts/list", new { }, ct);
        return Deserialize<ListPromptsResult>(response);
    }

    // Sends a request and waits for the response registered under the same id.
    // NOTE(review): JSON-RPC error responses are not inspected here; only response.Result
    // is visible in this document.
    private async Task<JsonElement> SendRequestAsync(
        string method, object? @params, CancellationToken ct)
    {
        var id = Interlocked.Increment(ref _requestId)
            .ToString(System.Globalization.CultureInfo.InvariantCulture);

        // Asynchronous continuations keep the awaiting caller from running inline on
        // whichever thread completes the source.
        var tcs = new TaskCompletionSource<JsonRpcResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[id] = tcs;

        try
        {
            // Registered before sending so a cancellation during the send is not missed.
            using var reg = ct.Register(() => tcs.TrySetCanceled(ct));

            var message = new JsonRpcRequest { Id = id, Method = method, Params = @params };
            await _transport.SendAsync(message, ct);

            var response = await tcs.Task;
            return response.Result;
        }
        finally
        {
            // Always drop the entry: on success, send failure and cancellation alike.
            _pending.TryRemove(id, out _);
        }
    }

    // Sends a notification; no response is expected.
    private async Task SendNotificationAsync(
        string method, object? @params, CancellationToken ct)
    {
        var message = new JsonRpcNotification { Method = method, Params = @params };
        await _transport.SendAsync(message, ct);
    }
}
```

## 10.6 McpClientManager 实现

```csharp
public class McpClientManager : IMcpClientManager
{
    private const int MaxMcpDescriptionLength = 2048; // MAX_MCP_DESCRIPTION_LENGTH

    private readonly IServiceProvider _services;
    private readonly IConfiguration _config;
    private readonly IFeatureFlagService _features;
    private readonly IAppStateStore _stateStore;
    private readonly ILogger<McpClientManager> _logger;
    private readonly ConcurrentDictionary<string, MCPServerConnection> _connections = new();

    /// <summary>
    /// Connects to every configured MCP server: local servers first, then remote ones.
    /// Corresponds to the effect in the original useManageMCPConnections.ts.
    /// </summary>
    public async Task ConnectServersAsync(CancellationToken ct = default)
    {
        var configs = await LoadAllServerConfigsAsync();
        var localServers = configs.Where(c => IsLocalServer(c.Value)).ToList();
        var remoteServers = configs.Where(c => !IsLocalServer(c.Value)).ToList();

        // Local servers: batched connect (default concurrency: 3).
        await ConnectBatchAsync(localServers, GetLocalBatchSize(), ct); // MCP_SERVER_CONNECTION_BATCH_SIZE

        // Remote servers: batched connect (default concurrency: 20).
        await ConnectBatchAsync(remoteServers, GetRemoteBatchSize(), ct); // MCP_REMOTE_SERVER_CONNECTION_BATCH_SIZE
    }

    // Connects the given servers with at most maxConcurrency attempts in flight and
    // publishes the state after each one.
    // NOTE(review): assumes LoadAllServerConfigsAsync yields KeyValuePair<string, ScopedMcpServerConfig>.
    private Task ConnectBatchAsync(
        IEnumerable<KeyValuePair<string, ScopedMcpServerConfig>> servers,
        int maxConcurrency,
        CancellationToken ct)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = maxConcurrency,
            CancellationToken = ct,
        };

        return Parallel.ForEachAsync(servers, options, async (server, workerCt) =>
        {
            _connections[server.Key] =
                await ConnectToServerAsync(server.Key, server.Value, workerCt);
            UpdateState();
        });
    }

    // Connects one server and maps the outcome to a connection state. The transport is
    // disposed on every path that does not hand it to a Connected record.
    private async Task<MCPServerConnection> ConnectToServerAsync(
        string name, ScopedMcpServerConfig config, CancellationToken ct)
    {
        var timeout = GetConnectionTimeout(); // MCP_TIMEOUT environment variable, default 30s
        IMcpTransport? transport = null;
        var handedOff = false;

        try
        {
            // 1. Create the transport.
            transport = CreateTransport(name, config);

            // 2. Create the client and connect.
            var client = new McpClient(transport);
            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            timeoutCts.CancelAfter(timeout);
            await client.ConnectAsync(timeoutCts.Token);

            // 3. Fetch capabilities and tools. The tool list itself is not used here; the call
            //    is kept so that a server that cannot list tools is reported as failed, and it
            //    runs under the same timeout as the handshake.
            _ = await client.ListToolsAsync(timeoutCts.Token);
            var capabilities = client.Capabilities!;

            // 4. Truncate over-long instructions.
            var instructions = client.Instructions;
            if (instructions is { Length: > MaxMcpDescriptionLength })
                instructions = instructions[..MaxMcpDescriptionLength] + "… [truncated]";

            var liveTransport = transport;
            var connection = new MCPServerConnection.Connected(
                name, config, client, capabilities,
                client.ServerInfo, instructions,
                async () => await liveTransport.CloseAsync());

            handedOff = true;
            return connection;
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // The caller cancelled; this is not a server failure.
            throw;
        }
        catch (OperationCanceledException)
        {
            _logger.LogWarning("MCP server {Name} connection timed out after {Timeout}", name, timeout);
            return new MCPServerConnection.Failed(name, config, $"Connection timed out after {timeout}");
        }
        catch (Exception ex) when (
            ex is UnauthorizedException
            || ex is HttpRequestException { StatusCode: System.Net.HttpStatusCode.Unauthorized })
        {
            // SSE/HTTP/claudeai-proxy: authentication required.
            return new MCPServerConnection.NeedsAuthentication(name, config);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "MCP server {Name} connection failed", name);
            return new MCPServerConnection.Failed(name, config, ex.Message);
        }
        finally
        {
            if (!handedOff && transport is not null)
                await DisposeTransportAsync(name, transport);
        }
    }

    // Best-effort disposal; a failure to clean up must not mask the original outcome.
    private async Task DisposeTransportAsync(string name, IMcpTransport transport)
    {
        try
        {
            await transport.DisposeAsync();
        }
        catch (Exception ex)
        {
            _logger.LogDebug(ex, "Failed to dispose transport for MCP server {Name}", name);
        }
    }

    // Maps a configuration record to the transport that speaks its protocol.
    private IMcpTransport CreateTransport(string name, ScopedMcpServerConfig config)
    {
        return config switch
        {
            StdioServerConfig stdio => new StdioTransport(stdio, Directory.GetCurrentDirectory()),
            SseServerConfig sse => new SseTransport(
                CreateHttpClient(sse.Headers), sse.Url),
            SseIdeServerConfig sseIde => new SseTransport(
                new HttpClient(), sseIde.Url),
            HttpServerConfig http => new StreamableHttpTransport(
                CreateHttpClient(http.Headers), http.Url),
            WsIdeServerConfig wsIde => new WebSocketTransport(wsIde.Url, wsIde.AuthToken),
            WebSocketServerConfig ws => new WebSocketTransport(ws.Url, null, ws.Headers),
            ClaudeAiProxyServerConfig proxy => new StreamableHttpTransport(
                CreateProxyHttpClient(), proxy.Url),
            _ => throw new InvalidOperationException($"Unsupported server type: {config}")
        };
    }

    /// <summary>
    /// Collects the tools of all connected servers and adapts them to <see cref="ITool"/>.
    /// Corresponds to the original MCPTool adapter. A server whose tool listing fails is
    /// logged and skipped so that it does not hide the tools of the other servers.
    /// </summary>
    public async Task<IReadOnlyList<ITool>> GetToolsAsync()
    {
        var tools = new List<ITool>();

        foreach (var connected in _connections.Values.OfType<MCPServerConnection.Connected>())
        {
            try
            {
                var listed = await connected.Client.ListToolsAsync();
                tools.AddRange(listed.Tools.Select(
                    tool => new McpToolWrapper(connected.Name, tool, connected.Client)));
            }
            catch (Exception ex)
            {
                _logger.LogWarning(ex, "Failed to list tools for MCP server {Name}", connected.Name);
            }
        }

        return tools;
    }

    /// <summary>Publishes the current MCP state into AppState.</summary>
    private void UpdateState()
    {
        _stateStore.Update(state => state with
        {
            Mcp = state.Mcp with
            {
                Clients = _connections.Values.ToList(),
                Tools = GetToolsFromConnections(),
            }
        });
    }

    private static bool IsLocalServer(ScopedMcpServerConfig config)
        => config is StdioServerConfig or SdkServerConfig;
}

/// <summary>
/// MCP tool wrapper — adapts an MCP tool to the <see cref="ITool"/> interface.
/// Corresponds to the original MCPTool.ts.
/// </summary>
public sealed class McpToolWrapper : ITool
{
    private readonly string _serverName;
    private readonly McpToolDefinition _definition;
    private readonly McpClient _client;

    public McpToolWrapper(string serverName, McpToolDefinition definition, McpClient client)
    {
        _serverName = serverName;
        _definition = definition;
        _client = client;
    }

    public string Name => $"mcp__{_serverName}__{_definition.Name}";
    public ToolCategory Category => ToolCategory.Mcp;

    /// <summary>Calls the wrapped MCP tool, bounded by the MCP tool timeout.</summary>
    // NOTE(review): the type argument of ToolResult<> was lost in extraction; `object` is a
    // placeholder — confirm against ITool in the tool-system design document.
    public async Task<ToolResult<object>> ExecuteAsync(
        JsonElement input, ToolExecutionContext context, CancellationToken ct)
    {
        var timeout = GetMcpToolTimeout(); // MCP_TOOL_TIMEOUT, default ~27.8h
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        timeoutCts.CancelAfter(timeout);

        try
        {
            var result = await _client.CallToolAsync(_definition.Name, input, timeoutCts.Token);

            // Binary content → persisted to a file.
            if (result.HasBinaryContent)
            {
                var persisted = await PersistBinaryContentAsync(result);
                return new ToolResult<object>(persisted);
            }

            return new ToolResult<object>(result.Content);
        }
        catch (McpAuthException)
        {
            // NOTE(review): intended to mark the server as needing authentication, but nothing
            // is done here yet; the exception is only rethrown.
            throw;
        }
    }

    public JsonElement GetInputSchema() => _definition.InputSchema;

    // NOTE(review): "not destructive" is weaker than "read-only" (an additive write is
    // neither); confirm whether McpToolDefinition exposes a read-only hint to use instead.
    public bool IsReadOnly(JsonElement input) => !_definition.HasDestructiveBehavior;

    public bool IsConcurrencySafe(JsonElement input) => false;
}
```

## 10.7 McpAuthService

```csharp
/// <summary>
/// MCP OAuth authentication — corresponds to ClaudeAuthProvider in the original auth.ts.
/// </summary>
public sealed class McpAuthService
{
    private readonly IdentityModel.OidcClient _oidcClient;
    private readonly ISecureTokenStorage _tokenStorage; // macOS Keychain / credential manager

    /// <summary>
    /// Runs the OAuth authorization flow: discovery → browser authorization → code exchange →
    /// token storage. A cached, unexpired token short-circuits the flow.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// No metadata URL is configured, or the callback carried no authorization code.
    /// </exception>
    /// <exception cref="OperationCanceledException"><paramref name="ct"/> was cancelled.</exception>
    // NOTE(review): the return type name was lost in extraction; OAuthTokens is a placeholder.
    public async Task<OAuthTokens> AuthorizeAsync(
        string serverName, McpOAuthConfig config, CancellationToken ct)
    {
        var storageKey = $"mcp-{serverName}";

        // 1. Check for a cached token.
        var cached = await _tokenStorage.GetAsync(storageKey);
        if (cached is not null && !cached.IsExpired)
            return cached;

        // 2. Discover the authorization server metadata.
        if (string.IsNullOrWhiteSpace(config.AuthServerMetadataUrl))
        {
            throw new InvalidOperationException(
                $"MCP server '{serverName}' has no OAuth authorization server metadata URL configured");
        }

        var metadata = await DiscoverAuthorizationServerAsync(config.AuthServerMetadataUrl, ct);

        // 3. Start the local HTTP listener.
        var callbackPort = config.CallbackPort ?? GetAvailablePort();
        using var listener = new HttpListener();
        listener.Prefixes.Add($"http://localhost:{callbackPort}/");
        listener.Start();

        // 4. Build the authorization URL → open the browser.
        // NOTE(review): BuildAuthorizationUrl is not shown; confirm it adds a `state` value
        // (and PKCE challenge) that is verified when the callback arrives.
        var authUrl = BuildAuthorizationUrl(metadata, callbackPort);
        OpenBrowser(authUrl);

        // 5. Wait for the callback → extract the code. WaitAsync makes the wait cancellable;
        //    GetContextAsync alone would hang forever if the user never finishes.
        var context = await listener.GetContextAsync().WaitAsync(ct);
        var code = context.Request.QueryString["code"];
        var error = context.Request.QueryString["error"];

        // Answer the browser so the tab does not spin waiting for a response.
        await WriteCallbackResponseAsync(context.Response, succeeded: !string.IsNullOrEmpty(code));

        if (string.IsNullOrEmpty(code))
        {
            throw new InvalidOperationException(
                $"OAuth authorization for MCP server '{serverName}' failed: {error ?? "no authorization code returned"}");
        }

        // 6. Exchange the code for tokens.
        var tokens = await ExchangeCodeAsync(metadata, code, callbackPort, ct);
        await _tokenStorage.SetAsync(storageKey, tokens);
        return tokens;
    }

    // Sends a short plain-text page to the browser and closes the response.
    private static async Task WriteCallbackResponseAsync(HttpListenerResponse response, bool succeeded)
    {
        var body = System.Text.Encoding.UTF8.GetBytes(succeeded
            ? "Authentication complete. You may close this window."
            : "Authentication failed. You may close this window.");

        response.StatusCode = succeeded ? 200 : 400;
        response.ContentType = "text/plain; charset=utf-8";
        response.ContentLength64 = body.Length;
        await response.OutputStream.WriteAsync(body);
        response.Close();
    }
}
```