Aster Workflow 系统提供了强大的工作流编排能力。
Workflow 是一系列步骤(Step)的有序集合,每个步骤可以是:
package main
import (
	"context"
	"errors"
	"fmt"
	"io"

	"github.com/astercloud/aster/pkg/workflow"
)
// main builds a two-step "DataProcessing" workflow, validates it, executes it,
// and prints the final output when the workflow-completed event is received.
// Any validation or stream error other than io.EOF aborts the program.
func main() {
	ctx := context.Background()

	// Create the workflow.
	wf := workflow.New("DataProcessing").
		WithStream(). // enable streaming
		WithDebug()   // enable debugging

	// Step "load": produce the initial data.
	wf.AddStep(workflow.NewFunctionStep("load",
		func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
			return &workflow.StepOutput{
				Content:  map[string]interface{}{"data": "loaded"},
				Metadata: make(map[string]interface{}),
			}, nil
		},
	))

	// Step "process": the previous step's output is available on the input.
	wf.AddStep(workflow.NewFunctionStep("process",
		func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
			// Forward the upstream content so the variable is actually used;
			// an unused local is a compile error in Go.
			prevData := input.PreviousStepContent
			return &workflow.StepOutput{
				Content: map[string]interface{}{
					"result": "processed",
					"source": prevData,
				},
				Metadata: make(map[string]interface{}),
			}, nil
		},
	))

	// Validate before running.
	if err := wf.Validate(); err != nil {
		panic(err)
	}

	// Execute and drain the event stream until io.EOF.
	input := &workflow.WorkflowInput{Input: "start"}
	reader := wf.Execute(ctx, input)
	for {
		event, err := reader.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			panic(err)
		}
		if event.Type != workflow.EventWorkflowCompleted {
			continue
		}

		// Workflow finished. Use a checked assertion so an unexpected payload
		// type is printed as-is instead of panicking, and use fmt rather than
		// the println builtin, which does not format interface values.
		if data, ok := event.Data.(map[string]interface{}); ok {
			fmt.Println("完成:", data["output"])
		} else {
			fmt.Println("完成:", event.Data)
		}
	}
}
步骤之间自动传递数据:
// step1 emits a plain string as its Content.
step1 := workflow.NewFunctionStep("step1", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
	return &workflow.StepOutput{
		Content: "Hello", // this output is passed on to the next step
	}, nil
})

// step2 reads the previous step's output and appends " World" to it.
step2 := workflow.NewFunctionStep("step2", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
	// Checked assertion: report a non-string upstream value as a step error
	// rather than panicking inside the step.
	prevContent, ok := input.PreviousStepContent.(string) // "Hello" when run after step1
	if !ok {
		return nil, errors.New("step2: previous step content is not a string")
	}
	return &workflow.StepOutput{
		Content: prevContent + " World",
	}, nil
})

wf.AddStep(step1).AddStep(step2)
获取实时事件:
reader := wf.Execute(ctx, input)
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
log.Printf("错误: %v", err)
continue
}
switch event.Type {
case workflow.EventWorkflowStarted:
fmt.Println("Workflow 开始")
case workflow.EventStepStarted:
fmt.Printf("步骤开始: %s\n", event.StepName)
case workflow.EventStepCompleted:
fmt.Printf("步骤完成: %s\n", event.StepName)
case workflow.EventWorkflowCompleted:
fmt.Println("Workflow 完成")
}
}