工作流 Agent 是 aster Phase 7 引入的核心功能,用于编排多个子 Agent 的执行流程。基于 Google ADK-Go 的设计,使用 stream.Reader 实现高效的流式处理。
在复杂的 AI 任务中,单个 Agent 往往难以处理所有场景。工作流 Agent 提供了结构化的编排能力,将大任务分解为多个小任务,由专门的子 Agent 协作完成。
优势:
graph TB
WorkflowAgent[工作流 Agent] --> Parallel[ParallelAgent<br/>并行执行]
WorkflowAgent --> Sequential[SequentialAgent<br/>顺序执行]
WorkflowAgent --> Loop[LoopAgent<br/>循环执行]
Parallel -->|使用场景| P1[多方案比较]
Parallel --> P2[并行数据收集]
Parallel --> P3[候选生成]
Sequential -->|使用场景| S1[流水线处理]
Sequential --> S2[多阶段任务]
Sequential --> S3[数据分析管道]
Loop -->|使用场景| L1[迭代优化]
Loop --> L2[质量提升循环]
Loop --> L3[多轮对话]
style Parallel fill:#10b981
style Sequential fill:#3b82f6
style Loop fill:#f59e0b
同时执行多个子 Agent,收集所有结果。
sequenceDiagram
participant User
participant ParallelAgent
participant AgentA
participant AgentB
participant AgentC
User->>ParallelAgent: Execute("求解问题")
par 并行执行
ParallelAgent->>AgentA: Execute("方案 A")
ParallelAgent->>AgentB: Execute("方案 B")
ParallelAgent->>AgentC: Execute("方案 C")
AgentA->>ParallelAgent: 结果 A
AgentB->>ParallelAgent: 结果 B
AgentC->>ParallelAgent: 结果 C
end
ParallelAgent->>User: 所有结果
特点:
errgroup 管理并发使用场景:
按顺序依次执行子 Agent,前一步的输出可作为后一步的上下文。
sequenceDiagram
participant User
participant SequentialAgent
participant Step1 as 数据收集
participant Step2 as 数据分析
participant Step3 as 报告生成
User->>SequentialAgent: Execute("处理数据")
SequentialAgent->>Step1: Execute("收集数据")
Step1->>SequentialAgent: 原始数据
SequentialAgent->>Step2: Execute("分析数据")
Step2->>SequentialAgent: 分析结果
SequentialAgent->>Step3: Execute("生成报告")
Step3->>SequentialAgent: 最终报告
SequentialAgent->>User: 完整流程结果
特点:
sequential_step)LoopAgent(MaxIterations=1) 的特例使用场景:
重复执行子 Agent 直到满足终止条件或达到最大迭代次数。
sequenceDiagram
participant User
participant LoopAgent
participant Critic as 评估 Agent
participant Improver as 改进 Agent
User->>LoopAgent: Execute("优化代码")
loop 直到质量达标或达到最大次数
LoopAgent->>Critic: Execute("评估质量")
Critic->>LoopAgent: 质量分数: 75
alt 质量未达标
LoopAgent->>Improver: Execute("提出改进")
Improver->>LoopAgent: 改进建议
Note over LoopAgent: 继续下一轮迭代
else 质量达标 (>= 90)
Note over LoopAgent: 停止循环
end
end
LoopAgent->>User: 最终优化结果
特点:
StopCondition)MaxIterations)loop_iteration)MaxIterations 或 StopCondition 之一使用场景:
所有工作流 Agent 都实现统一的接口:
type Agent interface {
Name() string
Execute(ctx context.Context, message string) *stream.Reader[*session.Event]
}
关键特性:
stream.Reader 实现流式接口context.Context)Recv() 返回 (event, error) 元组,支持错误传播工作流 Agent 会丰富事件元数据:
type Event struct {
ID string
Timestamp time.Time
AgentID string
Branch string // 事件来源路径
Content types.Message
Actions types.EventActions
Metadata map[string]interface{} // 工作流元数据
}
元数据字段:
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
branch | string | 事件来源路径 | "Pipeline.Analyzer.iter1" |
parallel_index | int | ParallelAgent 子 Agent 索引 | 0, 1, 2 |
parallel_agent | string | ParallelAgent 名称 | "MultiAlgorithm" |
sequential_step | int | SequentialAgent 当前步骤(1-based) | 2 |
total_steps | int | SequentialAgent 总步骤数 | 3 |
sequential_agent | string | SequentialAgent 名称 | "DataPipeline" |
loop_iteration | uint | LoopAgent 当前迭代(0-based) | 1 |
loop_agent | string | LoopAgent 名称 | "OptimizationLoop" |
sub_agent_index | int | 子 Agent 在列表中的索引 | 0 |
工作流 Agent 支持嵌套使用,构建复杂的多层级编排:
graph TB
Root[顺序工作流]
Root --> Step1[并行收集数据]
Root --> Step2[分析数据]
Root --> Step3[循环优化报告]
Step1 --> S1A[数据源 A]
Step1 --> S1B[数据源 B]
Step1 --> S1C[数据源 C]
Step3 --> S3A[评估报告质量]
Step3 --> S3B[改进报告]
S3B -.循环.-> S3A
style Root fill:#3b82f6
style Step1 fill:#10b981
style Step3 fill:#f59e0b
示例代码:
// 第一层:并行收集
parallelCollector, _ := workflow.NewParallelAgent(workflow.ParallelConfig{
Name: "ParallelCollector",
SubAgents: []workflow.Agent{
NewDataSourceAgent("Source1"),
NewDataSourceAgent("Source2"),
NewDataSourceAgent("Source3"),
},
})
// 第二层:顺序执行(包含嵌套的并行流程)
nestedWorkflow, _ := workflow.NewSequentialAgent(workflow.SequentialConfig{
Name: "NestedWorkflow",
SubAgents: []workflow.Agent{
parallelCollector, // 并行收集
analyzer, // 串行分析
reporter, // 串行报告
},
})
| 场景 | 推荐模式 | 原因 |
|---|---|---|
| 需要比较多个方案 | ParallelAgent | 同时评估,节省时间 |
| 步骤间有依赖关系 | SequentialAgent | 保证顺序,前一步输出传递给下一步 |
| 需要迭代改进 | LoopAgent | 循环优化直到满足条件 |
| 先并行后串行 | Sequential + Parallel | 嵌套工作流 |
// ✅ 推荐:使用 stream.Reader 流式处理
reader := workflow.Execute(ctx, msg)
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Printf("错误: %v", err)
continue
}
// 实时处理事件,内存占用 O(1)
handleEvent(event)
}
// ❌ 避免:收集所有结果再处理
var results []Event
reader := workflow.Execute(ctx, msg)
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
results = append(results, event) // 内存占用 O(n)
}
// 然后处理 results...
reader := sequential.Execute(ctx, "任务")
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
// 记录错误
log.Printf("错误: %v", err)
// 根据业务决定是否继续
if isCriticalError(err) {
break // 中断工作流
}
continue // 继续处理下一个事件
}
// 处理正常事件
processEvent(event)
}
// 设置超时
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
// 执行工作流
reader := workflow.Execute(ctx, "任务")
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
// 检查上下文取消
if ctx.Err() != nil {
fmt.Println("工作流被取消或超时")
break
}
// 处理事件
handleEvent(event)
}
// 方式1: 基于质量分数
StopCondition: func(event *session.Event) bool {
return event.Metadata["quality_score"].(int) >= 90
}
// 方式2: 基于错误检测
StopCondition: func(event *session.Event) bool {
return event.Metadata["error_count"].(int) == 0
}
// 方式3: 组合条件
StopCondition: func(event *session.Event) bool {
score := event.Metadata["quality_score"].(int)
attempts := event.Metadata["attempts"].(int)
// 质量达标或尝试次数过多
return score >= 90 || attempts >= 10
}
aster 的工作流 Agent 设计参考了 Google ADK-Go:
| 特性 | ADK-Go | aster | 说明 |
|---|---|---|---|
| ParallelAgent | ✅ | ✅ | 并行执行多个子 Agent |
| SequentialAgent | ✅ | ✅ | 顺序执行(LoopAgent 特例) |
| LoopAgent | ✅ | ✅ | 循环执行直到条件满足 |
| stream.Reader | - | ✅ | 高效流式接口 |
| StopCondition | ✅ | ✅ | 自定义停止条件 |
| Escalate 机制 | ✅ | ✅ | 通过 EventActions.Escalate |
| Branch 追踪 | ✅ | ✅ | event.Branch 字段 |
| 元数据丰富 | ✅ | ✅ | 自动添加 iteration、step 等 |
差异:
A: 没有区别。SequentialAgent 内部就是使用 LoopAgent(MaxIterations=1) 实现的。
A: 所有子 Agent 同时启动,但事件返回顺序不确定(取决于哪个 Agent 先完成)。如果需要确定顺序,使用 SequentialAgent。
A: 必须设置 MaxIterations 或 StopCondition 之一。建议同时设置两者:
MaxIterations: 10, // 最多10次迭代
StopCondition: func(event *session.Event) bool {
return event.Metadata["success"].(bool) // 或提前停止
}
A: 使用 event.Branch 字段追踪事件来源:
reader := nestedWorkflow.Execute(ctx, msg)
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
// Branch 示例: "Pipeline.ParallelCollector.Source1"
fmt.Printf("[%s] %s\n", event.Branch, event.Content.Content)
}
A: 是的,所有事件可以通过 Session 系统持久化到 PostgreSQL 或 MySQL。参见 Session 持久化文档。