Advanced

Router 动态路由

使用 Router 实现复杂的动态路由逻辑

Router 动态路由

Router 是一个独立的类,提供比 RouterStep 更强大的动态路由能力。

核心特性

  • 动态步骤选择 - selector 函数返回要执行的步骤列表
  • 顺序链接执行 - 选中的步骤按顺序执行,数据流传递
  • 支持多步骤 - 可以返回多个步骤并按顺序执行
  • 流式支持 - 完整的流式执行和事件系统

Router 类型

1. SimpleRouter - 简单条件路由

根据条件选择单个步骤:

routeA := workflow.NewFunctionStep("route_a", ...)
routeB := workflow.NewFunctionStep("route_b", ...)

router := workflow.SimpleRouter("route",
    func(input *workflow.StepInput) string {
        // 根据条件返回路由名称
        if data, ok := input.Input.(map[string]interface{}); ok {
            if priority, ok := data["priority"].(string); ok && priority == "high" {
                return "route_a"  // 高优先级
            }
        }
        return "route_b"  // 默认路由
    },
    map[string]workflow.Step{
        "route_a": routeA,
        "route_b": routeB,
    },
)

wf.AddStep(router)

2. ChainRouter - 链式路由

选择多个步骤顺序执行:

analyze := workflow.NewFunctionStep("analyze", ...)
processComplex := workflow.NewFunctionStep("process_complex", ...)
processSimple := workflow.NewFunctionStep("process_simple", ...)
finalize := workflow.NewFunctionStep("finalize", ...)

router := workflow.ChainRouter("smart_processor",
    func(input *workflow.StepInput) []string {
        // 根据分析结果返回处理链
        if input.PreviousStepContent.(map[string]interface{})["complexity"] == "high" {
            return []string{"process_complex", "finalize"}
        }
        return []string{"process_simple", "finalize"}
    },
    map[string]workflow.Step{
        "process_complex": processComplex,
        "process_simple":  processSimple,
        "finalize":        finalize,
    },
)

// 典型使用: 先分析,再根据结果选择处理路径
wf.AddStep(analyze).AddStep(router)

3. DynamicRouter - 动态路由

完全自定义的步骤选择逻辑:

router := workflow.DynamicRouter("dynamic",
    func(input *workflow.StepInput) []workflow.Step {
        // 动态创建和返回步骤
        steps := []workflow.Step{}

        data := input.PreviousStepContent.(map[string]interface{})
        tasks := data["tasks"].([]string)

        // 根据数据动态创建步骤
        for _, task := range tasks {
            step := workflow.NewFunctionStep(task, createTaskHandler(task))
            steps = append(steps, step)
        }

        return steps
    },
)

流式执行

Router 支持流式执行,实时获取事件:

reader := router.ExecuteStream(ctx, input, true)
for {
    event, err := reader.Recv()
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        log.Printf("错误: %v", err)
        continue
    }

    switch v := event.(type) {
    case *workflow.RouterEvent:
        switch v.Type {
        case workflow.RouterEventStarted:
            fmt.Printf("Router 开始, 选择了 %d 步骤\n", len(v.SelectedSteps))

        case workflow.RouterEventCompleted:
            fmt.Printf("Router 完成, 执行了 %d 步骤\n", v.ExecutedSteps)

        case workflow.RouterEventFailed:
            fmt.Printf("Router 失败\n")
        }

    case map[string]interface{}:
        // 步骤进度事件
        if v["type"] == "router_step_progress" {
            fmt.Printf("步骤 %s 执行中\n", v["step_name"])
        }

    case *workflow.StepOutput:
        // 最终输出
        fmt.Printf("最终结果: %v\n", v.Content)
    }
}

数据流传递

Router 中的步骤会自动传递数据:

router := workflow.ChainRouter("pipeline",
    func(input *workflow.StepInput) []string {
        return []string{"step1", "step2", "step3"}
    },
    map[string]workflow.Step{
        "step1": workflow.NewFunctionStep("step1", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            return &workflow.StepOutput{Content: "data1"}, nil
        }),
        "step2": workflow.NewFunctionStep("step2", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            // 可以访问 step1 的输出
            prev := input.PreviousStepContent.(string)  // "data1"
            return &workflow.StepOutput{Content: prev + " + data2"}, nil
        }),
        "step3": workflow.NewFunctionStep("step3", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            // 可以访问 step2 的输出
            prev := input.PreviousStepContent.(string)  // "data1 + data2"
            return &workflow.StepOutput{Content: prev + " + data3"}, nil
        }),
    },
)

完整示例

package main

import (
    "context"
    "errors"
    "fmt"
    "io"

    "github.com/astercloud/aster/pkg/workflow"
)

func main() {
    ctx := context.Background()

    // 创建分析步骤
    analyze := workflow.NewFunctionStep("analyze",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            // 分析输入数据
            return &workflow.StepOutput{
                Content: map[string]interface{}{
                    "type":     "complex",
                    "priority": "high",
                },
            }, nil
        },
    )

    // 创建处理步骤
    processComplex := workflow.NewFunctionStep("process_complex",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            return &workflow.StepOutput{Content: "复杂处理完成"}, nil
        },
    )

    processSimple := workflow.NewFunctionStep("process_simple",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            return &workflow.StepOutput{Content: "简单处理完成"}, nil
        },
    )

    finalize := workflow.NewFunctionStep("finalize",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            result := input.PreviousStepContent
            return &workflow.StepOutput{
                Content: fmt.Sprintf("最终结果: %v", result),
            }, nil
        },
    )

    // 创建智能路由器
    router := workflow.ChainRouter("smart_router",
        func(input *workflow.StepInput) []string {
            data := input.PreviousStepContent.(map[string]interface{})
            if data["type"] == "complex" {
                return []string{"process_complex", "finalize"}
            }
            return []string{"process_simple", "finalize"}
        },
        map[string]workflow.Step{
            "process_complex": processComplex,
            "process_simple":  processSimple,
            "finalize":        finalize,
        },
    )

    // 创建 Workflow
    wf := workflow.New("RouterDemo").
        WithStream().
        AddStep(analyze).
        AddStep(router)

    // 执行
    input := &workflow.WorkflowInput{Input: "test data"}
    reader := wf.Execute(ctx, input)
    for {
        event, err := reader.Recv()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            panic(err)
        }
        if event.Type == workflow.EventWorkflowCompleted {
            fmt.Println(event.Data.(map[string]interface{})["output"])
        }
    }
}

最佳实践

  1. 合理选择 Router 类型
    • 简单条件 → SimpleRouter
    • 固定流程 → ChainRouter
    • 动态生成 → DynamicRouter
  2. 数据验证
    • 在 selector 函数中验证输入数据
    • 提供默认路由处理异常情况
  3. 性能考虑
    • selector 函数应该快速返回
    • 避免在 selector 中执行耗时操作
  4. 错误处理
    • 返回空数组表示无步骤执行
    • Router 会优雅处理错误情况