Router 是一个独立的类,提供比 RouterStep 更强大的动态路由能力。
根据条件选择单个步骤:
routeA := workflow.NewFunctionStep("route_a", ...)
routeB := workflow.NewFunctionStep("route_b", ...)
router := workflow.SimpleRouter("route",
func(input *workflow.StepInput) string {
// 根据条件返回路由名称
if data, ok := input.Input.(map[string]interface{}); ok {
if priority, ok := data["priority"].(string); ok && priority == "high" {
return "route_a" // 高优先级
}
}
return "route_b" // 默认路由
},
map[string]workflow.Step{
"route_a": routeA,
"route_b": routeB,
},
)
wf.AddStep(router)
选择多个步骤顺序执行:
analyze := workflow.NewFunctionStep("analyze", ...)
processComplex := workflow.NewFunctionStep("process_complex", ...)
processSimple := workflow.NewFunctionStep("process_simple", ...)
finalize := workflow.NewFunctionStep("finalize", ...)
router := workflow.ChainRouter("smart_processor",
func(input *workflow.StepInput) []string {
// 根据分析结果返回处理链
if input.PreviousStepContent.(map[string]interface{})["complexity"] == "high" {
return []string{"process_complex", "finalize"}
}
return []string{"process_simple", "finalize"}
},
map[string]workflow.Step{
"process_complex": processComplex,
"process_simple": processSimple,
"finalize": finalize,
},
)
// 典型使用: 先分析,再根据结果选择处理路径
wf.AddStep(analyze).AddStep(router)
完全自定义的步骤选择逻辑:
router := workflow.DynamicRouter("dynamic",
func(input *workflow.StepInput) []workflow.Step {
// 动态创建和返回步骤
steps := []workflow.Step{}
data := input.PreviousStepContent.(map[string]interface{})
tasks := data["tasks"].([]string)
// 根据数据动态创建步骤
for _, task := range tasks {
step := workflow.NewFunctionStep(task, createTaskHandler(task))
steps = append(steps, step)
}
return steps
},
)
Router 支持流式执行,实时获取事件:
reader := router.ExecuteStream(ctx, input, true)
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Printf("错误: %v", err)
continue
}
switch v := event.(type) {
case *workflow.RouterEvent:
switch v.Type {
case workflow.RouterEventStarted:
fmt.Printf("Router 开始, 选择了 %d 步骤\n", len(v.SelectedSteps))
case workflow.RouterEventCompleted:
fmt.Printf("Router 完成, 执行了 %d 步骤\n", v.ExecutedSteps)
case workflow.RouterEventFailed:
fmt.Printf("Router 失败\n")
}
case map[string]interface{}:
// 步骤进度事件
if v["type"] == "router_step_progress" {
fmt.Printf("步骤 %s 执行中\n", v["step_name"])
}
case *workflow.StepOutput:
// 最终输出
fmt.Printf("最终结果: %v\n", v.Content)
}
}
Router 中的步骤会自动传递数据:
router := workflow.ChainRouter("pipeline",
func(input *workflow.StepInput) []string {
return []string{"step1", "step2", "step3"}
},
map[string]workflow.Step{
"step1": workflow.NewFunctionStep("step1", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
return &workflow.StepOutput{Content: "data1"}, nil
}),
"step2": workflow.NewFunctionStep("step2", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
// 可以访问 step1 的输出
prev := input.PreviousStepContent.(string) // "data1"
return &workflow.StepOutput{Content: prev + " + data2"}, nil
}),
"step3": workflow.NewFunctionStep("step3", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
// 可以访问 step2 的输出
prev := input.PreviousStepContent.(string) // "data1 + data2"
return &workflow.StepOutput{Content: prev + " + data3"}, nil
}),
},
)
package main
import (
"context"
"errors"
"fmt"
"io"
"github.com/astercloud/aster/pkg/workflow"
)
func main() {
ctx := context.Background()
// 创建分析步骤
analyze := workflow.NewFunctionStep("analyze",
func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
// 分析输入数据
return &workflow.StepOutput{
Content: map[string]interface{}{
"type": "complex",
"priority": "high",
},
}, nil
},
)
// 创建处理步骤
processComplex := workflow.NewFunctionStep("process_complex",
func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
return &workflow.StepOutput{Content: "复杂处理完成"}, nil
},
)
processSimple := workflow.NewFunctionStep("process_simple",
func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
return &workflow.StepOutput{Content: "简单处理完成"}, nil
},
)
finalize := workflow.NewFunctionStep("finalize",
func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
result := input.PreviousStepContent
return &workflow.StepOutput{
Content: fmt.Sprintf("最终结果: %v", result),
}, nil
},
)
// 创建智能路由器
router := workflow.ChainRouter("smart_router",
func(input *workflow.StepInput) []string {
data := input.PreviousStepContent.(map[string]interface{})
if data["type"] == "complex" {
return []string{"process_complex", "finalize"}
}
return []string{"process_simple", "finalize"}
},
map[string]workflow.Step{
"process_complex": processComplex,
"process_simple": processSimple,
"finalize": finalize,
},
)
// 创建 Workflow
wf := workflow.New("RouterDemo").
WithStream().
AddStep(analyze).
AddStep(router)
// 执行
input := &workflow.WorkflowInput{Input: "test data"}
reader := wf.Execute(ctx, input)
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
panic(err)
}
if event.Type == workflow.EventWorkflowCompleted {
fmt.Println(event.Data.(map[string]interface{})["output"])
}
}
}