Basic

Workflow 快速开始

开始使用 Aster Workflow 系统

Workflow 快速开始

Aster Workflow 系统提供了强大的工作流编排能力。

基础概念

Workflow 是一系列步骤(Step)的有序集合,每个步骤可以是:

  • FunctionStep - 自定义函数
  • AgentStep - Agent 执行
  • RoomStep - Room 协作执行
  • ConditionStep - 条件分支
  • LoopStep - 循环执行
  • ParallelStep - 并行执行
  • RouterStep - 路由选择
  • StepsGroup - 步骤组

创建第一个 Workflow

package main

import (
    "context"
    "errors"
    "io"

    "github.com/astercloud/aster/pkg/workflow"
)

func main() {
    ctx := context.Background()

    // 创建 Workflow
    wf := workflow.New("DataProcessing").
        WithStream().        // 启用流式
        WithDebug()          // 启用调试

    // 添加步骤
    wf.AddStep(workflow.NewFunctionStep("load",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            // 加载数据
            return &workflow.StepOutput{
                Content: map[string]interface{}{"data": "loaded"},
                Metadata: make(map[string]interface{}),
            }, nil
        },
    ))

    wf.AddStep(workflow.NewFunctionStep("process",
        func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
            // 处理数据 - 可以访问前一步的输出
            prevData := input.PreviousStepContent
            return &workflow.StepOutput{
                Content: map[string]interface{}{"result": "processed"},
                Metadata: make(map[string]interface{}),
            }, nil
        },
    ))

    // 验证
    if err := wf.Validate(); err != nil {
        panic(err)
    }

    // 执行
    input := &workflow.WorkflowInput{Input: "start"}
    reader := wf.Execute(ctx, input)
    for {
        event, err := reader.Recv()
        if err != nil {
            if errors.Is(err, io.EOF) {
                break
            }
            panic(err)
        }

        if event.Type == workflow.EventWorkflowCompleted {
            // Workflow 完成
            data := event.Data.(map[string]interface{})
            println("完成:", data["output"])
        }
    }
}

数据流传递

步骤之间自动传递数据:

step1 := workflow.NewFunctionStep("step1", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
    return &workflow.StepOutput{
        Content: "Hello",  // 这个输出会传递给下一步
    }, nil
})

step2 := workflow.NewFunctionStep("step2", func(ctx context.Context, input *workflow.StepInput) (*workflow.StepOutput, error) {
    // 访问前一步的输出
    prevContent := input.PreviousStepContent.(string)  // "Hello"

    return &workflow.StepOutput{
        Content: prevContent + " World",
    }, nil
})

wf.AddStep(step1).AddStep(step2)

流式执行

获取实时事件:

reader := wf.Execute(ctx, input)
for {
    event, err := reader.Recv()
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        log.Printf("错误: %v", err)
        continue
    }

    switch event.Type {
    case workflow.EventWorkflowStarted:
        fmt.Println("Workflow 开始")

    case workflow.EventStepStarted:
        fmt.Printf("步骤开始: %s\n", event.StepName)

    case workflow.EventStepCompleted:
        fmt.Printf("步骤完成: %s\n", event.StepName)

    case workflow.EventWorkflowCompleted:
        fmt.Println("Workflow 完成")
    }
}

下一步