v0.12.2+ 新增的异步执行功能,让 SubAgent 可以在后台长时间运行,主 Agent 无需等待。
同步执行的限制:
// 主 Agent 必须等待 SubAgent 完成
result := subagent.Execute(ctx, "分析大型数据集") // 阻塞 10 分钟
// 主 Agent 无法做其他事情
异步执行的优势:
// 主 Agent 立即返回,SubAgent 在后台运行
taskID := subagent.ExecuteAsync(ctx, "分析大型数据集") // 立即返回
// 主 Agent 可以继续处理其他任务
// 稍后查询结果
result := subagent.Query(taskID)
subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
Specs: specs,
Factory: factory,
EnableAsync: true, // 启用异步执行
DefaultTimeout: 30 * time.Minute,
})
// 使用 task 工具,设置 async=true
result, _ := ag.Chat(ctx, `
tool_use:
name: task
parameters:
description: "深度分析用户行为数据,生成详细报告"
subagent_type: data-analyst
async: true
timeout: 1800 # 30分钟超时
`)
// 立即返回 task_id
// {
// "ok": true,
// "task_id": "subagent_1234567890",
// "status": "starting",
// "message": "SubAgent started in background..."
// }
// 使用 query_subagent 工具
result, _ := ag.Chat(ctx, `
tool_use:
name: query_subagent
parameters:
task_id: "subagent_1234567890"
`)
// 返回详细状态
// {
// "ok": true,
// "task_id": "subagent_1234567890",
// "status": "running",
// "duration": 120.5,
// "resource_usage": {
// "memory_mb": 256.5,
// "cpu_percent": 45.2
// }
// }
// 当 status 为 "completed" 时,output 字段包含结果
// {
// "ok": true,
// "task_id": "subagent_1234567890",
// "status": "completed",
// "output": "分析报告:...",
// "duration": 1800.0
// }
// 1. 启动多个异步任务
task1 := startTask("分析数据源A")
task2 := startTask("分析数据源B")
task3 := startTask("分析数据源C")
// 2. 主 Agent 继续其他工作
doOtherWork()
// 3. 轮询检查任务状态
for {
status1 := queryTask(task1)
status2 := queryTask(task2)
status3 := queryTask(task3)
if allCompleted(status1, status2, status3) {
break
}
sleep(10 * time.Second)
}
// 4. 收集结果
result1 := getOutput(task1)
result2 := getOutput(task2)
result3 := getOutput(task3)
// 5. 整合分析
finalReport := synthesize(result1, result2, result3)
功能:查询子代理的状态和输出
参数:
{
task_id: string; // 任务 ID
}
返回:
{
ok: boolean,
task_id: string,
subagent_type: string,
status: string, // "starting" | "running" | "completed" | "failed" | "stopped" | "timeout"
start_time: string,
end_time?: string,
duration: number, // 秒
output?: string, // 输出结果(completed 时)
error?: string, // 错误信息(failed 时)
exit_code?: number,
resource_usage?: {
memory_mb: number,
cpu_percent: number
}
}
使用示例:
tool_use:
name: query_subagent
parameters:
task_id: "subagent_1234567890"
功能:停止正在运行的子代理
参数:
{
task_id: string; // 任务 ID
}
返回:
{
ok: boolean,
task_id: string,
message: string
}
使用场景:
使用示例:
tool_use:
name: stop_subagent
parameters:
task_id: "subagent_1234567890"
功能:恢复已停止或失败的子代理
参数:
{
task_id: string; // 原任务 ID
}
返回:
{
ok: boolean,
old_task_id: string,
new_task_id: string, // 新的任务 ID
subagent_type: string,
status: string,
message: string
}
使用场景:
注意事项:
使用示例:
tool_use:
name: resume_subagent
parameters:
task_id: "subagent_1234567890"
功能:列出所有子代理任务
参数:
{
status_filter?: string // 可选,按状态过滤
}
返回:
{
ok: boolean,
count: number,
subagents: Array<{
task_id: string,
subagent_type: string,
status: string,
start_time: string,
end_time?: string,
duration: number,
error?: string
}>
}
使用场景:
使用示例:
# 列出所有任务
tool_use:
name: list_subagents
# 只列出正在运行的任务
tool_use:
name: list_subagents
parameters:
status_filter: "running"
场景:启动任务后不关心结果
// 启动任务
startTask("清理临时文件")
// 不等待,继续其他工作
场景:定期检查任务状态
taskID := startTask("长时间分析")
// 轮询检查
for {
status := queryTask(taskID)
if status == "completed" {
result := getOutput(taskID)
break
}
sleep(10 * time.Second)
}
场景:批量处理多个任务
// 启动所有任务
taskIDs := []string{}
for _, item := range items {
taskID := startTask("处理 " + item)
taskIDs = append(taskIDs, taskID)
}
// 等待所有任务完成
for {
allDone := true
for _, taskID := range taskIDs {
status := queryTask(taskID)
if status != "completed" {
allDone = false
break
}
}
if allDone {
break
}
sleep(10 * time.Second)
}
// 收集所有结果
results := []string{}
for _, taskID := range taskIDs {
result := getOutput(taskID)
results = append(results, result)
}
场景:任务失败后自动重试
taskID := startTask("不稳定的任务")
maxRetries := 3
for i := 0; i < maxRetries; i++ {
// 等待完成
waitForCompletion(taskID)
status := queryTask(taskID)
if status == "completed" {
result := getOutput(taskID)
break
} else if status == "failed" {
// 重试
taskID = resumeTask(taskID)
}
}
// 全局默认超时
subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
EnableAsync: true,
DefaultTimeout: 30 * time.Minute, // 默认 30 分钟
})
// 单个任务超时
tool_use:
name: task
parameters:
description: "快速任务"
subagent_type: quick-agent
async: true
timeout: 300 # 5 分钟超时
// 启用进程级隔离(更安全,但开销更大)
subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
EnableAsync: true,
EnableProcessIsolation: true, // 每个 SubAgent 运行在独立进程中
})
// 使用自定义的 SubagentManager
customManager := myCustomSubagentManager()
subagentMW, _ := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
Manager: customManager,
EnableAsync: true,
})
| 状态 | 说明 | 可执行操作 |
|---|---|---|
| starting | 正在启动 | query |
| running | 正在运行 | query, stop |
| completed | 已完成(成功) | query, resume |
| failed | 执行失败 | query, resume |
| stopped | 已停止 | query, resume |
| timeout | 超时 | query, resume |
// ✅ 根据任务类型设置合理的超时
- 快速任务: 5-10 分钟
- 中等任务: 30 分钟
- 长时间任务: 1-2 小时
// ❌ 避免
- 超时过短:任务还没完成就被杀死
- 超时过长:占用资源过久
// ✅ 根据任务预期时间设置轮询间隔
- 快速任务(< 1分钟): 每 5 秒查询一次
- 中等任务(1-10分钟): 每 30 秒查询一次
- 长时间任务(> 10分钟): 每 1-2 分钟查询一次
// ❌ 避免
- 轮询过频:浪费资源
- 轮询过慢:响应不及时
// ✅ 检查任务状态并处理错误
status := queryTask(taskID)
if status == "failed" {
error := getError(taskID)
log.Printf("Task failed: %s", error)
// 决定是否重试
if isRetryable(error) {
resumeTask(taskID)
}
}
// ❌ 避免
- 假设任务总是成功
- 不检查错误信息
// ✅ 及时清理已完成的任务
completedTasks := listTasks(status="completed")
for _, taskID := range completedTasks {
cleanupTask(taskID)
}
// ❌ 避免
- 让已完成的任务一直占用资源
- 不清理失败的任务
可能原因:
解决方法:
// 查询详细状态
status := queryTask(taskID)
if status.Error != "" {
log.Printf("Error: %s", status.Error)
}
// 检查资源使用
if status.ResourceUsage != nil {
log.Printf("Memory: %.2f MB, CPU: %.2f%%",
status.ResourceUsage.MemoryMB,
status.ResourceUsage.CPUPercent)
}
可能原因:
解决方法:
// 增加超时时间
tool_use:
name: task
parameters:
timeout: 7200 # 增加到 2 小时
// 或者恢复任务
resumeTask(taskID)
可能原因:
解决方法:
// 检查任务状态
status := queryTask(taskID)
if status.Status == "running" {
// 先停止再恢复
stopTask(taskID)
time.Sleep(1 * time.Second)
resumeTask(taskID)
}
异步执行功能让 SubAgent 系统更加强大:
这使得 Aster 的 SubAgent 系统达到了与 Claude Codex 相当的功能完整度!