执行计划是 Aster 中用于管理复杂多步骤任务的核心功能。它允许 Agent 将复杂任务分解为多个步骤,支持依赖管理、并行执行、审批流程和错误处理。
执行计划系统提供:
执行计划是一个包含多个步骤的任务执行方案:
type ExecutionPlan struct {
ID string // 计划唯一ID
Description string // 计划描述
Steps []Step // 步骤列表
Status Status // 计划状态
Options *ExecutionOptions // 执行选项
// 审批信息
UserApproved bool
ApprovedBy string
ApprovedAt *time.Time
// 执行状态
CurrentStep int
StartedAt *time.Time
CompletedAt *time.Time
}
每个步骤代表一个具体的操作:
type Step struct {
ID string // 步骤ID
Index int // 步骤序号
ToolName string // 要调用的工具名称
Description string // 步骤描述
Parameters map[string]any // 工具参数
// 执行状态
Status StepStatus
Result any
Error string
StartedAt *time.Time
CompletedAt *time.Time
DurationMs int64
// 依赖关系
DependsOn []string // 依赖的步骤ID列表
// 重试配置
RetryCount int
MaxRetries int
RetryDelayMs int
}
计划状态 (Status):
draft - 草稿状态pending_approval - 等待审批approved - 已审批executing - 执行中completed - 执行完成failed - 执行失败cancelled - 已取消partial - 部分完成步骤状态 (StepStatus):
pending - 待执行running - 执行中completed - 执行完成failed - 执行失败skipped - 已跳过import (
"github.com/astercloud/aster/pkg/executionplan"
)
// 创建执行计划
plan := executionplan.NewExecutionPlan("部署应用到生产环境")
// 添加步骤
plan.AddStep("git_pull", "拉取最新代码", map[string]any{
"repository": "https://github.com/example/app",
"branch": "main",
})
step2 := plan.AddStep("run_tests", "运行测试", map[string]any{
"test_suite": "integration",
})
step3 := plan.AddStep("deploy", "部署应用", map[string]any{
"environment": "production",
})
// 设置依赖关系
step2.DependsOn = []string{plan.Steps[0].ID}
step3.DependsOn = []string{step2.ID}
// 配置执行选项
plan.Options.RequireApproval = true
plan.Options.StopOnError = true
// 创建计划生成器
generator := executionplan.NewGenerator(provider, toolMap)
// 生成计划
plan, err := generator.Generate(ctx, &executionplan.PlanRequest{
Task: "部署应用到生产环境",
Context: "当前在开发分支,需要合并到主分支并部署",
Constraints: []string{"必须先运行测试", "需要备份数据库"},
})
if err != nil {
return err
}
// 创建执行器
executor := executionplan.NewExecutor(toolMap)
// 审批计划(如果需要)
if plan.Options.RequireApproval {
plan.Approve("user@example.com")
}
// 执行计划
toolCtx := &tools.ToolContext{
AgentID: "agent-123",
}
err := executor.Execute(ctx, plan, toolCtx)
if err != nil {
log.Printf("执行失败: %v", err)
}
// 查看执行结果
summary := plan.Summary()
fmt.Printf("完成: %d/%d 步骤\n", summary.Completed, summary.TotalSteps)
无依赖的步骤可以并行执行以提高效率:
plan := executionplan.NewExecutionPlan("并行构建和测试")
// 这些步骤没有依赖关系,可以并行执行
plan.AddStep("build_frontend", "构建前端", nil)
plan.AddStep("build_backend", "构建后端", nil)
plan.AddStep("run_linter", "运行代码检查", nil)
// 启用并行执行
plan.Options.AllowParallel = true
plan.Options.MaxParallelSteps = 3
executor := executionplan.NewExecutor(toolMap)
executor.Execute(ctx, plan, toolCtx)
步骤可以依赖其他步骤的完成:
plan := executionplan.NewExecutionPlan("CI/CD 流程")
// 步骤 1: 安装依赖
step1 := plan.AddStep("install", "安装依赖", nil)
// 步骤 2 和 3: 依赖步骤 1
step2 := plan.AddStep("build", "构建项目", nil)
step2.DependsOn = []string{step1.ID}
step3 := plan.AddStep("lint", "代码检查", nil)
step3.DependsOn = []string{step1.ID}
// 步骤 4: 依赖步骤 2
step4 := plan.AddStep("test", "运行测试", nil)
step4.DependsOn = []string{step2.ID}
// 步骤 5: 依赖步骤 2 和 4
step5 := plan.AddStep("deploy", "部署", nil)
step5.DependsOn = []string{step2.ID, step4.ID}
// 执行器会自动处理依赖关系
// step2 和 step3 可以在 step1 完成后并行执行
plan.Options.AllowParallel = true
配置不同的错误处理策略:
// 策略 1: 遇到错误立即停止(默认)
plan.Options.StopOnError = true
// 策略 2: 继续执行其他步骤
plan.Options.StopOnError = false
plan.Options.ContinueOnError = true
// 执行后检查结果
if plan.Status == executionplan.StatusPartial {
// 部分步骤失败
for i, step := range plan.Steps {
if step.Status == executionplan.StepStatusFailed {
log.Printf("步骤 %d 失败: %s", i, step.Error)
}
}
}
为步骤配置重试:
step := plan.AddStep("api_call", "调用外部 API", map[string]any{
"url": "https://api.example.com/data",
})
// 配置重试
step.MaxRetries = 3
step.RetryDelayMs = 1000 // 1秒延迟
设置步骤和计划的超时:
// 单步超时:30秒
plan.Options.StepTimeoutMs = 30000
// 总超时:5分钟
plan.Options.TotalTimeoutMs = 300000
配置审批要求:
// 需要用户审批
plan.Options.RequireApproval = true
plan.Options.ApprovalTimeoutMs = 60000 // 1分钟审批超时
// 或者自动审批
plan.Options.AutoApprove = true
// 手动审批
if !plan.IsApproved() {
// 显示计划给用户
fmt.Println("执行计划:")
for i, step := range plan.Steps {
fmt.Printf("%d. %s - %s\n", i+1, step.ToolName, step.Description)
}
// 等待用户确认
if userApproves() {
plan.Approve("user@example.com")
} else {
plan.Reject("用户拒绝执行")
}
}
从失败点恢复执行:
// 首次执行
err := executor.Execute(ctx, plan, toolCtx)
if err != nil {
log.Printf("执行失败: %v", err)
// 保存计划状态
savePlan(plan)
}
// 稍后恢复执行
plan = loadPlan()
err = executor.Resume(ctx, plan, toolCtx)
监听执行事件:
executor := executionplan.NewExecutor(
toolMap,
executionplan.WithOnStepStart(func(plan *executionplan.ExecutionPlan, step *executionplan.Step) {
log.Printf("开始执行: %s", step.Description)
}),
executionplan.WithOnStepComplete(func(plan *executionplan.ExecutionPlan, step *executionplan.Step) {
log.Printf("完成: %s (耗时: %dms)", step.Description, step.DurationMs)
}),
executionplan.WithOnStepFailed(func(plan *executionplan.ExecutionPlan, step *executionplan.Step, err error) {
log.Printf("失败: %s - %v", step.Description, err)
}),
executionplan.WithOnPlanComplete(func(plan *executionplan.ExecutionPlan) {
summary := plan.Summary()
log.Printf("计划完成: %d 成功, %d 失败", summary.Completed, summary.Failed)
}),
)
agent, err := agent.NewAgent(config)
if err != nil {
return err
}
// 获取执行计划管理器
planMgr := agent.ExecutionPlan()
// 生成计划
plan, err := planMgr.GeneratePlan(ctx, "部署应用", &agent.ExecutionPlanConfig{
RequireApproval: true,
AllowParallel: true,
})
if err != nil {
return err
}
// 审批
err = planMgr.ApprovePlan("user@example.com")
if err != nil {
return err
}
// 执行
err = planMgr.ExecutePlan(ctx)
if err != nil {
return err
}
// 获取当前计划
currentPlan := planMgr.CurrentPlan()
summary := currentPlan.Summary()
将复杂任务分解为小而专注的步骤:
// 好的做法:步骤职责单一
plan.AddStep("validate_input", "验证输入参数", params)
plan.AddStep("backup_database", "备份数据库", params)
plan.AddStep("run_migration", "运行数据库迁移", params)
plan.AddStep("deploy_code", "部署代码", params)
plan.AddStep("verify_deployment", "验证部署", params)
// 避免:步骤过于复杂
plan.AddStep("deploy_everything", "部署所有内容", params)
清晰地定义步骤之间的依赖:
// 明确依赖
buildStep := plan.AddStep("build", "构建", nil)
testStep := plan.AddStep("test", "测试", nil)
testStep.DependsOn = []string{buildStep.ID}
deployStep := plan.AddStep("deploy", "部署", nil)
deployStep.DependsOn = []string{buildStep.ID, testStep.ID}
为步骤提供清晰的描述:
// 好的描述
plan.AddStep("git_pull", "从 main 分支拉取最新代码", params)
plan.AddStep("npm_install", "安装 Node.js 依赖包", params)
// 避免模糊描述
plan.AddStep("step1", "执行操作", params)
为可能失败的操作配置重试:
// 网络请求应该重试
apiStep := plan.AddStep("fetch_data", "获取远程数据", params)
apiStep.MaxRetries = 3
apiStep.RetryDelayMs = 2000
// 幂等操作可以重试
deployStep := plan.AddStep("deploy", "部署应用", params)
deployStep.MaxRetries = 2
// 非幂等操作不应重试
paymentStep := plan.AddStep("process_payment", "处理支付", params)
paymentStep.MaxRetries = 0 // 不重试
根据操作特性设置超时:
// 快速操作:短超时
plan.AddStep("validate", "验证配置", params)
plan.Options.StepTimeoutMs = 5000 // 5秒
// 耗时操作:长超时
plan.AddStep("build", "编译大型项目", params)
plan.Options.StepTimeoutMs = 300000 // 5分钟
识别可以并行的步骤:
plan := executionplan.NewExecutionPlan("多任务处理")
// 这些步骤可以并行
plan.AddStep("process_images", "处理图片", nil)
plan.AddStep("process_videos", "处理视频", nil)
plan.AddStep("generate_thumbnails", "生成缩略图", nil)
plan.Options.AllowParallel = true
plan.Options.MaxParallelSteps = 3
对于长时间运行的计划,保存状态:
// 定期保存
executor := executionplan.NewExecutor(
toolMap,
executionplan.WithOnStepComplete(func(plan *executionplan.ExecutionPlan, step *executionplan.Step) {
// 每完成一步就保存
savePlanToDatabase(plan)
}),
)
// 从保存的状态恢复
plan := loadPlanFromDatabase(planID)
if !plan.IsCompleted() {
executor.Resume(ctx, plan, toolCtx)
}
plan := executionplan.NewExecutionPlan("CI/CD 流程")
// 阶段 1: 准备
checkout := plan.AddStep("git_checkout", "检出代码", map[string]any{
"branch": "main",
})
// 阶段 2: 构建和检查(并行)
install := plan.AddStep("npm_install", "安装依赖", nil)
install.DependsOn = []string{checkout.ID}
build := plan.AddStep("npm_build", "构建项目", nil)
build.DependsOn = []string{install.ID}
lint := plan.AddStep("npm_lint", "代码检查", nil)
lint.DependsOn = []string{install.ID}
// 阶段 3: 测试
test := plan.AddStep("npm_test", "运行测试", nil)
test.DependsOn = []string{build.ID}
// 阶段 4: 部署
deploy := plan.AddStep("deploy", "部署到生产", nil)
deploy.DependsOn = []string{build.ID, lint.ID, test.ID}
plan.Options.AllowParallel = true
plan.Options.RequireApproval = true
plan.Options.StopOnError = true
plan := executionplan.NewExecutionPlan("数据处理流水线")
// 提取
extract := plan.AddStep("extract_data", "从数据源提取数据", map[string]any{
"source": "database",
})
// 转换(并行)
transform1 := plan.AddStep("transform_users", "转换用户数据", nil)
transform1.DependsOn = []string{extract.ID}
transform2 := plan.AddStep("transform_orders", "转换订单数据", nil)
transform2.DependsOn = []string{extract.ID}
// 加载
load := plan.AddStep("load_data", "加载到目标系统", nil)
load.DependsOn = []string{transform1.ID, transform2.ID}
// 验证
validate := plan.AddStep("validate_data", "验证数据完整性", nil)
validate.DependsOn = []string{load.ID}
plan.Options.AllowParallel = true
plan.Options.ContinueOnError = false
// 限制并行步骤数以避免资源耗尽
plan.Options.MaxParallelSteps = 5
// 对于 CPU 密集型任务
plan.Options.MaxParallelSteps = runtime.NumCPU()
// 对于 I/O 密集型任务
plan.Options.MaxParallelSteps = 20
// 对于大型结果,避免在步骤中存储
step := plan.AddStep("process_large_file", "处理大文件", params)
// 不要: step.Result = largeData
// 而是: 将结果写入文件或数据库,只存储引用
for i, step := range plan.Steps {
fmt.Printf("步骤 %d: %s\n", i+1, step.Description)
fmt.Printf(" 状态: %s\n", step.Status)
if step.Error != "" {
fmt.Printf(" 错误: %s\n", step.Error)
}
if step.DurationMs > 0 {
fmt.Printf(" 耗时: %dms\n", step.DurationMs)
}
}
for _, step := range plan.Steps {
if step.Status == executionplan.StepStatusFailed {
log.Printf("失败步骤: %s", step.Description)
log.Printf("工具: %s", step.ToolName)
log.Printf("参数: %+v", step.Parameters)
log.Printf("错误: %s", step.Error)
log.Printf("重试次数: %d/%d", step.RetryCount, step.MaxRetries)
}
}