WorkflowAgent 是一个专门用于 Workflow 编排的受限 Agent,让 AI Agent 决定何时运行 Workflow。
Agentic Workflow 的核心思想:
// 创建 WorkflowAgent
agent := workflow.NewWorkflowAgent(
"gpt-4", // 模型
"", // 指令(空则使用默认)
true, // 启用历史记录
5, // 保留最近5次运行
)
// 附加 Workflow
agent.AttachWorkflow(myWorkflow)
result, err := agent.Run(ctx, "分析最近的销售数据")
if err != nil {
log.Fatal(err)
}
fmt.Println(result)
eventCh := agent.RunStream(ctx, "生成月度报告")
for event := range eventCh {
switch event.Type {
case workflow.AgentEventStart:
fmt.Println("Agent 开始")
case workflow.AgentEventWorkflowEvent:
// Workflow 事件
wfEvent := event.Data["workflow_event"]
case workflow.AgentEventResponse:
// Agent 响应
response := event.Data["response"]
fmt.Printf("回答: %v\n", response)
case workflow.AgentEventComplete:
fmt.Println("完成")
case workflow.AgentEventError:
log.Printf("错误: %v", event.Error)
}
}
wf := workflow.New("DataAnalysis")
// ... 添加步骤 ...
agent := workflow.NewWorkflowAgent("gpt-4", "", true, 5)
// Agentic 执行
result, err := wf.AgenticExecute(ctx, agent, "用户查询")
eventCh := wf.AgenticExecuteStream(ctx, agent, "用户查询")
for event := range eventCh {
// 处理事件
}
agent := workflow.NewWorkflowAgent("gpt-4", "", true, 5).
WithInstructions(customInstructions). // 自定义指令
WithHistorySize(10). // 历史记录数量
WithModel("gpt-4-turbo"). // 更改模型
EnableHistory(true) // 启用/禁用历史
customInstructions := `你是一个数据分析助手。你的工作是:
1. 检查历史记录,如果用户的问题已经在历史中有答案,直接回答
2. 如果需要新的分析,调用 run_workflow 工具
3. 不要重复调用工具
历史记录会在 {workflow_context} 中提供。
`
agent := workflow.NewWorkflowAgent("gpt-4", customInstructions, true, 5)
// 获取历史记录
history := agent.GetWorkflowHistory()
for _, item := range history {
fmt.Printf("运行 %s:\n", item.RunID)
fmt.Printf(" 输入: %v\n", item.Input)
fmt.Printf(" 输出: %v\n", item.Output)
fmt.Printf(" 状态: %s\n", item.Status)
fmt.Printf(" 耗时: %.2fs\n", item.Duration)
}
WorkflowAgent 可以为 Workflow 创建工具函数:
session := &workflow.WorkflowSession{
ID: "session-123",
// ... 其他字段 ...
}
executionInput := &workflow.WorkflowInput{
Input: "initial input",
}
// 创建工具
tool := agent.CreateWorkflowTool(session, executionInput, true) // true = 流式
// 使用工具
result, err := tool(ctx, "执行查询")
package main
import (
"context"
"fmt"
"github.com/astercloud/aster/pkg/workflow"
)
func main() {
ctx := context.Background()
// 创建数据分析 Workflow
wf := workflow.New("DataAnalysis")
wf.AddStep(workflow.NewFunctionStep("analyze",
func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
// 执行数据分析
query := input.Input.(string)
result := performAnalysis(query)
return &workflow.StepOutput{
Content: map[string]interface{}{
"query": query,
"result": result,
"charts": []string{"chart1.png", "chart2.png"},
},
Metadata: make(map[string]interface{}),
}, nil
},
))
// 创建 WorkflowAgent
agent := workflow.NewWorkflowAgent("gpt-4", "", true, 5).
WithInstructions(`你是数据分析助手。
- 如果用户询问之前分析过的数据,直接从历史中回答
- 如果是新的查询,调用 run_workflow 工具
- 每次只调用一次工具`)
// 附加 Workflow
agent.AttachWorkflow(wf)
// 第一次查询 - 会运行 Workflow
fmt.Println("=== 第一次查询 ===")
result1, _ := agent.Run(ctx, "分析Q1销售数据")
fmt.Println(result1)
// 第二次查询 - Agent 可能从历史中回答
fmt.Println("\n=== 第二次查询 ===")
result2, _ := agent.Run(ctx, "Q1的销售额是多少?")
fmt.Println(result2)
// 新查询 - 会运行新的 Workflow
fmt.Println("\n=== 新查询 ===")
result3, _ := agent.Run(ctx, "分析Q2销售数据")
fmt.Println(result3)
// 查看历史
fmt.Println("\n=== 历史记录 ===")
for i, item := range agent.GetWorkflowHistory() {
fmt.Printf("%d. 输入: %v, 输出: %v\n", i+1, item.Input, item.Output)
}
}
func performAnalysis(query string) map[string]interface{} {
// 实际的分析逻辑
return map[string]interface{}{
"total_sales": 150000,
"growth": 15.5,
"top_products": []string{"Product A", "Product B"},
}
}
| 功能 | Aster WorkflowAgent |
|---|---|
| 历史访问 | ✅ |
| 工具创建 | ✅ |
| 流式执行 | ✅ |
| 会话管理 | ✅ |
| 并发支持 | ✅ (goroutines) |
| 线程安全 | ✅ (sync.RWMutex) |