Basic

步骤类型

Workflow 中的8种步骤类型详解

步骤类型

Aster Workflow 提供 8 种步骤类型,满足不同的业务需求。

FunctionStep - 自定义函数

最基础的步骤类型,执行自定义函数:

step := workflow.NewFunctionStep("my_function",
    func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
        // 自定义逻辑
        result := doSomething(input.Input)

        return &workflow.StepOutput{
            Content: result,
            Metadata: make(map[string]interface{}),
        }, nil
    },
)

简化助手

// SimpleFunction - 简单的输入输出函数
step := workflow.SimpleFunction("process", func(input interface{}) (interface{}, error) {
    return "processed", nil
})

// TransformFunction - 纯转换函数
step := workflow.TransformFunction("transform", func(input interface{}) interface{} {
    return fmt.Sprintf("transformed: %v", input)
})

ConditionStep - 条件分支

根据条件选择不同的执行路径:

trueStep := workflow.NewFunctionStep("if_true", ...)
falseStep := workflow.NewFunctionStep("if_false", ...)

condStep := workflow.NewConditionStep("check",
    func(input *workflow.StepInput) bool {
        // 条件判断
        data := input.PreviousStepContent.(map[string]interface{})
        return data["value"].(int) > 10
    },
    trueStep,   // 条件为真时执行
    falseStep,  // 条件为假时执行
)

LoopStep - 循环执行

重复执行步骤,支持最大次数和提前终止:

loopBody := workflow.NewFunctionStep("body", ...)

loopStep := workflow.NewLoopStep("loop", loopBody, 10).  // 最多10次
    WithStopCondition(func(output *workflow.StepOutput) bool {
        // 提前终止条件
        return output.Content == "done"
    })

特点

  • 每次迭代的输出会作为下次的输入
  • 支持提前终止
  • 收集所有迭代结果

ParallelStep - 并行执行

并行执行多个步骤:

task1 := workflow.NewFunctionStep("task1", ...)
task2 := workflow.NewFunctionStep("task2", ...)
task3 := workflow.NewFunctionStep("task3", ...)

parallelStep := workflow.NewParallelStep("parallel", task1, task2, task3)

特点

  • 所有步骤同时开始
  • 等待全部完成
  • 返回所有结果

RouterStep - 简单路由

从映射中选择一个步骤执行(旧版简单路由):

routes := map[string]workflow.Step{
    "route_a": stepA,
    "route_b": stepB,
}

routerStep := workflow.NewRouterStep("router",
    func(input *workflow.StepInput) string {
        // 返回路由名称
        return "route_a"
    },
    routes,
)

StepsGroup - 步骤组

顺序执行多个步骤:

step1 := workflow.NewFunctionStep("s1", ...)
step2 := workflow.NewFunctionStep("s2", ...)
step3 := workflow.NewFunctionStep("s3", ...)

group := workflow.NewStepsGroup("group", step1, step2, step3)

特点

  • 顺序执行
  • 数据流在步骤间传递
  • 返回最后一步的输出

AgentStep - Agent 执行

执行一个 Agent:

agent := agent.NewAgent(...)

agentStep := workflow.NewAgentStep("agent_task", agent)

RoomStep - Room 协作

执行 Room 多 Agent 协作:

room := core.NewRoom(pool)

roomStep := workflow.NewRoomStep("team_task", room)

选择建议

需求推荐步骤类型
简单处理FunctionStep
条件判断ConditionStep
重复执行LoopStep
并发处理ParallelStep
动态路由Router 类
顺序组合StepsGroup
Agent 调用AgentStep
团队协作RoomStep