核心概念

执行计划 (Execution Plan)

执行计划是 Aster 中用于管理复杂多步骤任务的核心功能。它允许 Agent 将复杂任务分解为多个步骤,支持依赖管理、并行执行、审批流程和错误处理。

执行计划 (Execution Plan)

执行计划是 Aster 中用于管理复杂多步骤任务的核心功能。它允许 Agent 将复杂任务分解为多个步骤,支持依赖管理、并行执行、审批流程和错误处理。

概述

执行计划系统提供:

  • 结构化任务分解:将复杂任务分解为可管理的步骤
  • 依赖管理:步骤之间的依赖关系自动处理
  • 并行执行:无依赖的步骤可以并行执行
  • 审批流程:支持用户审批机制
  • 错误处理:灵活的错误处理策略
  • 状态跟踪:完整的执行状态和进度跟踪
  • 恢复执行:支持从失败点恢复执行

核心概念

执行计划 (ExecutionPlan)

执行计划是一个包含多个步骤的任务执行方案:

type ExecutionPlan struct {
    ID          string    // 计划唯一ID
    Description string    // 计划描述
    Steps       []Step    // 步骤列表
    Status      Status    // 计划状态
    Options     *ExecutionOptions // 执行选项

    // 审批信息
    UserApproved bool
    ApprovedBy   string
    ApprovedAt   *time.Time

    // 执行状态
    CurrentStep int
    StartedAt   *time.Time
    CompletedAt *time.Time
}

步骤 (Step)

每个步骤代表一个具体的操作:

type Step struct {
    ID          string         // 步骤ID
    Index       int            // 步骤序号
    ToolName    string         // 要调用的工具名称
    Description string         // 步骤描述
    Parameters  map[string]any // 工具参数

    // 执行状态
    Status      StepStatus
    Result      any
    Error       string
    StartedAt   *time.Time
    CompletedAt *time.Time
    DurationMs  int64

    // 依赖关系
    DependsOn []string // 依赖的步骤ID列表

    // 重试配置
    RetryCount   int
    MaxRetries   int
    RetryDelayMs int
}

状态管理

计划状态 (Status)

  • draft - 草稿状态
  • pending_approval - 等待审批
  • approved - 已审批
  • executing - 执行中
  • completed - 执行完成
  • failed - 执行失败
  • cancelled - 已取消
  • partial - 部分完成

步骤状态 (StepStatus)

  • pending - 待执行
  • running - 执行中
  • completed - 执行完成
  • failed - 执行失败
  • skipped - 已跳过

基本使用

1. 手动创建执行计划

import (
    "github.com/astercloud/aster/pkg/executionplan"
)

// 创建执行计划
plan := executionplan.NewExecutionPlan("部署应用到生产环境")

// 添加步骤
plan.AddStep("git_pull", "拉取最新代码", map[string]any{
    "repository": "https://github.com/example/app",
    "branch":     "main",
})

step2 := plan.AddStep("run_tests", "运行测试", map[string]any{
    "test_suite": "integration",
})

step3 := plan.AddStep("deploy", "部署应用", map[string]any{
    "environment": "production",
})

// 设置依赖关系
step2.DependsOn = []string{plan.Steps[0].ID}
step3.DependsOn = []string{step2.ID}

// 配置执行选项
plan.Options.RequireApproval = true
plan.Options.StopOnError = true

2. 使用 LLM 生成执行计划

// 创建计划生成器
generator := executionplan.NewGenerator(provider, toolMap)

// 生成计划
plan, err := generator.Generate(ctx, &executionplan.PlanRequest{
    Task:        "部署应用到生产环境",
    Context:     "当前在开发分支,需要合并到主分支并部署",
    Constraints: []string{"必须先运行测试", "需要备份数据库"},
})
if err != nil {
    return err
}

3. 执行计划

// 创建执行器
executor := executionplan.NewExecutor(toolMap)

// 审批计划(如果需要)
if plan.Options.RequireApproval {
    plan.Approve("user@example.com")
}

// 执行计划
toolCtx := &tools.ToolContext{
    AgentID: "agent-123",
}

err := executor.Execute(ctx, plan, toolCtx)
if err != nil {
    log.Printf("执行失败: %v", err)
}

// 查看执行结果
summary := plan.Summary()
fmt.Printf("完成: %d/%d 步骤\n", summary.Completed, summary.TotalSteps)

高级功能

并行执行

无依赖的步骤可以并行执行以提高效率:

plan := executionplan.NewExecutionPlan("并行构建和测试")

// 这些步骤没有依赖关系,可以并行执行
plan.AddStep("build_frontend", "构建前端", nil)
plan.AddStep("build_backend", "构建后端", nil)
plan.AddStep("run_linter", "运行代码检查", nil)

// 启用并行执行
plan.Options.AllowParallel = true
plan.Options.MaxParallelSteps = 3

executor := executionplan.NewExecutor(toolMap)
executor.Execute(ctx, plan, toolCtx)

依赖管理

步骤可以依赖其他步骤的完成:

plan := executionplan.NewExecutionPlan("CI/CD 流程")

// 步骤 1: 安装依赖
step1 := plan.AddStep("install", "安装依赖", nil)

// 步骤 2 和 3: 依赖步骤 1
step2 := plan.AddStep("build", "构建项目", nil)
step2.DependsOn = []string{step1.ID}

step3 := plan.AddStep("lint", "代码检查", nil)
step3.DependsOn = []string{step1.ID}

// 步骤 4: 依赖步骤 2
step4 := plan.AddStep("test", "运行测试", nil)
step4.DependsOn = []string{step2.ID}

// 步骤 5: 依赖步骤 2 和 4
step5 := plan.AddStep("deploy", "部署", nil)
step5.DependsOn = []string{step2.ID, step4.ID}

// 执行器会自动处理依赖关系
// step2 和 step3 可以在 step1 完成后并行执行
plan.Options.AllowParallel = true

错误处理

配置不同的错误处理策略:

// 策略 1: 遇到错误立即停止(默认)
plan.Options.StopOnError = true

// 策略 2: 继续执行其他步骤
plan.Options.StopOnError = false
plan.Options.ContinueOnError = true

// 执行后检查结果
if plan.Status == executionplan.StatusPartial {
    // 部分步骤失败
    for i, step := range plan.Steps {
        if step.Status == executionplan.StepStatusFailed {
            log.Printf("步骤 %d 失败: %s", i, step.Error)
        }
    }
}

重试机制

为步骤配置重试:

step := plan.AddStep("api_call", "调用外部 API", map[string]any{
    "url": "https://api.example.com/data",
})

// 配置重试
step.MaxRetries = 3
step.RetryDelayMs = 1000 // 1秒延迟

超时控制

设置步骤和计划的超时:

// 单步超时:30秒
plan.Options.StepTimeoutMs = 30000

// 总超时:5分钟
plan.Options.TotalTimeoutMs = 300000

审批流程

配置审批要求:

// 需要用户审批
plan.Options.RequireApproval = true
plan.Options.ApprovalTimeoutMs = 60000 // 1分钟审批超时

// 或者自动审批
plan.Options.AutoApprove = true

// 手动审批
if !plan.IsApproved() {
    // 显示计划给用户
    fmt.Println("执行计划:")
    for i, step := range plan.Steps {
        fmt.Printf("%d. %s - %s\n", i+1, step.ToolName, step.Description)
    }

    // 等待用户确认
    if userApproves() {
        plan.Approve("user@example.com")
    } else {
        plan.Reject("用户拒绝执行")
    }
}

恢复执行

从失败点恢复执行:

// 首次执行
err := executor.Execute(ctx, plan, toolCtx)
if err != nil {
    log.Printf("执行失败: %v", err)

    // 保存计划状态
    savePlan(plan)
}

// 稍后恢复执行
plan = loadPlan()
err = executor.Resume(ctx, plan, toolCtx)

执行回调

监听执行事件:

executor := executionplan.NewExecutor(
    toolMap,
    executionplan.WithOnStepStart(func(plan *executionplan.ExecutionPlan, step *executionplan.Step) {
        log.Printf("开始执行: %s", step.Description)
    }),
    executionplan.WithOnStepComplete(func(plan *executionplan.ExecutionPlan, step *executionplan.Step) {
        log.Printf("完成: %s (耗时: %dms)", step.Description, step.DurationMs)
    }),
    executionplan.WithOnStepFailed(func(plan *executionplan.ExecutionPlan, step *executionplan.Step, err error) {
        log.Printf("失败: %s - %v", step.Description, err)
    }),
    executionplan.WithOnPlanComplete(func(plan *executionplan.ExecutionPlan) {
        summary := plan.Summary()
        log.Printf("计划完成: %d 成功, %d 失败", summary.Completed, summary.Failed)
    }),
)

与 Agent 集成

使用 Agent 的执行计划管理器

agent, err := agent.NewAgent(config)
if err != nil {
    return err
}

// 获取执行计划管理器
planMgr := agent.ExecutionPlan()

// 生成计划
plan, err := planMgr.GeneratePlan(ctx, "部署应用", &agent.ExecutionPlanConfig{
    RequireApproval: true,
    AllowParallel:   true,
})
if err != nil {
    return err
}

// 审批
err = planMgr.ApprovePlan("user@example.com")
if err != nil {
    return err
}

// 执行
err = planMgr.ExecutePlan(ctx)
if err != nil {
    return err
}

// 获取当前计划
currentPlan := planMgr.CurrentPlan()
summary := currentPlan.Summary()

最佳实践

1. 合理分解步骤

将复杂任务分解为小而专注的步骤:

// 好的做法:步骤职责单一
plan.AddStep("validate_input", "验证输入参数", params)
plan.AddStep("backup_database", "备份数据库", params)
plan.AddStep("run_migration", "运行数据库迁移", params)
plan.AddStep("deploy_code", "部署代码", params)
plan.AddStep("verify_deployment", "验证部署", params)

// 避免:步骤过于复杂
plan.AddStep("deploy_everything", "部署所有内容", params)

2. 明确依赖关系

清晰地定义步骤之间的依赖:

// 明确依赖
buildStep := plan.AddStep("build", "构建", nil)
testStep := plan.AddStep("test", "测试", nil)
testStep.DependsOn = []string{buildStep.ID}

deployStep := plan.AddStep("deploy", "部署", nil)
deployStep.DependsOn = []string{buildStep.ID, testStep.ID}

3. 使用描述性名称

为步骤提供清晰的描述:

// 好的描述
plan.AddStep("git_pull", "从 main 分支拉取最新代码", params)
plan.AddStep("npm_install", "安装 Node.js 依赖包", params)

// 避免模糊描述
plan.AddStep("step1", "执行操作", params)

4. 合理配置重试

为可能失败的操作配置重试:

// 网络请求应该重试
apiStep := plan.AddStep("fetch_data", "获取远程数据", params)
apiStep.MaxRetries = 3
apiStep.RetryDelayMs = 2000

// 幂等操作可以重试
deployStep := plan.AddStep("deploy", "部署应用", params)
deployStep.MaxRetries = 2

// 非幂等操作不应重试
paymentStep := plan.AddStep("process_payment", "处理支付", params)
paymentStep.MaxRetries = 0 // 不重试

5. 设置合理的超时

根据操作特性设置超时:

// 快速操作:短超时
plan.AddStep("validate", "验证配置", params)
plan.Options.StepTimeoutMs = 5000 // 5秒

// 耗时操作:长超时
plan.AddStep("build", "编译大型项目", params)
plan.Options.StepTimeoutMs = 300000 // 5分钟

6. 利用并行执行

识别可以并行的步骤:

plan := executionplan.NewExecutionPlan("多任务处理")

// 这些步骤可以并行
plan.AddStep("process_images", "处理图片", nil)
plan.AddStep("process_videos", "处理视频", nil)
plan.AddStep("generate_thumbnails", "生成缩略图", nil)

plan.Options.AllowParallel = true
plan.Options.MaxParallelSteps = 3

7. 保存和恢复状态

对于长时间运行的计划,保存状态:

// 定期保存
executor := executionplan.NewExecutor(
    toolMap,
    executionplan.WithOnStepComplete(func(plan *executionplan.ExecutionPlan, step *executionplan.Step) {
        // 每完成一步就保存
        savePlanToDatabase(plan)
    }),
)

// 从保存的状态恢复
plan := loadPlanFromDatabase(planID)
if !plan.IsCompleted() {
    executor.Resume(ctx, plan, toolCtx)
}

示例场景

CI/CD 流程

plan := executionplan.NewExecutionPlan("CI/CD 流程")

// 阶段 1: 准备
checkout := plan.AddStep("git_checkout", "检出代码", map[string]any{
    "branch": "main",
})

// 阶段 2: 构建和检查(并行)
install := plan.AddStep("npm_install", "安装依赖", nil)
install.DependsOn = []string{checkout.ID}

build := plan.AddStep("npm_build", "构建项目", nil)
build.DependsOn = []string{install.ID}

lint := plan.AddStep("npm_lint", "代码检查", nil)
lint.DependsOn = []string{install.ID}

// 阶段 3: 测试
test := plan.AddStep("npm_test", "运行测试", nil)
test.DependsOn = []string{build.ID}

// 阶段 4: 部署
deploy := plan.AddStep("deploy", "部署到生产", nil)
deploy.DependsOn = []string{build.ID, lint.ID, test.ID}

plan.Options.AllowParallel = true
plan.Options.RequireApproval = true
plan.Options.StopOnError = true

数据处理流水线

plan := executionplan.NewExecutionPlan("数据处理流水线")

// 提取
extract := plan.AddStep("extract_data", "从数据源提取数据", map[string]any{
    "source": "database",
})

// 转换(并行)
transform1 := plan.AddStep("transform_users", "转换用户数据", nil)
transform1.DependsOn = []string{extract.ID}

transform2 := plan.AddStep("transform_orders", "转换订单数据", nil)
transform2.DependsOn = []string{extract.ID}

// 加载
load := plan.AddStep("load_data", "加载到目标系统", nil)
load.DependsOn = []string{transform1.ID, transform2.ID}

// 验证
validate := plan.AddStep("validate_data", "验证数据完整性", nil)
validate.DependsOn = []string{load.ID}

plan.Options.AllowParallel = true
plan.Options.ContinueOnError = false

性能考虑

并行度控制

// 限制并行步骤数以避免资源耗尽
plan.Options.MaxParallelSteps = 5

// 对于 CPU 密集型任务
plan.Options.MaxParallelSteps = runtime.NumCPU()

// 对于 I/O 密集型任务
plan.Options.MaxParallelSteps = 20

内存管理

// 对于大型结果,避免在步骤中存储
step := plan.AddStep("process_large_file", "处理大文件", params)
// 不要: step.Result = largeData
// 而是: 将结果写入文件或数据库,只存储引用

故障排查

查看执行历史

for i, step := range plan.Steps {
    fmt.Printf("步骤 %d: %s\n", i+1, step.Description)
    fmt.Printf("  状态: %s\n", step.Status)
    if step.Error != "" {
        fmt.Printf("  错误: %s\n", step.Error)
    }
    if step.DurationMs > 0 {
        fmt.Printf("  耗时: %dms\n", step.DurationMs)
    }
}

调试失败步骤

for _, step := range plan.Steps {
    if step.Status == executionplan.StepStatusFailed {
        log.Printf("失败步骤: %s", step.Description)
        log.Printf("工具: %s", step.ToolName)
        log.Printf("参数: %+v", step.Parameters)
        log.Printf("错误: %s", step.Error)
        log.Printf("重试次数: %d/%d", step.RetryCount, step.MaxRetries)
    }
}

相关资源