核心概念

事件系统

理解aster的三通道事件驱动架构

事件系统

aster采用事件驱动架构,通过三个独立的事件通道分离不同类型的信息流,实现清晰的关注点分离。

🎯 三通道设计

┌─────────────────────────────────────────┐
│             Agent Runtime               │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │       Progress Channel          │───┼──► UI/前端
│  │  (文本流、工具执行进度)         │   │    实时展示
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │        Control Channel          │───┼──► 审批服务
│  │  (工具审批、人机交互)           │   │    安全网关
│  └─────────────────────────────────┘   │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │        Monitor Channel          │───┼──► 监控系统
│  │  (错误、审计、性能指标)         │   │    日志平台
│  └─────────────────────────────────┘   │
└─────────────────────────────────────────┘

通道对比

通道用途典型订阅者事件频率关键性
Progress实时进度展示前端UI、聊天界面高(毫秒级)UI体验
Control审批决策审批服务、安全网关低(按需)安全控制
Monitor监控审计监控系统、日志平台中(事件级)可观测性

📡 Progress通道

Progress通道用于实时展示Agent的执行进度,是用户界面的主要数据源。

事件类型

1. ProgressTextChunkEvent

流式文本输出:

type ProgressTextChunkEvent struct {
    Delta string  // 增量文本
}

使用场景:实时显示AI回复

eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
for envelope := range eventCh {
    if e, ok := envelope.Event.(*types.ProgressTextChunkEvent); ok {
        fmt.Print(e.Delta)  // 流式输出
    }
}

2. ProgressToolStartEvent

工具调用开始:

type ProgressToolStartEvent struct {
    Call ToolCall  // 工具调用信息
}

type ToolCall struct {
    ID    string                 // 调用ID
    Name  string                 // 工具名称
    Input map[string]interface{} // 输入参数
}

使用场景:显示"正在执行XXX工具"

case *types.ProgressToolStartEvent:
    fmt.Printf("\n[🔧 Tool] %s\n", e.Call.Name)
    if desc, ok := toolDescriptions[e.Call.Name]; ok {
        fmt.Printf("  %s\n", desc)
    }

3. ProgressToolEndEvent

工具调用结束:

type ProgressToolEndEvent struct {
    CallID string                 // 调用ID
    Result map[string]interface{} // 执行结果
    Error  error                  // 错误(如有)
}

使用场景:显示工具执行结果

case *types.ProgressToolEndEvent:
    if e.Error != nil {
        fmt.Printf("  ❌ 失败: %v\n", e.Error)
    } else {
        fmt.Printf("  ✅ 完成\n")
    }

4. ProgressCompleteEvent

对话完成:

type ProgressCompleteEvent struct {
    FinalText string   // 最终文本
    StopReason string  // 停止原因
}

停止原因

  • "end_turn": 正常结束
  • "max_tokens": 达到最大token数
  • "stop_sequence": 匹配停止序列
  • "tool_use": 需要工具执行(内部使用)

完整订阅示例

package main

import (
    "fmt"
    "github.com/astercloud/aster/pkg/agent"
    "github.com/astercloud/aster/pkg/types"
)

func subscribeProgress(ag *agent.Agent) {
    eventCh := ag.Subscribe([]types.AgentChannel{
        types.ChannelProgress,
    }, nil)

    for envelope := range eventCh {
        switch e := envelope.Event.(type) {
        case *types.ProgressTextChunkEvent:
            // 流式输出文本
            fmt.Print(e.Delta)

        case *types.ProgressToolStartEvent:
            // 工具开始
            fmt.Printf("\n\n🔧 [Tool] %s\n", e.Call.Name)
            fmt.Printf("   Input: %v\n", e.Call.Input)

        case *types.ProgressToolEndEvent:
            // 工具结束
            if e.Error != nil {
                fmt.Printf("   ❌ Error: %v\n", e.Error)
            } else {
                fmt.Printf("   ✅ Success\n")
            }

        case *types.ProgressCompleteEvent:
            // 对话完成
            fmt.Printf("\n\n✅ 完成 (Reason: %s)\n", e.StopReason)
        }
    }
}

🎛️ Control通道

Control通道用于人机交互和审批流程,实现安全控制。

事件类型

1. ControlToolApprovalRequest

工具审批请求:

type ControlToolApprovalRequest struct {
    ToolCall ToolCall  // 待审批的工具调用
    Reason   string    // 需要审批的原因
}

使用场景:敏感操作审批

eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelControl}, nil)
for envelope := range eventCh {
    if e, ok := envelope.Event.(*types.ControlToolApprovalRequest); ok {
        // 显示审批请求
        fmt.Printf("⚠️  需要审批: %s\n", e.ToolCall.Name)
        fmt.Printf("   原因: %s\n", e.Reason)

        // 获取用户确认
        fmt.Print("   是否允许? (y/n): ")
        var answer string
        fmt.Scanln(&answer)

        // 响应审批
        if answer == "y" {
            ag.ApproveToolCall(envelope.ID, true)
        } else {
            ag.ApproveToolCall(envelope.ID, false)
        }
    }
}

2. ControlUserInputRequest

请求用户输入:

type ControlUserInputRequest struct {
    Prompt  string   // 提示信息
    Options []string // 可选项(如有)
}

使用场景:需要用户提供额外信息

case *types.ControlUserInputRequest:
    fmt.Printf("💭 Agent请求输入: %s\n", e.Prompt)
    if len(e.Options) > 0 {
        for i, opt := range e.Options {
            fmt.Printf("   %d. %s\n", i+1, opt)
        }
    }

    // 获取输入
    var input string
    fmt.Scanln(&input)

    // 发送响应
    ag.RespondToInputRequest(envelope.ID, input)

权限策略配置

// 配置工具权限
config := &types.AgentConfig{
    ToolPermissions: map[string]types.PermissionPolicy{
        "Write": types.PermissionAsk,    // 需要审批
        "fs_delete": types.PermissionDeny,  // 拒绝
        "Read": types.PermissionAllow,   // 允许
    },
}

权限策略:

  • PermissionAllow: 自动允许
  • PermissionAsk: 需要审批
  • PermissionDeny: 自动拒绝

📊 Monitor通道

Monitor通道用于监控、审计和性能分析。

事件类型

1. MonitorErrorEvent

错误事件:

type MonitorErrorEvent struct {
    Error   error  // 错误对象
    Context string // 错误上下文
    AgentID string // Agent ID
}

使用场景:错误监控告警

case *types.MonitorErrorEvent:
    log.Printf("[ERROR] Agent %s: %v (Context: %s)",
        e.AgentID, e.Error, e.Context)

    // 发送告警
    alerting.SendAlert("AgentError", e.Error.Error())

2. MonitorToolExecutionEvent

工具执行审计:

type MonitorToolExecutionEvent struct {
    ToolName  string        // 工具名称
    Duration  time.Duration // 执行时长
    Success   bool          // 是否成功
    InputSize int           // 输入大小
    OutputSize int          // 输出大小
}

使用场景:性能监控和审计日志

case *types.MonitorToolExecutionEvent:
    log.Printf("[AUDIT] Tool=%s Duration=%v Success=%v",
        e.ToolName, e.Duration, e.Success)

    // 记录到审计数据库
    auditDB.LogToolExecution(e)

    // 性能指标
    metrics.RecordToolDuration(e.ToolName, e.Duration)

3. MonitorModelCallEvent

模型调用监控:

type MonitorModelCallEvent struct {
    Model         string        // 模型名称
    InputTokens   int           // 输入token数
    OutputTokens  int           // 输出token数
    Duration      time.Duration // 调用时长
    Cost          float64       // 成本(美元)
}

使用场景:成本和性能监控

case *types.MonitorModelCallEvent:
    // 记录token使用
    metrics.RecordTokenUsage(e.Model, e.InputTokens, e.OutputTokens)

    // 成本统计
    totalCost += e.Cost
    fmt.Printf("本次调用成本: $%.4f (累计: $%.4f)\n", e.Cost, totalCost)

    // 性能监控
    if e.Duration > 30*time.Second {
        log.Printf("[WARN] 模型调用耗时过长: %v", e.Duration)
    }

完整监控示例

func setupMonitoring(ag *agent.Agent) {
    monitorCh := ag.Subscribe([]types.AgentChannel{
        types.ChannelMonitor,
    }, nil)

    go func() {
        var stats struct {
            TotalCost    float64
            ToolCalls    int
            ToolErrors   int
            AvgDuration  time.Duration
        }

        for envelope := range monitorCh {
            switch e := envelope.Event.(type) {
            case *types.MonitorErrorEvent:
                log.Printf("❌ Error: %v", e.Error)
                stats.ToolErrors++

            case *types.MonitorToolExecutionEvent:
                stats.ToolCalls++
                stats.AvgDuration = (stats.AvgDuration + e.Duration) / 2

            case *types.MonitorModelCallEvent:
                stats.TotalCost += e.Cost
                log.Printf("💰 Cost: $%.4f (Total: $%.4f)",
                    e.Cost, stats.TotalCost)
            }
        }
    }()
}

🔧 订阅和过滤

多通道订阅

// 订阅多个通道
eventCh := ag.Subscribe([]types.AgentChannel{
    types.ChannelProgress,
    types.ChannelMonitor,
}, nil)

for envelope := range eventCh {
    // 处理来自两个通道的事件
}

事件过滤

// 只订阅特定事件类型
eventCh := ag.Subscribe(
    []types.AgentChannel{types.ChannelProgress},
    &types.EventFilter{
        EventTypes: []string{
            "progress.text_chunk",
            "progress.complete",
        },
    },
)

多订阅者

// 订阅者1:UI展示
uiCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
go handleUI(uiCh)

// 订阅者2:日志记录
logCh := ag.Subscribe([]types.AgentChannel{types.ChannelMonitor}, nil)
go handleLogs(logCh)

// 订阅者3:审批处理
controlCh := ag.Subscribe([]types.AgentChannel{types.ChannelControl}, nil)
go handleApprovals(controlCh)

🎯 最佳实践

1. 分离关注点

// ✅ 推荐:每个通道一个处理器
go handleProgress(ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil))
go handleControl(ag.Subscribe([]types.AgentChannel{types.ChannelControl}, nil))
go handleMonitor(ag.Subscribe([]types.AgentChannel{types.ChannelMonitor}, nil))

// ❌ 不推荐:混在一起处理
eventCh := ag.Subscribe(allChannels, nil)
// 一个goroutine处理所有类型的事件

2. 异步处理

// ✅ 推荐:异步处理事件
go func() {
    for event := range eventCh {
        processEvent(event)
    }
}()

// ❌ 不推荐:同步阻塞主线程
for event := range eventCh {
    processEvent(event)  // 阻塞主线程
}

3. 错误恢复

go func() {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("事件处理panic: %v", r)
        }
    }()

    for event := range eventCh {
        if err := processEvent(event); err != nil {
            log.Printf("处理事件失败: %v", err)
            continue  // 继续处理下一个事件
        }
    }
}()

4. 资源清理

eventCh := ag.Subscribe(channels, nil)
defer func() {
    // 关闭订阅
    ag.Unsubscribe(eventCh)
}()

5. 背压处理

// 使用带缓冲的channel
eventCh := ag.Subscribe(channels, &types.SubscribeOptions{
    BufferSize: 100,  // 缓冲100个事件
})

// 或使用非阻塞读取
select {
case event := <-eventCh:
    processEvent(event)
case <-time.After(100 * time.Millisecond):
    // 超时,跳过
}

📊 性能考虑

事件频率

事件类型典型频率处理时间要求
ProgressTextChunk10-50/秒<10ms
ProgressToolStart/End1-5/对话<100ms
MonitorEvent按需<50ms
ControlEvent罕见人工决策

性能优化

// 批量处理文本chunk
var buffer strings.Builder
ticker := time.NewTicker(100 * time.Millisecond)

for {
    select {
    case event := <-eventCh:
        if e, ok := event.(*types.ProgressTextChunkEvent); ok {
            buffer.WriteString(e.Delta)
        }
    case <-ticker.C:
        if buffer.Len() > 0 {
            displayText(buffer.String())
            buffer.Reset()
        }
    }
}

📚 下一步

🔗 相关API