核心概念

SubAgent 异步执行

异步执行、状态查询、Resume 机制详解

SubAgent 异步执行

v0.12.2+ 新增的异步执行功能,让 SubAgent 可以在后台长时间运行,主 Agent 无需等待。

🎯 为什么需要异步执行?

问题场景

同步执行的限制

// 主 Agent 必须等待 SubAgent 完成
result := subagent.Execute(ctx, "分析大型数据集")  // 阻塞 10 分钟
// 主 Agent 无法做其他事情

异步执行的优势

// 主 Agent 立即返回,SubAgent 在后台运行
taskID := subagent.ExecuteAsync(ctx, "分析大型数据集")  // 立即返回
// 主 Agent 可以继续处理其他任务
// 稍后查询结果
result := subagent.Query(taskID)

🚀 快速开始

1. 启用异步执行

subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
    Specs:       specs,
    Factory:     factory,
    EnableAsync: true,  // 启用异步执行
    DefaultTimeout: 30 * time.Minute,
})

2. 异步启动任务

// 使用 task 工具,设置 async=true
result, _ := ag.Chat(ctx, `
tool_use:
  name: task
  parameters:
    description: "深度分析用户行为数据,生成详细报告"
    subagent_type: data-analyst
    async: true
    timeout: 1800  # 30分钟超时
`)

// 立即返回 task_id
// {
//   "ok": true,
//   "task_id": "subagent_1234567890",
//   "status": "starting",
//   "message": "SubAgent started in background..."
// }

3. 查询任务状态

// 使用 query_subagent 工具
result, _ := ag.Chat(ctx, `
tool_use:
  name: query_subagent
  parameters:
    task_id: "subagent_1234567890"
`)

// 返回详细状态
// {
//   "ok": true,
//   "task_id": "subagent_1234567890",
//   "status": "running",
//   "duration": 120.5,
//   "resource_usage": {
//     "memory_mb": 256.5,
//     "cpu_percent": 45.2
//   }
// }

4. 获取结果

// 当 status 为 "completed" 时,output 字段包含结果
// {
//   "ok": true,
//   "task_id": "subagent_1234567890",
//   "status": "completed",
//   "output": "分析报告:...",
//   "duration": 1800.0
// }

📊 完整工作流

场景:并行分析多个数据源

// 1. 启动多个异步任务
task1 := startTask("分析数据源A")
task2 := startTask("分析数据源B")
task3 := startTask("分析数据源C")

// 2. 主 Agent 继续其他工作
doOtherWork()

// 3. 轮询检查任务状态
for {
    status1 := queryTask(task1)
    status2 := queryTask(task2)
    status3 := queryTask(task3)

    if allCompleted(status1, status2, status3) {
        break
    }

    sleep(10 * time.Second)
}

// 4. 收集结果
result1 := getOutput(task1)
result2 := getOutput(task2)
result3 := getOutput(task3)

// 5. 整合分析
finalReport := synthesize(result1, result2, result3)

🔧 管理工具详解

1. query_subagent

功能:查询子代理的状态和输出

参数

{
  task_id: string; // 任务 ID
}

返回

{
    ok: boolean,
    task_id: string,
    subagent_type: string,
    status: string,  // "starting" | "running" | "completed" | "failed" | "stopped" | "timeout"
    start_time: string,
    end_time?: string,
    duration: number,  // 秒
    output?: string,   // 输出结果(completed 时)
    error?: string,    // 错误信息(failed 时)
    exit_code?: number,
    resource_usage?: {
        memory_mb: number,
        cpu_percent: number
    }
}

使用示例

tool_use:
  name: query_subagent
  parameters:
    task_id: "subagent_1234567890"

2. stop_subagent

功能:停止正在运行的子代理

参数

{
  task_id: string; // 任务 ID
}

返回

{
    ok: boolean,
    task_id: string,
    message: string
}

使用场景

  • 任务运行时间过长,需要手动停止
  • 发现任务执行方向错误,需要中止
  • 资源紧张,需要释放资源

使用示例

tool_use:
  name: stop_subagent
  parameters:
    task_id: "subagent_1234567890"

3. resume_subagent

功能:恢复已停止或失败的子代理

参数

{
  task_id: string; // 原任务 ID
}

返回

{
    ok: boolean,
    old_task_id: string,
    new_task_id: string,  // 新的任务 ID
    subagent_type: string,
    status: string,
    message: string
}

使用场景

  • 子代理因超时或错误而停止,需要重新执行
  • 手动停止的子代理,需要继续执行
  • 系统重启后,恢复未完成的任务

注意事项

  • 恢复后会生成新的 task_id
  • 原有的元数据会被保留
  • 只能恢复状态为 "stopped"、"failed" 或 "completed" 的子代理

使用示例

tool_use:
  name: resume_subagent
  parameters:
    task_id: "subagent_1234567890"

4. list_subagents

功能:列出所有子代理任务

参数

{
    status_filter?: string  // 可选,按状态过滤
}

返回

{
    ok: boolean,
    count: number,
    subagents: Array<{
        task_id: string,
        subagent_type: string,
        status: string,
        start_time: string,
        end_time?: string,
        duration: number,
        error?: string
    }>
}

使用场景

  • 查看当前有哪些子代理正在运行
  • 检查历史任务的执行情况
  • 清理已完成的任务

使用示例

# 列出所有任务
tool_use:
  name: list_subagents

# 只列出正在运行的任务
tool_use:
  name: list_subagents
  parameters:
    status_filter: "running"

🎯 使用模式

模式 1:Fire and Forget

场景:启动任务后不关心结果

// 启动任务
startTask("清理临时文件")

// 不等待,继续其他工作

模式 2:Polling

场景:定期检查任务状态

taskID := startTask("长时间分析")

// 轮询检查
for {
    status := queryTask(taskID)
    if status == "completed" {
        result := getOutput(taskID)
        break
    }
    sleep(10 * time.Second)
}

模式 3:Batch Processing

场景:批量处理多个任务

// 启动所有任务
taskIDs := []string{}
for _, item := range items {
    taskID := startTask("处理 " + item)
    taskIDs = append(taskIDs, taskID)
}

// 等待所有任务完成
for {
    allDone := true
    for _, taskID := range taskIDs {
        status := queryTask(taskID)
        if status != "completed" {
            allDone = false
            break
        }
    }
    if allDone {
        break
    }
    sleep(10 * time.Second)
}

// 收集所有结果
results := []string{}
for _, taskID := range taskIDs {
    result := getOutput(taskID)
    results = append(results, result)
}

模式 4:Retry on Failure

场景:任务失败后自动重试

taskID := startTask("不稳定的任务")

maxRetries := 3
for i := 0; i < maxRetries; i++ {
    // 等待完成
    waitForCompletion(taskID)

    status := queryTask(taskID)
    if status == "completed" {
        result := getOutput(taskID)
        break
    } else if status == "failed" {
        // 重试
        taskID = resumeTask(taskID)
    }
}

⚙️ 高级配置

1. 超时控制

// 全局默认超时
subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
    EnableAsync:    true,
    DefaultTimeout: 30 * time.Minute,  // 默认 30 分钟
})

// 单个任务超时
tool_use:
  name: task
  parameters:
    description: "快速任务"
    subagent_type: quick-agent
    async: true
    timeout: 300  # 5 分钟超时

2. 进程级隔离

// 启用进程级隔离(更安全,但开销更大)
subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
    EnableAsync:            true,
    EnableProcessIsolation: true,  // 每个 SubAgent 运行在独立进程中
})

3. 自定义管理器

// 使用自定义的 SubagentManager
customManager := myCustomSubagentManager()

subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
    Manager:     customManager,
    EnableAsync: true,
})

📊 状态说明

状态说明可执行操作
starting正在启动query
running正在运行query, stop
completed已完成(成功)query, resume
failed执行失败query, resume
stopped已停止query, resume
timeout超时query, resume

🎓 最佳实践

1. 合理设置超时

// ✅ 根据任务类型设置合理的超时
- 快速任务: 5-10 分钟
- 中等任务: 30 分钟
- 长时间任务: 1-2 小时

// ❌ 避免
- 超时过短:任务还没完成就被杀死
- 超时过长:占用资源过久

2. 轮询间隔

// ✅ 根据任务预期时间设置轮询间隔
- 快速任务(< 1分钟): 每 5 秒查询一次
- 中等任务(1-10分钟): 每 30 秒查询一次
- 长时间任务(> 10分钟): 每 1-2 分钟查询一次

// ❌ 避免
- 轮询过频:浪费资源
- 轮询过慢:响应不及时

3. 错误处理

// ✅ 检查任务状态并处理错误
status := queryTask(taskID)
if status == "failed" {
    error := getError(taskID)
    log.Printf("Task failed: %s", error)

    // 决定是否重试
    if isRetryable(error) {
        resumeTask(taskID)
    }
}

// ❌ 避免
- 假设任务总是成功
- 不检查错误信息

4. 资源清理

// ✅ 及时清理已完成的任务
completedTasks := listTasks(status="completed")
for _, taskID := range completedTasks {
    cleanupTask(taskID)
}

// ❌ 避免
- 让已完成的任务一直占用资源
- 不清理失败的任务

🔍 故障排查

问题 1:任务一直处于 "starting" 状态

可能原因

  • SubAgent 启动失败
  • 资源不足
  • 配置错误

解决方法

// 查询详细状态
status := queryTask(taskID)
if status.Error != "" {
    log.Printf("Error: %s", status.Error)
}

// 检查资源使用
if status.ResourceUsage != nil {
    log.Printf("Memory: %.2f MB, CPU: %.2f%%",
        status.ResourceUsage.MemoryMB,
        status.ResourceUsage.CPUPercent)
}

问题 2:任务超时

可能原因

  • 任务复杂度超出预期
  • 超时设置过短
  • 资源不足导致执行缓慢

解决方法

// 增加超时时间
tool_use:
  name: task
  parameters:
    timeout: 7200  # 增加到 2 小时

// 或者恢复任务
resumeTask(taskID)

问题 3:无法恢复任务

可能原因

  • 任务状态不允许恢复
  • 任务已被清理

解决方法

// 检查任务状态
status := queryTask(taskID)
if status.Status == "running" {
    // 先停止再恢复
    stopTask(taskID)
    time.Sleep(1 * time.Second)
    resumeTask(taskID)
}

📚 相关资源

🎉 总结

异步执行功能让 SubAgent 系统更加强大:

  • 非阻塞:主 Agent 无需等待
  • 长时间运行:支持数小时的任务
  • 状态查询:随时了解任务进度
  • Resume 机制:失败后可恢复
  • 资源监控:实时监控资源使用
  • 灵活控制:停止、恢复、清理

这使得 Aster 的 SubAgent 系统达到了与 Claude Codex 相当的功能完整度!