803 lines
29 KiB
Markdown
803 lines
29 KiB
Markdown
# 基础设施设计 — MCP 协议集成
|
||
|
||
## 文档元数据
|
||
- 项目名称: free-code
|
||
- 文档类型: 基础设施设计
|
||
- 原始代码来源: `../../src/services/mcp/`(22个文件)
|
||
- 原始设计意图: 将 MCP 服务器、工具、资源与认证统一抽象为可管理的 .NET 协议层,并支持多传输、多作用域与自动适配
|
||
- 交叉引用: [基础设施设计总览](基础设施设计.md) | [核心模块设计-工具系统](../核心模块设计/核心模块设计-工具系统.md) | [原始代码映射](reference/原始代码映射-基础设施.md)
|
||
|
||
## 设计目标
|
||
MCP 协议层负责把外部工具、远程服务、认证流程与本地执行环境统一起来,向上提供稳定的 .NET 抽象。该层不仅要兼容多种 transport,还要把 MCP 工具/命令/资源适配为上层工具系统可消费的接口。
|
||
|
||
## 10.1 IMcpClientManager 接口定义
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// MCP 客户端管理器 — 管理多个 MCP 服务器连接
|
||
/// 对应原始 useManageMCPConnections.ts
|
||
/// </summary>
|
||
public interface IMcpClientManager
|
||
{
|
||
/// <summary>连接所有配置的 MCP 服务器</summary>
|
||
Task ConnectServersAsync(CancellationToken ct = default);
|
||
|
||
/// <summary>获取所有已连接服务器的工具(适配为 ITool)</summary>
|
||
Task<IReadOnlyList<ITool>> GetToolsAsync();
|
||
|
||
/// <summary>获取所有已连接服务器的命令(适配为 ICommand)</summary>
|
||
Task<IReadOnlyList<ICommand>> GetCommandsAsync();
|
||
|
||
/// <summary>列出指定服务器的资源</summary>
|
||
Task<IReadOnlyList<ServerResource>> ListResourcesAsync(
|
||
string? serverName = null, CancellationToken ct = default);
|
||
|
||
/// <summary>读取指定资源</summary>
|
||
Task<ResourceContent> ReadResourceAsync(
|
||
string serverName, string resourceUri, CancellationToken ct = default);
|
||
|
||
/// <summary>断开指定服务器</summary>
|
||
Task DisconnectServerAsync(string serverName);
|
||
|
||
/// <summary>重连指定服务器(用于断线恢复)</summary>
|
||
Task ReconnectServerAsync(string serverName);
|
||
|
||
/// <summary>获取所有服务器连接状态</summary>
|
||
IReadOnlyList<MCPServerConnection> GetConnections();
|
||
|
||
/// <summary>触发认证流程(OAuth)</summary>
|
||
Task AuthenticateServerAsync(string serverName);
|
||
|
||
/// <summary>重新加载所有配置并重连</summary>
|
||
Task ReloadAsync();
|
||
}
|
||
```
|
||
|
||
## 10.2 MCPServerConnection 抽象 record
|
||
|
||
对应原始 `types.ts` 中的 union type。
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// MCP 服务器连接状态 — 替代原始 TypeScript 联合类型
|
||
/// 原始: ConnectedMCPServer | FailedMCPServer | NeedsAuthMCPServer | PendingMCPServer | DisabledMCPServer
|
||
/// </summary>
|
||
public abstract record MCPServerConnection
|
||
{
|
||
public string Name { get; init; }
|
||
public string ConnectionType { get; init; }
|
||
public ScopedMcpServerConfig Config { get; init; }
|
||
|
||
// 类型判别(替代 TypeScript 可辨识联合)
|
||
public bool IsConnected => this is Connected;
|
||
public bool IsFailed => this is Failed;
|
||
public bool NeedsAuth => this is NeedsAuthentication;
|
||
public bool IsPending => this is Pending;
|
||
public bool IsDisabled => this is Disabled;
|
||
|
||
public sealed record Connected(
|
||
string Name,
|
||
ScopedMcpServerConfig Config,
|
||
McpClient Client,
|
||
ServerCapabilities Capabilities,
|
||
ServerInfo? ServerInfo,
|
||
string? Instructions,
|
||
Func<Task> Cleanup
|
||
) : MCPServerConnection { ConnectionType = "connected"; }
|
||
|
||
public sealed record Failed(
|
||
string Name,
|
||
ScopedMcpServerConfig Config,
|
||
string? Error
|
||
) : MCPServerConnection { ConnectionType = "failed"; }
|
||
|
||
public sealed record NeedsAuthentication(
|
||
string Name,
|
||
ScopedMcpServerConfig Config
|
||
) : MCPServerConnection { ConnectionType = "needs-auth"; }
|
||
|
||
public sealed record Pending(
|
||
string Name,
|
||
ScopedMcpServerConfig Config,
|
||
int? ReconnectAttempt = null,
|
||
int? MaxReconnectAttempts = null
|
||
) : MCPServerConnection { ConnectionType = "pending"; }
|
||
|
||
public sealed record Disabled(
|
||
string Name,
|
||
ScopedMcpServerConfig Config
|
||
) : MCPServerConnection { ConnectionType = "disabled"; }
|
||
}
|
||
```
|
||
|
||
## 10.3 ScopedMcpServerConfig 配置层级
|
||
|
||
对应原始 `types.ts` 中的 Zod schema 定义。
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// MCP 服务器配置 — 替代原始 8 种 Zod schema
|
||
/// 使用 FluentValidation 进行校验
|
||
/// </summary>
|
||
public abstract record ScopedMcpServerConfig
|
||
{
|
||
public required ConfigScope Scope { get; init; }
|
||
public string? PluginSource { get; init; }
|
||
}
|
||
|
||
public record StdioServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public string Command { get; init; } = "";
|
||
public IReadOnlyList<string> Args { get; init; } = [];
|
||
public IReadOnlyDictionary<string, string>? Env { get; init; }
|
||
}
|
||
|
||
public record SseServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string Url { get; init; }
|
||
public IReadOnlyDictionary<string, string>? Headers { get; init; }
|
||
public string? HeadersHelper { get; init; }
|
||
public McpOAuthConfig? OAuth { get; init; }
|
||
}
|
||
|
||
public record SseIdeServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string Url { get; init; }
|
||
public required string IdeName { get; init; }
|
||
public bool IdeRunningInWindows { get; init; }
|
||
}
|
||
|
||
public record WsIdeServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string Url { get; init; }
|
||
public required string IdeName { get; init; }
|
||
public string? AuthToken { get; init; }
|
||
public bool IdeRunningInWindows { get; init; }
|
||
}
|
||
|
||
public record HttpServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string Url { get; init; }
|
||
public IReadOnlyDictionary<string, string>? Headers { get; init; }
|
||
public string? HeadersHelper { get; init; }
|
||
public McpOAuthConfig? OAuth { get; init; }
|
||
}
|
||
|
||
public record WebSocketServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string Url { get; init; }
|
||
public IReadOnlyDictionary<string, string>? Headers { get; init; }
|
||
public string? HeadersHelper { get; init; }
|
||
}
|
||
|
||
public record SdkServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string ServerName { get; init; }
|
||
}
|
||
|
||
public record ClaudeAiProxyServerConfig : ScopedMcpServerConfig
|
||
{
|
||
public required string Url { get; init; }
|
||
public required string Id { get; init; }
|
||
}
|
||
|
||
public enum ConfigScope
|
||
{
|
||
Local, User, Project, Dynamic, Enterprise, ClaudeAi, Managed
|
||
}
|
||
|
||
public record McpOAuthConfig
|
||
{
|
||
public string? ClientId { get; init; }
|
||
public int? CallbackPort { get; init; }
|
||
public string? AuthServerMetadataUrl { get; init; }
|
||
public bool Xaa { get; init; }
|
||
}
|
||
```
|
||
|
||
## 10.4 传输层实现
|
||
|
||
对应原始 `@modelcontextprotocol/sdk` 中的多种传输 + 自定义 WebSocketTransport。
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// MCP 传输层抽象 — JSON-RPC 2.0 over various transports
|
||
/// </summary>
|
||
public interface IMcpTransport : IAsyncDisposable
|
||
{
|
||
Task ConnectAsync(CancellationToken ct = default);
|
||
Task SendAsync(JsonRpcMessage message, CancellationToken ct = default);
|
||
IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default);
|
||
Task CloseAsync();
|
||
bool IsConnected { get; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// Stdio 传输 — 子进程 stdin/stdout
|
||
/// 对应原始 StdioClientTransport
|
||
/// </summary>
|
||
public sealed class StdioTransport : IMcpTransport
|
||
{
|
||
private readonly Process _process;
|
||
private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
|
||
|
||
public StdioTransport(StdioServerConfig config, string workingDirectory)
|
||
{
|
||
var psi = new ProcessStartInfo
|
||
{
|
||
FileName = config.Command,
|
||
RedirectStandardInput = true,
|
||
RedirectStandardOutput = true,
|
||
RedirectStandardError = true,
|
||
UseShellExecute = false,
|
||
CreateNoWindow = true,
|
||
WorkingDirectory = workingDirectory,
|
||
};
|
||
|
||
foreach (var arg in config.Args)
|
||
psi.ArgumentList.Add(arg);
|
||
|
||
// 合并环境变量
|
||
if (config.Env != null)
|
||
foreach (var (key, value) in config.Env)
|
||
psi.Environment[key] = value;
|
||
|
||
_process = new Process { StartInfo = psi };
|
||
}
|
||
|
||
public async Task ConnectAsync(CancellationToken ct = default)
|
||
{
|
||
_process.Start();
|
||
|
||
// 后台读取 stdout → 解析 JSON-RPC 消息 → 写入 channel
|
||
_ = Task.Run(async () =>
|
||
{
|
||
using var reader = new StreamReader(_process.StandardOutput.BaseStream,
|
||
System.Text.Encoding.UTF8);
|
||
while (!reader.EndOfStream && !ct.IsCancellationRequested)
|
||
{
|
||
var line = await reader.ReadLineAsync(ct);
|
||
if (string.IsNullOrEmpty(line)) continue;
|
||
var message = JsonSerializer.Deserialize<JsonRpcMessage>(line);
|
||
if (message != null)
|
||
await _incoming.Writer.WriteAsync(message, ct);
|
||
}
|
||
}, ct);
|
||
}
|
||
|
||
public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
|
||
{
|
||
var json = JsonSerializer.Serialize(message);
|
||
await _process.StandardInput.WriteLineAsync(json.AsMemory(), ct);
|
||
await _process.StandardInput.FlushAsync();
|
||
}
|
||
|
||
public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
|
||
=> _incoming.Reader.ReadAllAsync(ct);
|
||
|
||
public bool IsConnected => !_process.HasExited;
|
||
|
||
public async ValueTask DisposeAsync()
|
||
{
|
||
if (!_process.HasExited)
|
||
{
|
||
_process.StandardInput.Close();
|
||
if (!_process.WaitForExit(5000))
|
||
_process.Kill(entireProcessTree: true);
|
||
}
|
||
_incoming.Writer.TryComplete();
|
||
}
|
||
}
|
||
|
||
/// <summary>
|
||
/// SSE 传输 — HTTP Server-Sent Events
|
||
/// 对应原始 SSEClientTransport
|
||
/// </summary>
|
||
public sealed class SseTransport : IMcpTransport
|
||
{
|
||
private readonly HttpClient _httpClient;
|
||
private readonly string _url;
|
||
private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
|
||
private string? _messageEndpoint; // 服务器提供的 POST 端点
|
||
|
||
public async Task ConnectAsync(CancellationToken ct = default)
|
||
{
|
||
// GET SSE 流 → 解析 endpoint 事件 → 开始监听
|
||
var response = await _httpClient.GetAsync(_url, HttpCompletionOption.ResponseHeadersRead, ct);
|
||
using var stream = await response.Content.ReadAsStreamAsync(ct);
|
||
using var reader = new StreamReader(stream);
|
||
|
||
while (!reader.EndOfStream && !ct.IsCancellationRequested)
|
||
{
|
||
var line = await reader.ReadLineAsync(ct);
|
||
if (string.IsNullOrEmpty(line)) continue;
|
||
|
||
if (line.StartsWith("event: endpoint"))
|
||
{
|
||
var dataLine = await reader.ReadLineAsync(ct);
|
||
if (dataLine?.StartsWith("data: ") == true)
|
||
_messageEndpoint = dataLine[6..];
|
||
}
|
||
else if (line.StartsWith("data: "))
|
||
{
|
||
var json = line[6..];
|
||
var message = JsonSerializer.Deserialize<JsonRpcMessage>(json);
|
||
if (message != null)
|
||
await _incoming.Writer.WriteAsync(message, ct);
|
||
}
|
||
}
|
||
}
|
||
|
||
public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
|
||
{
|
||
if (_messageEndpoint == null)
|
||
throw new InvalidOperationException("SSE transport not connected");
|
||
|
||
var content = JsonContent.Create(message);
|
||
await _httpClient.PostAsync(_messageEndpoint, content, ct);
|
||
}
|
||
|
||
public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
|
||
=> _incoming.Reader.ReadAllAsync(ct);
|
||
|
||
public bool IsConnected => _messageEndpoint != null;
|
||
}
|
||
|
||
/// <summary>
|
||
/// Streamable HTTP 传输 — MCP 2025-03-26 规范
|
||
/// 对应原始 StreamableHTTPClientTransport
|
||
/// POST 发送消息,响应可能是 JSON 或 SSE
|
||
/// </summary>
|
||
public sealed class StreamableHttpTransport : IMcpTransport
|
||
{
|
||
private readonly HttpClient _httpClient;
|
||
private readonly string _url;
|
||
private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
|
||
private string? _sessionId; // 服务器分配的 Mcp-Session-Id
|
||
|
||
public async Task ConnectAsync(CancellationToken ct = default)
|
||
{
|
||
// 发送 initialize JSON-RPC 请求
|
||
var initRequest = new JsonRpcRequest
|
||
{
|
||
Id = "init-1",
|
||
Method = "initialize",
|
||
Params = new { protocolVersion = "2025-03-26", capabilities = new { }, clientInfo = new { name = "free-code", version = "1.0" } }
|
||
};
|
||
|
||
var response = await SendInternalAsync(initRequest, ct);
|
||
// 从响应头提取 session ID
|
||
if (response.Headers.Contains("Mcp-Session-Id"))
|
||
_sessionId = response.Headers.GetValues("Mcp-Session-Id").First();
|
||
}
|
||
|
||
private async Task<HttpResponseMessage> SendInternalAsync(
|
||
JsonRpcRequest request, CancellationToken ct)
|
||
{
|
||
using var content = JsonContent.Create(request);
|
||
var httpReq = new HttpRequestMessage(HttpMethod.Post, _url) { Content = content };
|
||
|
||
// MCP Streamable HTTP 规范: Accept 必须包含 application/json 和 text/event-stream
|
||
httpReq.Headers.Accept.Add(new("application/json"));
|
||
httpReq.Headers.Accept.Add(new("text/event-stream"));
|
||
|
||
if (_sessionId != null)
|
||
httpReq.Headers.Add("Mcp-Session-Id", _sessionId);
|
||
|
||
return await _httpClient.SendAsync(httpReq, HttpCompletionOption.ResponseHeadersRead, ct);
|
||
}
|
||
|
||
// ... SendAsync 和 ListenAsync 实现类似 SSE 但处理 JSON/SSE 双模式响应
|
||
}
|
||
|
||
/// <summary>
|
||
/// WebSocket 传输 — 对应原始 WebSocketTransport (自定义实现)
|
||
/// </summary>
|
||
public sealed class WebSocketTransport : IMcpTransport
|
||
{
|
||
private readonly ClientWebSocket _webSocket;
|
||
private readonly string _url;
|
||
private readonly Channel<JsonRpcMessage> _incoming = Channel.CreateUnbounded<JsonRpcMessage>();
|
||
|
||
public async Task ConnectAsync(CancellationToken ct = default)
|
||
{
|
||
await _webSocket.ConnectAsync(new Uri(_url), ct);
|
||
_ = ReceiveLoopAsync(ct);
|
||
}
|
||
|
||
private async Task ReceiveLoopAsync(CancellationToken ct)
|
||
{
|
||
var buffer = new byte[8192];
|
||
while (_webSocket.State == WebSocketState.Open && !ct.IsCancellationRequested)
|
||
{
|
||
using var ms = new MemoryStream();
|
||
WebSocketReceiveResult result;
|
||
do
|
||
{
|
||
result = await _webSocket.ReceiveAsync(buffer, ct);
|
||
await ms.WriteAsync(buffer, 0, result.Count, ct);
|
||
} while (!result.EndOfMessage);
|
||
|
||
var json = System.Text.Encoding.UTF8.GetString(ms.ToArray());
|
||
var message = JsonSerializer.Deserialize<JsonRpcMessage>(json);
|
||
if (message != null)
|
||
await _incoming.Writer.WriteAsync(message, ct);
|
||
}
|
||
}
|
||
|
||
public async Task SendAsync(JsonRpcMessage message, CancellationToken ct = default)
|
||
{
|
||
var json = JsonSerializer.Serialize(message);
|
||
var bytes = System.Text.Encoding.UTF8.GetBytes(json);
|
||
await _webSocket.SendAsync(bytes, WebSocketMessageType.Text, true, ct);
|
||
}
|
||
|
||
public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
|
||
=> _incoming.Reader.ReadAllAsync(ct);
|
||
|
||
public bool IsConnected => _webSocket.State == WebSocketState.Open;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 进程内传输 — 对应原始 InProcessTransport (linked transport pair)
|
||
/// 用于 Chrome/Computer Use MCP 服务器在同一进程运行
|
||
/// </summary>
|
||
public sealed class InProcessTransport : IMcpTransport
|
||
{
|
||
private readonly Channel<JsonRpcMessage> _serverToClient = Channel.CreateUnbounded<JsonRpcMessage>();
|
||
private readonly Channel<JsonRpcMessage> _clientToServer = Channel.CreateUnbounded<JsonRpcMessage>();
|
||
|
||
// 创建一对连接的传输(客户端 + 服务器各一个)
|
||
public static (InProcessTransport client, InProcessTransport server) CreateLinkedPair()
|
||
{
|
||
var client = new InProcessTransport();
|
||
var server = new InProcessTransport();
|
||
// 交叉连接 channel: client writes → server reads, vice versa
|
||
client._outgoing = server._serverToClient.Writer;
|
||
server._outgoing = client._clientToServer.Writer;
|
||
return (client, server);
|
||
}
|
||
|
||
private ChannelWriter<JsonRpcMessage>? _outgoing;
|
||
public IAsyncEnumerable<JsonRpcMessage> ListenAsync(CancellationToken ct = default)
|
||
=> _serverToClient.Reader.ReadAllAsync(ct);
|
||
// ...
|
||
}
|
||
```
|
||
|
||
## 10.5 McpClient 核心
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// JSON-RPC 2.0 MCP 客户端
|
||
/// 对应原始 @modelcontextprotocol/sdk Client
|
||
/// </summary>
|
||
public sealed class McpClient
|
||
{
|
||
private readonly IMcpTransport _transport;
|
||
private int _requestId;
|
||
private readonly ConcurrentDictionary<string, TaskCompletionSource<JsonRpcResponse>> _pending = new();
|
||
|
||
public ServerCapabilities? Capabilities { get; private set; }
|
||
public ServerInfo? ServerInfo { get; private set; }
|
||
public string? Instructions { get; private set; }
|
||
|
||
public async Task ConnectAsync(CancellationToken ct = default)
|
||
{
|
||
await _transport.ConnectAsync(ct);
|
||
|
||
// 发送 initialize → 接收 response
|
||
var response = await SendRequestAsync("initialize", new
|
||
{
|
||
protocolVersion = "2025-03-26",
|
||
capabilities = new { roots = new { }, elicitation = new { } },
|
||
clientInfo = new { name = "free-code", version = "1.0" }
|
||
}, ct);
|
||
|
||
Capabilities = DeserializeCapabilities(response);
|
||
ServerInfo = DeserializeServerInfo(response);
|
||
Instructions = DeserializeInstructions(response);
|
||
|
||
// 发送 initialized 通知
|
||
await SendNotificationAsync("notifications/initialized", null, ct);
|
||
}
|
||
|
||
public async Task<ListToolsResult> ListToolsAsync(CancellationToken ct = default)
|
||
{
|
||
var response = await SendRequestAsync("tools/list", new { }, ct);
|
||
return Deserialize<ListToolsResult>(response);
|
||
}
|
||
|
||
public async Task<CallToolResult> CallToolAsync(
|
||
string toolName, JsonElement? arguments = null, CancellationToken ct = default)
|
||
{
|
||
var response = await SendRequestAsync("tools/call", new
|
||
{
|
||
name = toolName,
|
||
arguments
|
||
}, ct);
|
||
return Deserialize<CallToolResult>(response);
|
||
}
|
||
|
||
public async Task<ListResourcesResult> ListResourcesAsync(CancellationToken ct = default)
|
||
{
|
||
var response = await SendRequestAsync("resources/list", new { }, ct);
|
||
return Deserialize<ListResourcesResult>(response);
|
||
}
|
||
|
||
public async Task<ReadResourceResult> ReadResourceAsync(
|
||
string uri, CancellationToken ct = default)
|
||
{
|
||
var response = await SendRequestAsync("resources/read", new { uri }, ct);
|
||
return Deserialize<ReadResourceResult>(response);
|
||
}
|
||
|
||
public async Task<ListPromptsResult> ListPromptsAsync(CancellationToken ct = default)
|
||
{
|
||
var response = await SendRequestAsync("prompts/list", new { }, ct);
|
||
return Deserialize<ListPromptsResult>(response);
|
||
}
|
||
|
||
private async Task<JsonElement> SendRequestAsync(
|
||
string method, object? @params, CancellationToken ct)
|
||
{
|
||
var id = Interlocked.Increment(ref _requestId).ToString();
|
||
var tcs = new TaskCompletionSource<JsonRpcResponse>();
|
||
_pending[id] = tcs;
|
||
|
||
var message = new JsonRpcRequest { Id = id, Method = method, Params = @params };
|
||
await _transport.SendAsync(message, ct);
|
||
|
||
using var reg = ct.Register(() => tcs.TrySetCanceled(ct));
|
||
var response = await tcs.Task;
|
||
return response.Result;
|
||
}
|
||
|
||
private async Task SendNotificationAsync(
|
||
string method, object? @params, CancellationToken ct)
|
||
{
|
||
var message = new JsonRpcNotification { Method = method, Params = @params };
|
||
await _transport.SendAsync(message, ct);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 10.6 McpClientManager 实现
|
||
|
||
```csharp
|
||
public class McpClientManager : IMcpClientManager
|
||
{
|
||
private readonly IServiceProvider _services;
|
||
private readonly IConfiguration _config;
|
||
private readonly IFeatureFlagService _features;
|
||
private readonly IAppStateStore _stateStore;
|
||
private readonly ILogger<McpClientManager> _logger;
|
||
private readonly ConcurrentDictionary<string, MCPServerConnection> _connections = new();
|
||
|
||
/// <summary>
|
||
/// 连接所有配置的 MCP 服务器
|
||
/// 对应原始 useManageMCPConnections.ts 的 effect
|
||
/// </summary>
|
||
public async Task ConnectServersAsync(CancellationToken ct = default)
|
||
{
|
||
var configs = await LoadAllServerConfigsAsync();
|
||
var localServers = configs.Where(c => IsLocalServer(c.Value)).ToList();
|
||
var remoteServers = configs.Where(c => !IsLocalServer(c.Value)).ToList();
|
||
|
||
// 本地服务器: 批量连接 (默认 concurrency: 3)
|
||
var localBatchSize = GetLocalBatchSize(); // MCP_SERVER_CONNECTION_BATCH_SIZE
|
||
await Parallel.ForEachAsync(localServers,
|
||
new ParallelOptions { MaxDegreeOfParallelism = localBatchSize, CancellationToken = ct },
|
||
async (kvp, _) =>
|
||
{
|
||
var connection = await ConnectToServerAsync(kvp.Key, kvp.Value, ct);
|
||
_connections[kvp.Key] = connection;
|
||
UpdateState();
|
||
});
|
||
|
||
// 远程服务器: 批量连接 (默认 concurrency: 20)
|
||
var remoteBatchSize = GetRemoteBatchSize(); // MCP_REMOTE_SERVER_CONNECTION_BATCH_SIZE
|
||
await Parallel.ForEachAsync(remoteServers,
|
||
new ParallelOptions { MaxDegreeOfParallelism = remoteBatchSize, CancellationToken = ct },
|
||
async (kvp, _) =>
|
||
{
|
||
var connection = await ConnectToServerAsync(kvp.Key, kvp.Value, ct);
|
||
_connections[kvp.Key] = connection;
|
||
UpdateState();
|
||
});
|
||
}
|
||
|
||
private async Task<MCPServerConnection> ConnectToServerAsync(
|
||
string name, ScopedMcpServerConfig config, CancellationToken ct)
|
||
{
|
||
try
|
||
{
|
||
// 1. 创建传输层
|
||
var transport = CreateTransport(name, config);
|
||
|
||
// 2. 创建客户端并连接
|
||
var client = new McpClient(transport);
|
||
var timeout = GetConnectionTimeout(); // MCP_TIMEOUT 环境变量, 默认 30s
|
||
|
||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||
timeoutCts.CancelAfter(timeout);
|
||
|
||
await client.ConnectAsync(timeoutCts.Token);
|
||
|
||
// 3. 获取能力、工具、资源
|
||
var tools = await client.ListToolsAsync(ct);
|
||
var capabilities = client.Capabilities!;
|
||
|
||
// 4. 截断过长的 instructions (MAX_MCP_DESCRIPTION_LENGTH = 2048)
|
||
var instructions = client.Instructions;
|
||
if (instructions?.Length > 2048)
|
||
instructions = instructions[..2048] + "… [truncated]";
|
||
|
||
return new MCPServerConnection.Connected(name, config, client,
|
||
capabilities, client.ServerInfo, instructions,
|
||
async () => await transport.CloseAsync());
|
||
}
|
||
catch (UnauthorizedException)
|
||
{
|
||
// SSE/HTTP/claudeai-proxy: 需要认证
|
||
return new MCPServerConnection.NeedsAuthentication(name, config);
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
_logger.LogWarning(ex, "MCP server {Name} connection failed", name);
|
||
return new MCPServerConnection.Failed(name, config, ex.Message);
|
||
}
|
||
}
|
||
|
||
private IMcpTransport CreateTransport(string name, ScopedMcpServerConfig config)
|
||
{
|
||
return config switch
|
||
{
|
||
StdioServerConfig stdio => new StdioTransport(stdio,
|
||
Directory.GetCurrentDirectory()),
|
||
SseServerConfig sse => new SseTransport(
|
||
CreateHttpClient(sse.Headers), sse.Url),
|
||
SseIdeServerConfig sseIde => new SseTransport(
|
||
new HttpClient(), sseIde.Url),
|
||
HttpServerConfig http => new StreamableHttpTransport(
|
||
CreateHttpClient(http.Headers), http.Url),
|
||
WsIdeServerConfig wsIde => new WebSocketTransport(wsIde.Url, wsIde.AuthToken),
|
||
WebSocketServerConfig ws => new WebSocketTransport(ws.Url, null, ws.Headers),
|
||
ClaudeAiProxyServerConfig proxy => new StreamableHttpTransport(
|
||
CreateProxyHttpClient(), proxy.Url),
|
||
_ => throw new InvalidOperationException($"Unsupported server type: {config}")
|
||
};
|
||
}
|
||
|
||
/// <summary>
|
||
/// 获取所有工具并适配为 ITool 接口
|
||
/// 对应原始 MCPTool 适配器
|
||
/// </summary>
|
||
public async Task<IReadOnlyList<ITool>> GetToolsAsync()
|
||
{
|
||
var tools = new List<ITool>();
|
||
foreach (var connection in _connections.Values)
|
||
{
|
||
if (connection is MCPServerConnection.Connected connected)
|
||
{
|
||
var mcpTools = await connected.Client.ListToolsAsync();
|
||
foreach (var tool in mcpTools.Tools)
|
||
{
|
||
tools.Add(new McpToolWrapper(connected.Name, tool, connected.Client));
|
||
}
|
||
}
|
||
}
|
||
return tools;
|
||
}
|
||
|
||
/// <summary>更新 AppState 中的 MCP 状态</summary>
|
||
private void UpdateState()
|
||
{
|
||
_stateStore.Update(state => state with
|
||
{
|
||
Mcp = state.Mcp with
|
||
{
|
||
Clients = _connections.Values.ToList(),
|
||
Tools = GetToolsFromConnections(),
|
||
}
|
||
});
|
||
}
|
||
|
||
private static bool IsLocalServer(ScopedMcpServerConfig config) =>
|
||
config is StdioServerConfig or SdkServerConfig;
|
||
}
|
||
|
||
/// <summary>
|
||
/// MCP 工具包装器 — 将 MCP tool 适配为 ITool 接口
|
||
/// 对应原始 MCPTool.ts
|
||
/// </summary>
|
||
public sealed class McpToolWrapper : ITool<JsonElement, JsonElement>
|
||
{
|
||
private readonly string _serverName;
|
||
private readonly McpToolDefinition _definition;
|
||
private readonly McpClient _client;
|
||
|
||
public string Name => $"mcp__{_serverName}__{_definition.Name}";
|
||
public ToolCategory Category => ToolCategory.Mcp;
|
||
|
||
public async Task<ToolResult<JsonElement>> ExecuteAsync(
|
||
JsonElement input, ToolExecutionContext context, CancellationToken ct)
|
||
{
|
||
var timeout = GetMcpToolTimeout(); // MCP_TOOL_TIMEOUT, 默认 ~27.8h
|
||
using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
|
||
timeoutCts.CancelAfter(timeout);
|
||
|
||
try
|
||
{
|
||
var result = await _client.CallToolAsync(_definition.Name, input, timeoutCts.Token);
|
||
|
||
// 处理二进制内容 → 持久化到文件
|
||
if (result.HasBinaryContent)
|
||
{
|
||
var persisted = await PersistBinaryContentAsync(result);
|
||
return new ToolResult<JsonElement>(persisted);
|
||
}
|
||
|
||
return new ToolResult<JsonElement>(result.Content);
|
||
}
|
||
catch (McpAuthException)
|
||
{
|
||
// 标记服务器需要认证
|
||
throw;
|
||
}
|
||
}
|
||
|
||
public JsonElement GetInputSchema() => _definition.InputSchema;
|
||
public bool IsReadOnly(JsonElement input) => !_definition.HasDestructiveBehavior;
|
||
public bool IsConcurrencySafe(JsonElement input) => false;
|
||
}
|
||
```
|
||
|
||
## 10.7 McpAuthService
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// MCP OAuth 认证 — 对应原始 auth.ts 中的 ClaudeAuthProvider
|
||
/// </summary>
|
||
public sealed class McpAuthService
|
||
{
|
||
private readonly IdentityModel.OidcClient _oidcClient;
|
||
private readonly ISecureTokenStorage _tokenStorage; // macOS Keychain / credential manager
|
||
|
||
/// <summary>
|
||
/// 执行 OAuth 授权流程: 发现 → 浏览器授权 → code 交换 → token 存储
|
||
/// </summary>
|
||
public async Task<McpOAuthTokens> AuthorizeAsync(
|
||
string serverName, McpOAuthConfig config, CancellationToken ct)
|
||
{
|
||
// 1. 检查缓存的 token
|
||
var cached = await _tokenStorage.GetAsync($"mcp-{serverName}");
|
||
if (cached != null && !cached.IsExpired)
|
||
return cached;
|
||
|
||
// 2. 发现授权服务器元数据
|
||
var metadata = await DiscoverAuthorizationServerAsync(config.AuthServerMetadataUrl!, ct);
|
||
|
||
// 3. 启动本地 HTTP 监听器
|
||
var callbackPort = config.CallbackPort ?? GetAvailablePort();
|
||
using var listener = new HttpListener();
|
||
listener.Prefixes.Add($"http://localhost:{callbackPort}/");
|
||
listener.Start();
|
||
|
||
// 4. 构建授权 URL → 打开浏览器
|
||
var authUrl = BuildAuthorizationUrl(metadata, callbackPort);
|
||
OpenBrowser(authUrl);
|
||
|
||
// 5. 等待回调 → 提取 code
|
||
var context = await listener.GetContextAsync();
|
||
var code = context.Request.QueryString["code"];
|
||
|
||
// 6. 交换 token
|
||
var tokens = await ExchangeCodeAsync(metadata, code, callbackPort, ct);
|
||
await _tokenStorage.SetAsync($"mcp-{serverName}", tokens);
|
||
|
||
return tokens;
|
||
}
|
||
}
|
||
```
|