322 lines
11 KiB
Markdown
322 lines
11 KiB
Markdown
# 基础设施设计 — 后台任务管理
|
||
|
||
## 文档元数据
|
||
- 项目名称: free-code
|
||
- 文档类型: 基础设施设计
|
||
- 原始代码来源: `../../src/tasks/`(10个文件)
|
||
- 原始设计意图: 将 Shell、Agent、远程会话、工作流、监控与记忆合并任务统一纳入后台调度和状态同步框架
|
||
- 交叉引用: [基础设施设计总览](基础设施设计.md) | [服务子系统设计-会话记忆](../服务子系统设计/服务子系统设计-会话记忆与上下文.md)
|
||
|
||
## 设计目标
|
||
后台任务管理层统一承载本地 shell、agent、远程 agent、监控与工作流型任务,并保证可调度、可观测与可取消。它必须和全局状态存储、查询引擎、桥接层协同工作,且在宿主关闭时能优雅终止。
|
||
|
||
## 13.1 BackgroundTask 任务层次
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// 后台任务基类 — 所有任务类型的公共抽象
|
||
/// 对应原始 types.ts 中的 TaskState 联合类型
|
||
/// </summary>
|
||
public abstract record BackgroundTask
|
||
{
|
||
public required string TaskId { get; init; }
|
||
public abstract BackgroundTaskType TaskType { get; }
|
||
public TaskStatus Status { get; set; } = TaskStatus.Pending;
|
||
public DateTime? StartedAt { get; set; }
|
||
public DateTime? CompletedAt { get; set; }
|
||
public string? ErrorMessage { get; set; }
|
||
public bool IsBackgrounded { get; set; } = true;
|
||
}
|
||
|
||
public enum BackgroundTaskType
|
||
{
|
||
LocalShell,
|
||
LocalAgent,
|
||
RemoteAgent,
|
||
InProcessTeammate,
|
||
LocalWorkflow,
|
||
MonitorMcp,
|
||
Dream
|
||
}
|
||
|
||
public enum TaskStatus { Pending, Running, Completed, Failed, Stopped }
|
||
|
||
// === 具体任务类型 ===
|
||
|
||
/// <summary>
|
||
/// 本地 Shell 任务 — 后台 bash 命令
|
||
/// 对应原始 LocalShellTask
|
||
/// </summary>
|
||
public sealed record LocalShellTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.LocalShell;
|
||
public required string Command { get; init; }
|
||
public ProcessStartInfo? ProcessStartInfo { get; init; }
|
||
public string? Stdout { get; set; }
|
||
public string? Stderr { get; set; }
|
||
public int? ExitCode { get; set; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 本地 Agent 任务 — 子代理(forked process 或 worktree 隔离)
|
||
/// 对应原始 LocalAgentTask
|
||
/// </summary>
|
||
public sealed record LocalAgentTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.LocalAgent;
|
||
public required string Prompt { get; init; }
|
||
public string? Model { get; init; }
|
||
public string? AgentType { get; init; } // explore, librarian, oracle, metis, momus
|
||
public string? WorkingDirectory { get; init; }
|
||
public List<Message> Messages { get; } = new();
|
||
}
|
||
|
||
/// <summary>
|
||
/// 远程 Agent 任务 — claude.ai 上的远程会话
|
||
/// 对应原始 RemoteAgentTask
|
||
/// </summary>
|
||
public sealed record RemoteAgentTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.RemoteAgent;
|
||
public required string SessionUrl { get; init; }
|
||
public string? Plan { get; set; }
|
||
public string? Status { get; set; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 进程内 Teammate 任务 — 协作代理
|
||
/// 对应原始 InProcessTeammateTask
|
||
/// </summary>
|
||
public sealed record InProcessTeammateTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.InProcessTeammate;
|
||
public required string AgentName { get; init; }
|
||
public string? AgentType { get; init; }
|
||
public string? Color { get; init; }
|
||
public required string WorkingDirectory { get; init; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// 本地工作流任务 — 多步骤工作流
|
||
/// 对应原始 LocalWorkflowTask
|
||
/// </summary>
|
||
public sealed record LocalWorkflowTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.LocalWorkflow;
|
||
public required string WorkflowName { get; init; }
|
||
public required List<WorkflowStep> Steps { get; init; }
|
||
public int CurrentStepIndex { get; set; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// MCP SSE 监控任务 — 监控 MCP 服务器连接状态
|
||
/// 对应原始 MonitorMcpTask
|
||
/// </summary>
|
||
public sealed record MonitorMcpTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.MonitorMcp;
|
||
public required string ServerName { get; init; }
|
||
public int ReconnectAttempt { get; set; }
|
||
}
|
||
|
||
/// <summary>
|
||
/// Dream 任务 — 后台记忆合并
|
||
/// 对应原始 DreamTask
|
||
/// </summary>
|
||
public sealed record DreamTask : BackgroundTask
|
||
{
|
||
public override BackgroundTaskType TaskType => BackgroundTaskType.Dream;
|
||
public required string TriggerReason { get; init; } // time | session_count
|
||
}
|
||
```
|
||
|
||
## 13.2 IBackgroundTaskManager 接口
|
||
|
||
```csharp
|
||
/// <summary>
|
||
/// 后台任务管理器 — Channel-based 任务调度
|
||
/// 对应原始 task 工具 + 后台任务 UI 指示器
|
||
/// </summary>
|
||
public interface IBackgroundTaskManager
|
||
{
|
||
/// <summary>创建 Shell 任务</summary>
|
||
Task<LocalShellTask> CreateShellTaskAsync(string command, ProcessStartInfo psi);
|
||
|
||
/// <summary>创建 Agent 任务</summary>
|
||
Task<LocalAgentTask> CreateAgentTaskAsync(string prompt, string? agentType, string? model);
|
||
|
||
/// <summary>创建远程 Agent 任务</summary>
|
||
Task<RemoteAgentTask> CreateRemoteAgentTaskAsync(string sessionUrl);
|
||
|
||
/// <summary>创建 Dream 任务</summary>
|
||
Task<DreamTask> CreateDreamTaskAsync(string triggerReason);
|
||
|
||
/// <summary>停止任务</summary>
|
||
Task StopTaskAsync(string taskId);
|
||
|
||
/// <summary>获取任务输出</summary>
|
||
Task<string?> GetTaskOutputAsync(string taskId);
|
||
|
||
/// <summary>列出所有任务</summary>
|
||
IReadOnlyList<BackgroundTask> ListTasks();
|
||
|
||
/// <summary>获取指定任务</summary>
|
||
BackgroundTask? GetTask(string taskId);
|
||
|
||
/// <summary>任务状态变更事件</summary>
|
||
event EventHandler<TaskStateChangedEventArgs>? TaskStateChanged;
|
||
}
|
||
```
|
||
|
||
## 13.3 BackgroundTaskManager 实现
|
||
|
||
```csharp
|
||
public class BackgroundTaskManager : IBackgroundTaskManager, IHostedService
|
||
{
|
||
private readonly Channel<BackgroundTask> _taskChannel = Channel.CreateUnbounded<BackgroundTask>();
|
||
private readonly ConcurrentDictionary<string, BackgroundTask> _tasks = new();
|
||
private readonly IAppStateStore _stateStore;
|
||
private readonly IServiceProvider _services;
|
||
private readonly ILogger<BackgroundTaskManager> _logger;
|
||
private readonly CancellationTokenSource _shutdownCts = new();
|
||
|
||
public event EventHandler<TaskStateChangedEventArgs>? TaskStateChanged;
|
||
|
||
public async Task StartAsync(CancellationToken ct)
|
||
{
|
||
// 启动任务调度循环
|
||
_ = Task.Run(() => DispatchLoopAsync(_shutdownCts.Token), _shutdownCts.Token);
|
||
await Task.CompletedTask;
|
||
}
|
||
|
||
/// <summary>
|
||
/// 任务调度循环 — 从 channel 读取任务并执行
|
||
/// </summary>
|
||
private async Task DispatchLoopAsync(CancellationToken ct)
|
||
{
|
||
await foreach (var task in _taskChannel.Reader.ReadAllAsync(ct))
|
||
{
|
||
_ = Task.Run(async () =>
|
||
{
|
||
task.Status = TaskStatus.Running;
|
||
task.StartedAt = DateTime.UtcNow;
|
||
UpdateState();
|
||
TaskStateChanged?.Invoke(this, new(task.TaskId, task.Status));
|
||
|
||
try
|
||
{
|
||
await ExecuteTaskAsync(task, ct);
|
||
task.Status = TaskStatus.Completed;
|
||
}
|
||
catch (OperationCanceledException)
|
||
{
|
||
task.Status = TaskStatus.Stopped;
|
||
}
|
||
catch (Exception ex)
|
||
{
|
||
task.Status = TaskStatus.Failed;
|
||
task.ErrorMessage = ex.Message;
|
||
_logger.LogError(ex, "Task {TaskId} failed", task.TaskId);
|
||
}
|
||
finally
|
||
{
|
||
task.CompletedAt = DateTime.UtcNow;
|
||
UpdateState();
|
||
TaskStateChanged?.Invoke(this, new(task.TaskId, task.Status));
|
||
}
|
||
}, ct);
|
||
}
|
||
}
|
||
|
||
private Task ExecuteTaskAsync(BackgroundTask task, CancellationToken ct) => task switch
|
||
{
|
||
LocalShellTask shell => ExecuteShellTaskAsync(shell, ct),
|
||
LocalAgentTask agent => ExecuteAgentTaskAsync(agent, ct),
|
||
RemoteAgentTask remote => ExecuteRemoteAgentTaskAsync(remote, ct),
|
||
DreamTask dream => ExecuteDreamTaskAsync(dream, ct),
|
||
MonitorMcpTask monitor => ExecuteMonitorTaskAsync(monitor, ct),
|
||
LocalWorkflowTask workflow => ExecuteWorkflowTaskAsync(workflow, ct),
|
||
InProcessTeammateTask teammate => ExecuteTeammateTaskAsync(teammate, ct),
|
||
_ => Task.CompletedTask
|
||
};
|
||
|
||
private async Task ExecuteShellTaskAsync(LocalShellTask task, CancellationToken ct)
|
||
{
|
||
using var process = new Process { StartInfo = task.ProcessStartInfo! };
|
||
process.Start();
|
||
|
||
var stdout = await process.StandardOutput.ReadToEndAsync(ct);
|
||
var stderr = await process.StandardError.ReadToEndAsync(ct);
|
||
await process.WaitForExitAsync(ct);
|
||
|
||
task.Stdout = stdout;
|
||
task.Stderr = stderr;
|
||
task.ExitCode = process.ExitCode;
|
||
}
|
||
|
||
private async Task ExecuteAgentTaskAsync(LocalAgentTask task, CancellationToken ct)
|
||
{
|
||
// 使用 QueryEngine 在隔离 context 中执行子代理查询
|
||
var engine = _services.GetRequiredService<IQueryEngine>();
|
||
await foreach (var msg in engine.SubmitMessageAsync(task.Prompt,
|
||
new(Model: task.Model), ct))
|
||
{
|
||
// 收集消息到 task.Messages
|
||
}
|
||
}
|
||
|
||
private async Task ExecuteDreamTaskAsync(DreamTask task, CancellationToken ct)
|
||
{
|
||
var dreamService = _services.GetRequiredService<IAutoDreamService>();
|
||
await dreamService.RunDreamCycleAsync(ct);
|
||
}
|
||
|
||
public Task<LocalShellTask> CreateShellTaskAsync(string command, ProcessStartInfo psi)
|
||
{
|
||
var task = new LocalShellTask
|
||
{
|
||
TaskId = Guid.NewGuid().ToString("N")[..8],
|
||
Command = command,
|
||
ProcessStartInfo = psi,
|
||
};
|
||
_tasks[task.TaskId] = task;
|
||
_taskChannel.Writer.TryWrite(task);
|
||
UpdateState();
|
||
return Task.FromResult(task);
|
||
}
|
||
|
||
public Task StopTaskAsync(string taskId)
|
||
{
|
||
if (_tasks.TryGetValue(taskId, out var task) && task.Status == TaskStatus.Running)
|
||
{
|
||
task.Status = TaskStatus.Stopped;
|
||
// 进程级取消由具体执行器处理
|
||
}
|
||
return Task.CompletedTask;
|
||
}
|
||
|
||
public IReadOnlyList<BackgroundTask> ListTasks() => _tasks.Values.ToList();
|
||
public BackgroundTask? GetTask(string taskId) => _tasks.GetValueOrDefault(taskId);
|
||
|
||
/// <summary>更新 AppState 中的任务状态</summary>
|
||
private void UpdateState()
|
||
{
|
||
_stateStore.Update(state =>
|
||
{
|
||
var tasks = state.Tasks.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
|
||
foreach (var task in _tasks.Values)
|
||
tasks[task.TaskId] = task;
|
||
return state with { Tasks = tasks };
|
||
});
|
||
}
|
||
|
||
public async Task StopAsync(CancellationToken ct)
|
||
{
|
||
_shutdownCts.Cancel();
|
||
_taskChannel.Writer.TryComplete();
|
||
foreach (var task in _tasks.Values.Where(t => t.Status == TaskStatus.Running))
|
||
await StopTaskAsync(task.TaskId);
|
||
}
|
||
}
|
||
```
|