free-code-dotnet/docs/基础设施设计/基础设施设计-后台任务管理.md
应文浩wenhao.ying@xiaobao100.com e25ac591a7 init easy-code
2026-04-06 07:24:24 +08:00

322 lines
11 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 基础设施设计 — 后台任务管理
## 文档元数据
- 项目名称: free-code
- 文档类型: 基础设施设计
- 原始代码来源: `../../src/tasks/`（10 个文件）
- 原始设计意图: 将 Shell、Agent、远程会话、工作流、监控与记忆合并任务统一纳入后台调度和状态同步框架
- 交叉引用: [基础设施设计总览](基础设施设计.md) | [服务子系统设计-会话记忆](../服务子系统设计/服务子系统设计-会话记忆与上下文.md)
## 设计目标
后台任务管理层统一承载本地 shell、agent、远程 agent、监控与工作流型任务，并保证可调度、可观测与可取消。它必须和全局状态存储、查询引擎、桥接层协同工作，且在宿主关闭时能优雅终止。
## 13.1 BackgroundTask 任务层次
```csharp
/// <summary>
/// Shared abstraction that every background task kind derives from.
/// Counterpart of the TaskState union type in the original types.ts.
/// </summary>
public abstract record BackgroundTask
{
    /// <summary>Identifier of this task; must be supplied at construction.</summary>
    public required string TaskId { get; init; }

    /// <summary>Discriminator telling which concrete task kind this instance is.</summary>
    public abstract BackgroundTaskType TaskType { get; }

    /// <summary>Current lifecycle state; a new task starts as <see cref="TaskStatus.Pending"/>.</summary>
    public TaskStatus Status { get; set; } = TaskStatus.Pending;

    /// <summary>Time at which execution began, or <c>null</c> if it has not been recorded.</summary>
    public DateTime? StartedAt { get; set; }

    /// <summary>Time at which execution ended, or <c>null</c> if it has not been recorded.</summary>
    public DateTime? CompletedAt { get; set; }

    /// <summary>Failure description, or <c>null</c> when none has been recorded.</summary>
    public string? ErrorMessage { get; set; }

    /// <summary>Whether the task is running in the background; defaults to <c>true</c>.</summary>
    public bool IsBackgrounded { get; set; } = true;
}
/// <summary>
/// Discriminator for the concrete <c>BackgroundTask</c> kinds.
/// The numeric values are the ones the compiler assigns by declaration order.
/// </summary>
public enum BackgroundTaskType
{
    /// <summary>Background shell command run locally.</summary>
    LocalShell = 0,

    /// <summary>Locally executed sub-agent.</summary>
    LocalAgent = 1,

    /// <summary>Agent session running remotely.</summary>
    RemoteAgent = 2,

    /// <summary>Collaborating agent hosted inside the current process.</summary>
    InProcessTeammate = 3,

    /// <summary>Multi-step workflow run locally.</summary>
    LocalWorkflow = 4,

    /// <summary>Monitor for an MCP server connection.</summary>
    MonitorMcp = 5,

    /// <summary>Background memory consolidation ("dream") run.</summary>
    Dream = 6,
}
/// <summary>
/// Lifecycle states of a background task.
/// </summary>
/// <remarks>
/// NOTE(review): this name is shared with <c>System.Threading.Tasks.TaskStatus</c>;
/// files that import both namespaces may need an alias or a qualified name — confirm
/// against the project's namespace layout.
/// </remarks>
public enum TaskStatus
{
    /// <summary>Created but not yet started (the default for a new task).</summary>
    Pending = 0,

    /// <summary>Currently executing.</summary>
    Running = 1,

    /// <summary>Finished without error.</summary>
    Completed = 2,

    /// <summary>Ended because of an error.</summary>
    Failed = 3,

    /// <summary>Ended because it was stopped or cancelled.</summary>
    Stopped = 4,
}
// === Concrete task types ===

/// <summary>
/// Local shell task: a bash command executed in the background.
/// Counterpart of the original LocalShellTask.
/// </summary>
public sealed record LocalShellTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.LocalShell;

    /// <summary>The command text associated with this task.</summary>
    public required string Command { get; init; }

    /// <summary>Process launch settings, if any were supplied.</summary>
    public ProcessStartInfo? ProcessStartInfo { get; init; }

    /// <summary>Captured standard output, or <c>null</c> if nothing has been recorded.</summary>
    public string? Stdout { get; set; }

    /// <summary>Captured standard error, or <c>null</c> if nothing has been recorded.</summary>
    public string? Stderr { get; set; }

    /// <summary>Exit code of the process, or <c>null</c> if it has not been recorded.</summary>
    public int? ExitCode { get; set; }
}
/// <summary>
/// Local agent task: a sub-agent (forked process or worktree isolation).
/// Counterpart of the original LocalAgentTask.
/// </summary>
public sealed record LocalAgentTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.LocalAgent;

    /// <summary>Prompt handed to the sub-agent.</summary>
    public required string Prompt { get; init; }

    /// <summary>Optional model name; <c>null</c> when none was specified.</summary>
    public string? Model { get; init; }

    /// <summary>Optional agent flavour, e.g. explore, librarian, oracle, metis, momus.</summary>
    public string? AgentType { get; init; }

    /// <summary>Optional working directory for the sub-agent.</summary>
    public string? WorkingDirectory { get; init; }

    /// <summary>Messages associated with this task; starts out empty.</summary>
    public List<Message> Messages { get; } = new();
}
/// <summary>
/// Remote agent task: a remote session hosted on claude.ai.
/// Counterpart of the original RemoteAgentTask.
/// </summary>
public sealed record RemoteAgentTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.RemoteAgent;

    /// <summary>URL of the remote session.</summary>
    public required string SessionUrl { get; init; }

    /// <summary>Plan text associated with the remote session, if any.</summary>
    public string? Plan { get; set; }

    /// <summary>
    /// Free-form status string for the remote session.
    /// </summary>
    /// <remarks>
    /// This member has the same name as the inherited <see cref="BackgroundTask.Status"/>
    /// (a <see cref="TaskStatus"/>) and therefore hides it. The <c>new</c> modifier makes
    /// that hiding explicit instead of accidental: code holding a <see cref="RemoteAgentTask"/>
    /// reference sees this string, while code holding a <see cref="BackgroundTask"/>
    /// reference still reads and writes the lifecycle state.
    /// NOTE(review): consider renaming (e.g. RemoteStatus) in a coordinated change to
    /// remove the ambiguity altogether.
    /// </remarks>
    public new string? Status { get; set; }
}
/// <summary>
/// In-process teammate task: a collaborating agent.
/// Counterpart of the original InProcessTeammateTask.
/// </summary>
public sealed record InProcessTeammateTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.InProcessTeammate;

    /// <summary>Name of the teammate agent.</summary>
    public required string AgentName { get; init; }

    /// <summary>Optional agent flavour.</summary>
    public string? AgentType { get; init; }

    /// <summary>Optional colour associated with the teammate.</summary>
    public string? Color { get; init; }

    /// <summary>Working directory for the teammate; must be supplied.</summary>
    public required string WorkingDirectory { get; init; }
}
/// <summary>
/// Local workflow task: a workflow made of multiple steps.
/// Counterpart of the original LocalWorkflowTask.
/// </summary>
public sealed record LocalWorkflowTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.LocalWorkflow;

    /// <summary>Name of the workflow.</summary>
    public required string WorkflowName { get; init; }

    /// <summary>The steps that make up the workflow.</summary>
    public required List<WorkflowStep> Steps { get; init; }

    /// <summary>Index into <see cref="Steps"/> of the current step; starts at 0.</summary>
    public int CurrentStepIndex { get; set; }
}
/// <summary>
/// MCP SSE monitor task: watches the connection state of an MCP server.
/// Counterpart of the original MonitorMcpTask.
/// </summary>
public sealed record MonitorMcpTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.MonitorMcp;

    /// <summary>Name of the monitored MCP server.</summary>
    public required string ServerName { get; init; }

    /// <summary>Reconnect attempt counter; starts at 0.</summary>
    public int ReconnectAttempt { get; set; }
}
/// <summary>
/// Dream task: background memory consolidation.
/// Counterpart of the original DreamTask.
/// </summary>
public sealed record DreamTask : BackgroundTask
{
    /// <inheritdoc />
    public override BackgroundTaskType TaskType => BackgroundTaskType.Dream;

    /// <summary>What triggered this run: <c>time</c> or <c>session_count</c>.</summary>
    public required string TriggerReason { get; init; }
}
```
## 13.2 IBackgroundTaskManager 接口
```csharp
/// <summary>
/// Background task manager with channel-based task scheduling.
/// Counterpart of the original task tool plus the background-task UI indicator.
/// </summary>
public interface IBackgroundTaskManager
{
    /// <summary>Creates a shell task.</summary>
    /// <param name="command">The command text for the task.</param>
    /// <param name="psi">Process launch settings for the task.</param>
    /// <returns>The created <see cref="LocalShellTask"/>.</returns>
    Task<LocalShellTask> CreateShellTaskAsync(string command, ProcessStartInfo psi);

    /// <summary>Creates an agent task.</summary>
    /// <param name="prompt">Prompt for the agent.</param>
    /// <param name="agentType">Optional agent flavour.</param>
    /// <param name="model">Optional model name.</param>
    /// <returns>The created <see cref="LocalAgentTask"/>.</returns>
    Task<LocalAgentTask> CreateAgentTaskAsync(string prompt, string? agentType, string? model);

    /// <summary>Creates a remote agent task.</summary>
    /// <param name="sessionUrl">URL of the remote session.</param>
    /// <returns>The created <see cref="RemoteAgentTask"/>.</returns>
    Task<RemoteAgentTask> CreateRemoteAgentTaskAsync(string sessionUrl);

    /// <summary>Creates a dream task.</summary>
    /// <param name="triggerReason">What triggered the run.</param>
    /// <returns>The created <see cref="DreamTask"/>.</returns>
    Task<DreamTask> CreateDreamTaskAsync(string triggerReason);

    /// <summary>Stops a task.</summary>
    /// <param name="taskId">Identifier of the task to stop.</param>
    Task StopTaskAsync(string taskId);

    /// <summary>Gets the output of a task.</summary>
    /// <param name="taskId">Identifier of the task.</param>
    /// <returns>The task output, or <c>null</c>.</returns>
    Task<string?> GetTaskOutputAsync(string taskId);

    /// <summary>Lists all tasks.</summary>
    IReadOnlyList<BackgroundTask> ListTasks();

    /// <summary>Gets a specific task.</summary>
    /// <param name="taskId">Identifier of the task.</param>
    /// <returns>The task, or <c>null</c>.</returns>
    BackgroundTask? GetTask(string taskId);

    /// <summary>Raised when the state of a task changes.</summary>
    event EventHandler<TaskStateChangedEventArgs>? TaskStateChanged;
}
```
## 13.3 BackgroundTaskManager 实现
```csharp
/// <summary>
/// Channel-backed <see cref="IBackgroundTaskManager"/> that also runs as a hosted service.
/// Created tasks are queued on an unbounded channel; a dispatch loop started by
/// <see cref="StartAsync"/> runs each queued task concurrently, mirrors every status
/// change into the app state store and raises <see cref="TaskStateChanged"/>.
/// </summary>
/// <remarks>
/// Each running task gets its own cancellation source linked to the shutdown token, so
/// it can be stopped individually via <see cref="StopTaskAsync"/> or collectively via
/// <see cref="StopAsync"/>.
/// NOTE(review): the remaining <see cref="IBackgroundTaskManager"/> members
/// (CreateAgentTaskAsync, CreateRemoteAgentTaskAsync, CreateDreamTaskAsync,
/// GetTaskOutputAsync) and the executors for remote-agent, monitor, workflow and
/// teammate tasks are referenced here but not defined in this listing.
/// </remarks>
public class BackgroundTaskManager : IBackgroundTaskManager, IHostedService
{
    private readonly Channel<BackgroundTask> _taskChannel = Channel.CreateUnbounded<BackgroundTask>();
    private readonly ConcurrentDictionary<string, BackgroundTask> _tasks = new();

    // Per-task cancellation sources, present only while the task is executing.
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _taskCancellations = new();

    // In-flight task runners, tracked so shutdown can wait for them to wind down.
    private readonly ConcurrentDictionary<string, Task> _runningTasks = new();

    private readonly IAppStateStore _stateStore;
    private readonly IServiceProvider _services;
    private readonly ILogger<BackgroundTaskManager> _logger;
    private readonly CancellationTokenSource _shutdownCts = new();

    // The dispatch loop started by StartAsync; null until the service is started.
    private Task? _dispatchLoop;

    /// <summary>Raised whenever a task starts running and again when it finishes.</summary>
    public event EventHandler<TaskStateChangedEventArgs>? TaskStateChanged;

    /// <summary>
    /// Creates the manager with its collaborators.
    /// </summary>
    /// <param name="stateStore">Store that receives the task snapshot on every change.</param>
    /// <param name="services">Provider used to resolve per-task services on demand.</param>
    /// <param name="logger">Logger for task failures.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public BackgroundTaskManager(
        IAppStateStore stateStore,
        IServiceProvider services,
        ILogger<BackgroundTaskManager> logger)
    {
        ArgumentNullException.ThrowIfNull(stateStore);
        ArgumentNullException.ThrowIfNull(services);
        ArgumentNullException.ThrowIfNull(logger);

        _stateStore = stateStore;
        _services = services;
        _logger = logger;
    }

    /// <summary>
    /// Starts the dispatch loop on the thread pool and returns immediately.
    /// </summary>
    /// <param name="ct">Host start-up token (not used; the loop is bound to the shutdown token).</param>
    public Task StartAsync(CancellationToken ct)
    {
        _dispatchLoop = Task.Run(() => DispatchLoopAsync(_shutdownCts.Token), CancellationToken.None);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Dispatch loop: reads queued tasks from the channel and starts a runner for each.
    /// </summary>
    /// <param name="ct">Shutdown token; cancelling it ends the loop.</param>
    private async Task DispatchLoopAsync(CancellationToken ct)
    {
        await foreach (var task in _taskChannel.Reader.ReadAllAsync(ct))
        {
            var taskId = task.TaskId;

            // Linked source: cancelled either by StopTaskAsync or by shutdown.
            var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            _taskCancellations[taskId] = cts;

            // No token is passed to Task.Run on purpose: a dequeued task must always
            // reach a terminal status, even if shutdown is signalled before it starts.
            var runner = Task.Run(() => RunTaskAsync(task, cts), CancellationToken.None);
            _runningTasks[taskId] = runner;

            // Attached after the add above, so the removal can never precede it.
            _ = runner.ContinueWith(
                t => { _runningTasks.TryRemove(taskId, out _); },
                TaskScheduler.Default);
        }
    }

    /// <summary>
    /// Runs one task from start to a terminal status, publishing both transitions.
    /// </summary>
    /// <param name="task">The task to execute.</param>
    /// <param name="cts">Cancellation source owned by this run; disposed when the run ends.</param>
    private async Task RunTaskAsync(BackgroundTask task, CancellationTokenSource cts)
    {
        task.Status = TaskStatus.Running;
        task.StartedAt = DateTime.UtcNow;
        PublishStateChange(task);

        try
        {
            await ExecuteTaskAsync(task, cts.Token);

            // An executor may return normally after a stop request; do not report
            // such a run as Completed.
            task.Status = cts.IsCancellationRequested ? TaskStatus.Stopped : TaskStatus.Completed;
        }
        catch (OperationCanceledException)
        {
            task.Status = TaskStatus.Stopped;
        }
        catch (Exception ex)
        {
            task.Status = TaskStatus.Failed;
            task.ErrorMessage = ex.Message;
            _logger.LogError(ex, "Task {TaskId} failed", task.TaskId);
        }
        finally
        {
            task.CompletedAt = DateTime.UtcNow;
            _taskCancellations.TryRemove(task.TaskId, out _);
            cts.Dispose();
            PublishStateChange(task);
        }
    }

    /// <summary>Pushes the current task snapshot to the state store and raises the event.</summary>
    private void PublishStateChange(BackgroundTask task)
    {
        UpdateState();
        TaskStateChanged?.Invoke(this, new(task.TaskId, task.Status));
    }

    /// <summary>
    /// Routes a task to the executor for its concrete type.
    /// </summary>
    /// <returns>
    /// The executor's task; a faulted task with <see cref="NotSupportedException"/> for an
    /// unrecognised task type, so that it is reported as failed rather than completed.
    /// </returns>
    private Task ExecuteTaskAsync(BackgroundTask task, CancellationToken ct) => task switch
    {
        LocalShellTask shell => ExecuteShellTaskAsync(shell, ct),
        LocalAgentTask agent => ExecuteAgentTaskAsync(agent, ct),
        RemoteAgentTask remote => ExecuteRemoteAgentTaskAsync(remote, ct),
        DreamTask dream => ExecuteDreamTaskAsync(dream, ct),
        MonitorMcpTask monitor => ExecuteMonitorTaskAsync(monitor, ct),
        LocalWorkflowTask workflow => ExecuteWorkflowTaskAsync(workflow, ct),
        InProcessTeammateTask teammate => ExecuteTeammateTaskAsync(teammate, ct),
        _ => Task.FromException(
            new NotSupportedException($"No executor is registered for task type {task.GetType().Name}.")),
    };

    /// <summary>
    /// Runs the shell task's process and records stdout, stderr and the exit code.
    /// </summary>
    /// <exception cref="InvalidOperationException">The task has no <c>ProcessStartInfo</c>.</exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was cancelled; the process tree is killed before rethrowing.
    /// </exception>
    private async Task ExecuteShellTaskAsync(LocalShellTask task, CancellationToken ct)
    {
        var startInfo = task.ProcessStartInfo
            ?? throw new InvalidOperationException($"Shell task {task.TaskId} has no ProcessStartInfo.");

        // Both streams are read below, which requires redirection without shell execute.
        startInfo.UseShellExecute = false;
        startInfo.RedirectStandardOutput = true;
        startInfo.RedirectStandardError = true;

        using var process = new Process { StartInfo = startInfo };
        process.Start();

        try
        {
            // Drain both pipes concurrently: reading one to the end before touching the
            // other can deadlock once the child fills the unread pipe's buffer.
            var stdoutTask = process.StandardOutput.ReadToEndAsync(ct);
            var stderrTask = process.StandardError.ReadToEndAsync(ct);
            await Task.WhenAll(stdoutTask, stderrTask);
            await process.WaitForExitAsync(ct);

            task.Stdout = await stdoutTask;
            task.Stderr = await stderrTask;
            task.ExitCode = process.ExitCode;
        }
        catch
        {
            // Cancelling the reads does not end the child process; terminate it explicitly.
            KillProcessTree(process, task.TaskId);
            throw;
        }
    }

    /// <summary>Best-effort termination of a process and its descendants.</summary>
    private void KillProcessTree(Process process, string taskId)
    {
        try
        {
            if (!process.HasExited)
            {
                process.Kill(entireProcessTree: true);
            }
        }
        catch (Exception ex) when (ex is InvalidOperationException
                                       or System.ComponentModel.Win32Exception
                                       or AggregateException)
        {
            _logger.LogWarning(ex, "Failed to kill process for task {TaskId}", taskId);
        }
    }

    /// <summary>
    /// Runs a sub-agent query through the query engine and collects the streamed
    /// messages into <c>task.Messages</c>.
    /// </summary>
    private async Task ExecuteAgentTaskAsync(LocalAgentTask task, CancellationToken ct)
    {
        // Use the QueryEngine to execute the sub-agent query in an isolated context.
        var engine = _services.GetRequiredService<IQueryEngine>();
        await foreach (var msg in engine.SubmitMessageAsync(task.Prompt,
            new(Model: task.Model), ct))
        {
            // assumes the stream yields Message instances — TODO confirm against IQueryEngine
            task.Messages.Add(msg);
        }
    }

    /// <summary>Runs one dream cycle through the auto-dream service.</summary>
    private async Task ExecuteDreamTaskAsync(DreamTask task, CancellationToken ct)
    {
        var dreamService = _services.GetRequiredService<IAutoDreamService>();
        await dreamService.RunDreamCycleAsync(ct);
    }

    /// <summary>
    /// Creates a shell task, registers it and queues it for execution.
    /// </summary>
    /// <param name="command">The command text recorded on the task.</param>
    /// <param name="psi">Process launch settings used to start the process.</param>
    /// <returns>
    /// The created task. If the manager is already shutting down the task is returned
    /// in the <c>Stopped</c> state with an explanatory <c>ErrorMessage</c>.
    /// </returns>
    public Task<LocalShellTask> CreateShellTaskAsync(string command, ProcessStartInfo psi)
    {
        LocalShellTask task;
        do
        {
            task = new LocalShellTask
            {
                TaskId = Guid.NewGuid().ToString("N")[..8],
                Command = command,
                ProcessStartInfo = psi,
            };
        }
        // Short ids can collide; never overwrite an existing registration.
        while (!_tasks.TryAdd(task.TaskId, task));

        // The channel is unbounded, so TryWrite only fails once the writer is completed.
        if (!_taskChannel.Writer.TryWrite(task))
        {
            task.Status = TaskStatus.Stopped;
            task.ErrorMessage = "Task manager is shutting down; the task was not scheduled.";
            task.CompletedAt = DateTime.UtcNow;
        }

        UpdateState();
        return Task.FromResult(task);
    }

    /// <summary>
    /// Requests that a running task stop. Does nothing for unknown or non-running tasks.
    /// </summary>
    /// <param name="taskId">Identifier of the task to stop.</param>
    public Task StopTaskAsync(string taskId)
    {
        if (_tasks.TryGetValue(taskId, out var task) && task.Status == TaskStatus.Running)
        {
            task.Status = TaskStatus.Stopped;

            // Signal the executor; the runner publishes the final state once it unwinds.
            if (_taskCancellations.TryGetValue(taskId, out var cts))
            {
                try
                {
                    cts.Cancel();
                }
                catch (ObjectDisposedException)
                {
                    // The run finished and disposed its source in the meantime.
                }
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>Returns a snapshot list of all known tasks.</summary>
    public IReadOnlyList<BackgroundTask> ListTasks() => _tasks.Values.ToList();

    /// <summary>Returns the task with the given id, or <c>null</c> if it is unknown.</summary>
    public BackgroundTask? GetTask(string taskId) => _tasks.GetValueOrDefault(taskId);

    /// <summary>Merges the managed tasks into the task map held in AppState.</summary>
    private void UpdateState()
    {
        _stateStore.Update(state =>
        {
            var tasks = state.Tasks.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
            foreach (var task in _tasks.Values)
            {
                tasks[task.TaskId] = task;
            }

            return state with { Tasks = tasks };
        });
    }

    /// <summary>
    /// Stops accepting work, cancels everything in flight and waits for it to wind down.
    /// </summary>
    /// <param name="ct">Host shutdown token; when it fires the wait is abandoned.</param>
    public async Task StopAsync(CancellationToken ct)
    {
        _taskChannel.Writer.TryComplete();
        _shutdownCts.Cancel();

        foreach (var task in _tasks.Values.Where(t => t.Status == TaskStatus.Running))
        {
            await StopTaskAsync(task.TaskId);
        }

        var inFlight = _runningTasks.Values.ToList();
        if (_dispatchLoop is not null)
        {
            inFlight.Add(_dispatchLoop);
        }

        try
        {
            await Task.WhenAll(inFlight).WaitAsync(ct);
        }
        catch (OperationCanceledException)
        {
            // Expected: the dispatch loop ends by cancellation, or the host's
            // shutdown timeout elapsed before all runners finished.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Background task shutdown did not complete cleanly");
        }
    }
}
```