Advanced

WorkflowAgent 编排

使用 WorkflowAgent 实现 Agentic Workflow

WorkflowAgent - Agentic Workflow

WorkflowAgent 是一个专门用于 Workflow 编排的受限 Agent,让 AI Agent 决定何时运行 Workflow。

核心概念

Agentic Workflow 的核心思想:

  1. Agent 先检查历史记录
  2. 如果能从历史中回答,直接回答
  3. 如果需要新数据,调用 Workflow 工具
  4. Agent 决策何时运行 Workflow

创建 WorkflowAgent

// 创建 WorkflowAgent
agent := workflow.NewWorkflowAgent(
    "gpt-4",              // 模型
    "",                   // 指令(空则使用默认)
    true,                 // 启用历史记录
    5,                    // 保留最近5次运行
)

// 附加 Workflow
agent.AttachWorkflow(myWorkflow)

基础使用

同步执行

result, err := agent.Run(ctx, "分析最近的销售数据")
if err != nil {
    log.Fatal(err)
}
fmt.Println(result)

流式执行

eventCh := agent.RunStream(ctx, "生成月度报告")
for event := range eventCh {
    switch event.Type {
    case workflow.AgentEventStart:
        fmt.Println("Agent 开始")

    case workflow.AgentEventWorkflowEvent:
        // Workflow 事件
        wfEvent := event.Data["workflow_event"]

    case workflow.AgentEventResponse:
        // Agent 响应
        response := event.Data["response"]
        fmt.Printf("回答: %v\n", response)

    case workflow.AgentEventComplete:
        fmt.Println("完成")

    case workflow.AgentEventError:
        log.Printf("错误: %v", event.Error)
    }
}

Workflow 集成

方式 1: 直接从 Workflow 调用

wf := workflow.New("DataAnalysis")
// ... 添加步骤 ...

agent := workflow.NewWorkflowAgent("gpt-4", "", true, 5)

// Agentic 执行
result, err := wf.AgenticExecute(ctx, agent, "用户查询")

方式 2: 流式 Agentic 执行

eventCh := wf.AgenticExecuteStream(ctx, agent, "用户查询")
for event := range eventCh {
    // 处理事件
}

配置选项

agent := workflow.NewWorkflowAgent("gpt-4", "", true, 5).
    WithInstructions(customInstructions).  // 自定义指令
    WithHistorySize(10).                   // 历史记录数量
    WithModel("gpt-4-turbo").             // 更改模型
    EnableHistory(true)                    // 启用/禁用历史

自定义指令

customInstructions := `你是一个数据分析助手。你的工作是:
1. 检查历史记录,如果用户的问题已经在历史中有答案,直接回答
2. 如果需要新的分析,调用 run_workflow 工具
3. 不要重复调用工具

历史记录会在 {workflow_context} 中提供。
`

agent := workflow.NewWorkflowAgent("gpt-4", customInstructions, true, 5)

访问历史

// 获取历史记录
history := agent.GetWorkflowHistory()

for _, item := range history {
    fmt.Printf("运行 %s:\n", item.RunID)
    fmt.Printf("  输入: %v\n", item.Input)
    fmt.Printf("  输出: %v\n", item.Output)
    fmt.Printf("  状态: %s\n", item.Status)
    fmt.Printf("  耗时: %.2fs\n", item.Duration)
}

创建 Workflow 工具

WorkflowAgent 可以为 Workflow 创建工具函数:

session := &workflow.WorkflowSession{
    ID: "session-123",
    // ... 其他字段 ...
}

executionInput := &workflow.WorkflowInput{
    Input: "initial input",
}

// 创建工具
tool := agent.CreateWorkflowTool(session, executionInput, true)  // true = 流式

// 使用工具
result, err := tool(ctx, "执行查询")

完整示例

package main

import (
    "context"
    "fmt"
    "github.com/astercloud/aster/pkg/workflow"
)

func main() {
    ctx := context.Background()

    // 创建数据分析 Workflow
    wf := workflow.New("DataAnalysis")

    wf.AddStep(workflow.NewFunctionStep("analyze",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            // 执行数据分析
            query := input.Input.(string)
            result := performAnalysis(query)

            return &workflow.StepOutput{
                Content: map[string]interface{}{
                    "query":  query,
                    "result": result,
                    "charts": []string{"chart1.png", "chart2.png"},
                },
                Metadata: make(map[string]interface{}),
            }, nil
        },
    ))

    // 创建 WorkflowAgent
    agent := workflow.NewWorkflowAgent("gpt-4", "", true, 5).
        WithInstructions(`你是数据分析助手。
- 如果用户询问之前分析过的数据,直接从历史中回答
- 如果是新的查询,调用 run_workflow 工具
- 每次只调用一次工具`)

    // 附加 Workflow
    agent.AttachWorkflow(wf)

    // 第一次查询 - 会运行 Workflow
    fmt.Println("=== 第一次查询 ===")
    result1, _ := agent.Run(ctx, "分析Q1销售数据")
    fmt.Println(result1)

    // 第二次查询 - Agent 可能从历史中回答
    fmt.Println("\n=== 第二次查询 ===")
    result2, _ := agent.Run(ctx, "Q1的销售额是多少?")
    fmt.Println(result2)

    // 新查询 - 会运行新的 Workflow
    fmt.Println("\n=== 新查询 ===")
    result3, _ := agent.Run(ctx, "分析Q2销售数据")
    fmt.Println(result3)

    // 查看历史
    fmt.Println("\n=== 历史记录 ===")
    for i, item := range agent.GetWorkflowHistory() {
        fmt.Printf("%d. 输入: %v, 输出: %v\n", i+1, item.Input, item.Output)
    }
}

func performAnalysis(query string) map[string]interface{} {
    // 实际的分析逻辑
    return map[string]interface{}{
        "total_sales": 150000,
        "growth":      15.5,
        "top_products": []string{"Product A", "Product B"},
    }
}

工作原理

  1. 历史管理
    • WorkflowAgent 维护最近 N 次运行的历史
    • 历史包括输入、输出、状态、指标
  2. 决策逻辑
    • Agent 首先检查能否从历史回答
    • 如果需要新数据,创建并调用 Workflow 工具
    • 工具执行完成后,Agent 处理结果
  3. 会话持久化
    • 支持从数据库读取会话
    • 每次执行更新会话历史
    • 自动保存运行结果

最佳实践

  1. 合理设置历史大小
    • 太小:无法充分利用历史
    • 太大:context 过长,影响性能
    • 建议:3-10 次
  2. 清晰的指令
    • 明确告诉 Agent 何时使用工具
    • 强调不要重复调用
    • 说明如何利用历史
  3. 流式优先
    • 对于长时间运行的 Workflow,使用流式执行
    • 提供更好的用户体验
    • 实时反馈进度
  4. 错误处理
    • Agent 会捕获 Workflow 错误
    • 提供友好的错误消息
    • 支持重试机制

技术特性

功能Aster WorkflowAgent
历史访问
工具创建
流式执行
会话管理
并发支持✅ (goroutines)
线程安全✅ (sync.RWMutex)