11 KiB
11 KiB
基础设施设计 — 后台任务管理
文档元数据
- 项目名称: free-code
- 文档类型: 基础设施设计
- 原始代码来源: ../../src/tasks/(10个文件)
- 原始设计意图: 将 Shell、Agent、远程会话、工作流、监控与记忆合并任务统一纳入后台调度和状态同步框架
- 交叉引用: 基础设施设计总览 | 服务子系统设计-会话记忆
设计目标
后台任务管理层统一承载本地 shell、agent、远程 agent、监控与工作流型任务,并保证可调度、可观测与可取消。它必须和全局状态存储、查询引擎、桥接层协同工作,且在宿主关闭时能优雅终止。
13.1 BackgroundTask 任务层次
/// <summary>
/// Base record for background tasks — the abstraction shared by every task type.
/// Corresponds to the TaskState union type in the original types.ts.
/// </summary>
public abstract record BackgroundTask
{
/// <summary>Identifier of the task; must be supplied when the record is constructed.</summary>
public required string TaskId { get; init; }
/// <summary>Discriminator for the concrete task type; every derived record overrides it.</summary>
public abstract BackgroundTaskType TaskType { get; }
/// <summary>Current lifecycle status; defaults to <see cref="TaskStatus.Pending"/>.</summary>
public TaskStatus Status { get; set; } = TaskStatus.Pending;
/// <summary>Start timestamp; null until assigned.</summary>
public DateTime? StartedAt { get; set; }
/// <summary>Completion timestamp; null until assigned.</summary>
public DateTime? CompletedAt { get; set; }
/// <summary>Error text for the task; null unless assigned.</summary>
public string? ErrorMessage { get; set; }
/// <summary>Whether the task is backgrounded; defaults to true.</summary>
public bool IsBackgrounded { get; set; } = true;
}
/// <summary>
/// Discriminator values returned by <see cref="BackgroundTask.TaskType"/>.
/// </summary>
public enum BackgroundTaskType
{
// Local shell command task.
LocalShell,
// Local sub-agent task.
LocalAgent,
// Remote agent session task.
RemoteAgent,
// In-process teammate task.
InProcessTeammate,
// Local multi-step workflow task.
LocalWorkflow,
// MCP monitoring task.
MonitorMcp,
// Background memory-consolidation ("dream") task.
Dream
}
// NOTE(review): this enum shares its simple name with System.Threading.Tasks.TaskStatus;
// files that import both namespaces may need to disambiguate — confirm against the project layout.
/// <summary>Lifecycle states of a background task.</summary>
public enum TaskStatus { Pending, Running, Completed, Failed, Stopped }
// === 具体任务类型 ===
/// <summary>
/// Local shell task — a background bash command.
/// Corresponds to the original LocalShellTask.
/// </summary>
public sealed record LocalShellTask : BackgroundTask
{
public override BackgroundTaskType TaskType => BackgroundTaskType.LocalShell;
/// <summary>The command text; must be supplied at construction.</summary>
public required string Command { get; init; }
/// <summary>Process launch settings; optional at construction, so it may be null.</summary>
public ProcessStartInfo? ProcessStartInfo { get; init; }
/// <summary>Captured standard output; null until assigned.</summary>
public string? Stdout { get; set; }
/// <summary>Captured standard error; null until assigned.</summary>
public string? Stderr { get; set; }
/// <summary>Process exit code; null until assigned.</summary>
public int? ExitCode { get; set; }
}
/// <summary>
/// Local agent task — a sub-agent (forked process or worktree isolation).
/// Corresponds to the original LocalAgentTask.
/// </summary>
public sealed record LocalAgentTask : BackgroundTask
{
public override BackgroundTaskType TaskType => BackgroundTaskType.LocalAgent;
/// <summary>Prompt for the sub-agent; must be supplied at construction.</summary>
public required string Prompt { get; init; }
/// <summary>Optional model name.</summary>
public string? Model { get; init; }
/// <summary>Optional agent kind; the trailing comment lists example values.</summary>
public string? AgentType { get; init; } // explore, librarian, oracle, metis, momus
/// <summary>Optional working directory.</summary>
public string? WorkingDirectory { get; init; }
/// <summary>Message list owned by this task; starts empty and has no setter.</summary>
public List<Message> Messages { get; } = new();
}
/// <summary>
/// Remote agent task — a remote session on claude.ai.
/// Corresponds to the original RemoteAgentTask.
/// </summary>
public sealed record RemoteAgentTask : BackgroundTask
{
    public override BackgroundTaskType TaskType => BackgroundTaskType.RemoteAgent;

    /// <summary>URL of the remote session; must be supplied at construction.</summary>
    public required string SessionUrl { get; init; }

    /// <summary>Optional plan text.</summary>
    public string? Plan { get; set; }

    /// <summary>
    /// Optional status string specific to this record.
    /// </summary>
    /// <remarks>
    /// This property hides the inherited <c>BackgroundTask.Status</c> (a <c>TaskStatus</c> value);
    /// the <c>new</c> modifier makes that hiding explicit instead of relying on an implicit hide
    /// that the compiler flags with warning CS0108. Code that holds the task through a
    /// <c>BackgroundTask</c> reference still reads and writes the inherited lifecycle status,
    /// not this string.
    /// NOTE(review): presumably this carries the status text reported by the remote session — confirm.
    /// </remarks>
    public new string? Status { get; set; }
}
/// <summary>
/// In-process teammate task — a collaborating agent.
/// Corresponds to the original InProcessTeammateTask.
/// </summary>
public sealed record InProcessTeammateTask : BackgroundTask
{
public override BackgroundTaskType TaskType => BackgroundTaskType.InProcessTeammate;
/// <summary>Name of the agent; must be supplied at construction.</summary>
public required string AgentName { get; init; }
/// <summary>Optional agent kind.</summary>
public string? AgentType { get; init; }
/// <summary>Optional color value.</summary>
public string? Color { get; init; }
/// <summary>Working directory; must be supplied at construction.</summary>
public required string WorkingDirectory { get; init; }
}
/// <summary>
/// Local workflow task — a multi-step workflow.
/// Corresponds to the original LocalWorkflowTask.
/// </summary>
public sealed record LocalWorkflowTask : BackgroundTask
{
public override BackgroundTaskType TaskType => BackgroundTaskType.LocalWorkflow;
/// <summary>Name of the workflow; must be supplied at construction.</summary>
public required string WorkflowName { get; init; }
/// <summary>Steps of the workflow; must be supplied at construction.</summary>
public required List<WorkflowStep> Steps { get; init; }
/// <summary>Index of the current step; starts at 0 (the int default).</summary>
public int CurrentStepIndex { get; set; }
}
/// <summary>
/// MCP SSE monitor task — monitors the connection state of an MCP server.
/// Corresponds to the original MonitorMcpTask.
/// </summary>
public sealed record MonitorMcpTask : BackgroundTask
{
public override BackgroundTaskType TaskType => BackgroundTaskType.MonitorMcp;
/// <summary>Name of the monitored server; must be supplied at construction.</summary>
public required string ServerName { get; init; }
/// <summary>Reconnect attempt counter; starts at 0 (the int default).</summary>
public int ReconnectAttempt { get; set; }
}
/// <summary>
/// Dream task — background memory consolidation.
/// Corresponds to the original DreamTask.
/// </summary>
public sealed record DreamTask : BackgroundTask
{
public override BackgroundTaskType TaskType => BackgroundTaskType.Dream;
/// <summary>Why the task was triggered; the trailing comment lists the expected values.</summary>
public required string TriggerReason { get; init; } // time | session_count
}
13.2 IBackgroundTaskManager 接口
/// <summary>
/// Background task manager — channel-based task scheduling.
/// Corresponds to the original task tool plus the background-task UI indicator.
/// </summary>
public interface IBackgroundTaskManager
{
/// <summary>Creates a shell task from a command and its process start info.</summary>
Task<LocalShellTask> CreateShellTaskAsync(string command, ProcessStartInfo psi);
/// <summary>Creates an agent task from a prompt, with an optional agent type and model.</summary>
Task<LocalAgentTask> CreateAgentTaskAsync(string prompt, string? agentType, string? model);
/// <summary>Creates a remote agent task for the given session URL.</summary>
Task<RemoteAgentTask> CreateRemoteAgentTaskAsync(string sessionUrl);
/// <summary>Creates a dream task with the given trigger reason.</summary>
Task<DreamTask> CreateDreamTaskAsync(string triggerReason);
/// <summary>Stops the task with the given id.</summary>
Task StopTaskAsync(string taskId);
/// <summary>Gets the output of the task with the given id; the result may be null.</summary>
Task<string?> GetTaskOutputAsync(string taskId);
/// <summary>Lists all tasks.</summary>
IReadOnlyList<BackgroundTask> ListTasks();
/// <summary>Gets the task with the given id; the result may be null.</summary>
BackgroundTask? GetTask(string taskId);
/// <summary>Event for task state changes.</summary>
event EventHandler<TaskStateChangedEventArgs>? TaskStateChanged;
}
13.3 BackgroundTaskManager 实现
/// <summary>
/// Background task manager: created tasks are queued on an unbounded channel, and a dispatch
/// loop started by <see cref="StartAsync"/> runs each of them concurrently. Every status change
/// is mirrored into the app state store and announced through <see cref="TaskStateChanged"/>.
/// </summary>
/// <remarks>
/// NOTE(review): this listing is an excerpt. The constructor that assigns the injected
/// dependencies, the remaining <see cref="IBackgroundTaskManager"/> members and some of the
/// Execute*TaskAsync helpers referenced by the dispatcher are not shown here.
/// </remarks>
public class BackgroundTaskManager : IBackgroundTaskManager, IHostedService
{
    private readonly Channel<BackgroundTask> _taskChannel = Channel.CreateUnbounded<BackgroundTask>();
    private readonly ConcurrentDictionary<string, BackgroundTask> _tasks = new();

    // One cancellation source per running task, so StopTaskAsync can interrupt exactly that task.
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _taskCancellations = new();

    private readonly IAppStateStore _stateStore;
    private readonly IServiceProvider _services;
    private readonly ILogger<BackgroundTaskManager> _logger;
    private readonly CancellationTokenSource _shutdownCts = new();

    // Kept so StopAsync can wait for the loop instead of abandoning it.
    private Task _dispatchLoop = Task.CompletedTask;

    public event EventHandler<TaskStateChangedEventArgs>? TaskStateChanged;

    /// <summary>Starts the dispatch loop on the thread pool and returns immediately.</summary>
    /// <param name="ct">Host start token; not used because start-up does no waiting.</param>
    public Task StartAsync(CancellationToken ct)
    {
        _dispatchLoop = Task.Run(() => DispatchLoopAsync(_shutdownCts.Token));
        return Task.CompletedTask;
    }

    /// <summary>
    /// Dispatch loop — reads tasks from the channel and starts each one without waiting for it.
    /// </summary>
    /// <param name="ct">Shutdown token; cancelling it ends the loop.</param>
    private async Task DispatchLoopAsync(CancellationToken ct)
    {
        try
        {
            await foreach (var task in _taskChannel.Reader.ReadAllAsync(ct))
            {
                // Task.Run keeps synchronous start-up work (e.g. Process.Start) off the loop.
                // The token is not passed to Task.Run itself: a task that was dequeued must
                // still reach a terminal status even when shutdown has already begun.
                _ = Task.Run(() => RunTaskAsync(task, ct));
            }
        }
        catch (OperationCanceledException) when (ct.IsCancellationRequested)
        {
            // Expected on shutdown; StopAsync deals with whatever is still queued.
        }
    }

    /// <summary>
    /// Runs one task from start to a terminal status, publishing the state at both ends.
    /// </summary>
    /// <param name="task">The task to run.</param>
    /// <param name="shutdownToken">Manager-wide shutdown token.</param>
    private async Task RunTaskAsync(BackgroundTask task, CancellationToken shutdownToken)
    {
        // Linked source: cancelled by manager shutdown or by StopTaskAsync for this task.
        using var taskCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken);

        // Registered before the status becomes Running, so a caller that observes Running
        // can always find the cancellation source.
        _taskCancellations[task.TaskId] = taskCts;

        task.Status = TaskStatus.Running;
        task.StartedAt = DateTime.UtcNow;
        PublishStatus(task);
        try
        {
            await ExecuteTaskAsync(task, taskCts.Token);
            task.Status = TaskStatus.Completed;
        }
        catch (OperationCanceledException)
        {
            task.Status = TaskStatus.Stopped;
        }
        catch (Exception ex)
        {
            task.Status = TaskStatus.Failed;
            task.ErrorMessage = ex.Message;
            _logger.LogError(ex, "Task {TaskId} failed", task.TaskId);
        }
        finally
        {
            _taskCancellations.TryRemove(task.TaskId, out _);
            task.CompletedAt = DateTime.UtcNow;
            PublishStatus(task);
        }
    }

    /// <summary>Routes a task to the executor for its concrete type.</summary>
    /// <returns>
    /// The executor's task, or a task faulted with <see cref="NotSupportedException"/> when the
    /// type has no executor, so the task is reported as failed rather than completed.
    /// </returns>
    private Task ExecuteTaskAsync(BackgroundTask task, CancellationToken ct) => task switch
    {
        LocalShellTask shell => ExecuteShellTaskAsync(shell, ct),
        LocalAgentTask agent => ExecuteAgentTaskAsync(agent, ct),
        RemoteAgentTask remote => ExecuteRemoteAgentTaskAsync(remote, ct),
        DreamTask dream => ExecuteDreamTaskAsync(dream, ct),
        MonitorMcpTask monitor => ExecuteMonitorTaskAsync(monitor, ct),
        LocalWorkflowTask workflow => ExecuteWorkflowTaskAsync(workflow, ct),
        InProcessTeammateTask teammate => ExecuteTeammateTaskAsync(teammate, ct),
        _ => Task.FromException(
            new NotSupportedException($"No executor for task type {task.GetType().Name}."))
    };

    /// <summary>
    /// Runs the shell process and stores its stdout, stderr and exit code on the task.
    /// </summary>
    /// <remarks>
    /// The start info must redirect both standard output and standard error; reading a
    /// stream that is not redirected throws <see cref="InvalidOperationException"/>.
    /// </remarks>
    /// <exception cref="InvalidOperationException">The task has no ProcessStartInfo.</exception>
    /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
    private async Task ExecuteShellTaskAsync(LocalShellTask task, CancellationToken ct)
    {
        var startInfo = task.ProcessStartInfo
            ?? throw new InvalidOperationException($"Shell task {task.TaskId} has no ProcessStartInfo.");

        // Do not spawn a process for a task that was cancelled while still queued.
        ct.ThrowIfCancellationRequested();

        using var process = new Process { StartInfo = startInfo };
        process.Start();
        try
        {
            // Drain both pipes concurrently: reading one to the end before touching the other
            // can block forever once the child fills the pipe nobody is reading.
            var stdoutTask = process.StandardOutput.ReadToEndAsync(ct);
            var stderrTask = process.StandardError.ReadToEndAsync(ct);
            await Task.WhenAll(stdoutTask, stderrTask, process.WaitForExitAsync(ct));

            task.Stdout = await stdoutTask;
            task.Stderr = await stderrTask;
            task.ExitCode = process.ExitCode;
        }
        catch
        {
            // Disposing a Process does not terminate it; kill it so a cancelled or failed
            // task does not leave the child running.
            TryKill(process, task.TaskId);
            throw;
        }
    }

    /// <summary>Best-effort kill of a process and its children; failures are only logged.</summary>
    private void TryKill(Process process, string taskId)
    {
        try
        {
            if (!process.HasExited)
            {
                process.Kill(entireProcessTree: true);
            }
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "Could not kill process of task {TaskId}", taskId);
        }
    }

    /// <summary>Runs a sub-agent query through the query engine resolved from the service provider.</summary>
    private async Task ExecuteAgentTaskAsync(LocalAgentTask task, CancellationToken ct)
    {
        var engine = _services.GetRequiredService<IQueryEngine>();
        await foreach (var msg in engine.SubmitMessageAsync(task.Prompt,
            new(Model: task.Model), ct))
        {
            // TODO(review): the design intends the streamed messages to be collected into
            // task.Messages; the loop body is still empty, so the stream is only drained.
        }
    }

    /// <summary>Runs one dream cycle through the auto-dream service resolved from the service provider.</summary>
    private async Task ExecuteDreamTaskAsync(DreamTask task, CancellationToken ct)
    {
        var dreamService = _services.GetRequiredService<IAutoDreamService>();
        await dreamService.RunDreamCycleAsync(ct);
    }

    /// <summary>Creates a shell task, registers it and queues it for execution.</summary>
    /// <param name="command">The command text stored on the task.</param>
    /// <param name="psi">Process start info used to launch the command.</param>
    /// <returns>
    /// The registered task. If the manager is already shutting down, the task is returned in
    /// the Failed state with an explanatory error message instead of staying Pending forever.
    /// </returns>
    public Task<LocalShellTask> CreateShellTaskAsync(string command, ProcessStartInfo psi)
    {
        LocalShellTask task;
        do
        {
            task = new LocalShellTask
            {
                TaskId = Guid.NewGuid().ToString("N")[..8],
                Command = command,
                ProcessStartInfo = psi,
            };
        }
        // Ids are only 8 hex characters; retry rather than overwrite an existing entry.
        while (!_tasks.TryAdd(task.TaskId, task));

        if (_taskChannel.Writer.TryWrite(task))
        {
            UpdateState();
        }
        else
        {
            // The writer refuses items once it has been completed in StopAsync.
            task.Status = TaskStatus.Failed;
            task.ErrorMessage = "Task manager is shutting down; the task was not scheduled.";
            task.CompletedAt = DateTime.UtcNow;
            PublishStatus(task);
        }

        return Task.FromResult(task);
    }

    /// <summary>Requests cancellation of a running task.</summary>
    /// <remarks>
    /// The call only signals the task's token. The task's own run records the terminal status
    /// (Stopped once the executor observes the cancellation) and raises the event. Unknown ids
    /// and tasks that are not running are ignored.
    /// </remarks>
    public Task StopTaskAsync(string taskId)
    {
        if (_tasks.TryGetValue(taskId, out var task)
            && task.Status == TaskStatus.Running
            && _taskCancellations.TryGetValue(taskId, out var taskCts))
        {
            try
            {
                taskCts.Cancel();
            }
            catch (ObjectDisposedException)
            {
                // The task finished between the lookup and the cancel; nothing left to stop.
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>Returns a snapshot list of all registered tasks.</summary>
    public IReadOnlyList<BackgroundTask> ListTasks() => _tasks.Values.ToList();

    /// <summary>Returns the task with the given id, or null when it is not registered.</summary>
    public BackgroundTask? GetTask(string taskId) => _tasks.GetValueOrDefault(taskId);

    /// <summary>Mirrors the current status into the state store and raises <see cref="TaskStateChanged"/>.</summary>
    private void PublishStatus(BackgroundTask task)
    {
        UpdateState();
        TaskStateChanged?.Invoke(this, new(task.TaskId, task.Status));
    }

    /// <summary>Copies every registered task into the Tasks map of the app state.</summary>
    private void UpdateState()
    {
        _stateStore.Update(state =>
        {
            // Work on a copy of the current map so the previous state object is not mutated.
            var tasks = state.Tasks.ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
            foreach (var task in _tasks.Values)
            {
                tasks[task.TaskId] = task;
            }

            return state with { Tasks = tasks };
        });
    }

    /// <summary>
    /// Stops accepting work, cancels every running task, waits for the dispatch loop to end and
    /// marks tasks that never started as Stopped.
    /// </summary>
    /// <param name="ct">Host token; when it fires, the wait for the dispatch loop is abandoned.</param>
    public async Task StopAsync(CancellationToken ct)
    {
        _taskChannel.Writer.TryComplete();

        // Every running task holds a token linked to this source, so one call cancels them all.
        _shutdownCts.Cancel();

        try
        {
            await _dispatchLoop.WaitAsync(ct);
        }
        catch (OperationCanceledException)
        {
            // The host gave up waiting; continue with the remaining clean-up.
        }

        // Whatever is still queued will never be dispatched; give it a terminal status.
        var drainedAny = false;
        while (_taskChannel.Reader.TryRead(out var pending))
        {
            pending.Status = TaskStatus.Stopped;
            pending.CompletedAt = DateTime.UtcNow;
            TaskStateChanged?.Invoke(this, new(pending.TaskId, pending.Status));
            drainedAny = true;
        }

        if (drainedAny)
        {
            UpdateState();
        }
    }
}