Aster Workflow 提供 8 种步骤类型,满足不同的业务需求。
最基础的步骤类型,执行自定义函数:
step := workflow.NewFunctionStep("my_function",
func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
// 自定义逻辑
result := doSomething(input.Input)
return &workflow.StepOutput{
Content: result,
Metadata: make(map[string]interface{}),
}, nil
},
)
// SimpleFunction - 简单的输入输出函数
step := workflow.SimpleFunction("process", func(input interface{}) (interface{}, error) {
return "processed", nil
})
// TransformFunction - 纯转换函数
step := workflow.TransformFunction("transform", func(input interface{}) interface{} {
return fmt.Sprintf("transformed: %v", input)
})
根据条件选择不同的执行路径:
trueStep := workflow.NewFunctionStep("if_true", ...)
falseStep := workflow.NewFunctionStep("if_false", ...)
condStep := workflow.NewConditionStep("check",
func(input *workflow.StepInput) bool {
// 条件判断
data := input.PreviousStepContent.(map[string]interface{})
return data["value"].(int) > 10
},
trueStep, // 条件为真时执行
falseStep, // 条件为假时执行
)
重复执行步骤,支持最大次数和提前终止:
loopBody := workflow.NewFunctionStep("body", ...)
loopStep := workflow.NewLoopStep("loop", loopBody, 10). // 最多10次
WithStopCondition(func(output *workflow.StepOutput) bool {
// 提前终止条件
return output.Content == "done"
})
特点:
并行执行多个步骤:
task1 := workflow.NewFunctionStep("task1", ...)
task2 := workflow.NewFunctionStep("task2", ...)
task3 := workflow.NewFunctionStep("task3", ...)
parallelStep := workflow.NewParallelStep("parallel", task1, task2, task3)
特点:
从映射中选择一个步骤执行(旧版简单路由):
routes := map[string]workflow.Step{
"route_a": stepA,
"route_b": stepB,
}
routerStep := workflow.NewRouterStep("router",
func(input *workflow.StepInput) string {
// 返回路由名称
return "route_a"
},
routes,
)
顺序执行多个步骤:
step1 := workflow.NewFunctionStep("s1", ...)
step2 := workflow.NewFunctionStep("s2", ...)
step3 := workflow.NewFunctionStep("s3", ...)
group := workflow.NewStepsGroup("group", step1, step2, step3)
特点:
执行一个 Agent:
agent := agent.NewAgent(...)
agentStep := workflow.NewAgentStep("agent_task", agent)
执行 Room 多 Agent 协作:
room := core.NewRoom(pool)
roomStep := workflow.NewRoomStep("team_task", room)
| 需求 | 推荐步骤类型 |
|---|---|
| 简单处理 | FunctionStep |
| 条件判断 | ConditionStep |
| 重复执行 | LoopStep |
| 并发处理 | ParallelStep |
| 动态路由 | Router 类 |
| 顺序组合 | StepsGroup |
| Agent 调用 | AgentStep |
| 团队协作 | RoomStep |