aster采用事件驱动架构,通过三个独立的事件通道分离不同类型的信息流,实现清晰的关注点分离。
┌─────────────────────────────────────────┐
│ Agent Runtime │
│ │
│ ┌─────────────────────────────────┐ │
│ │ Progress Channel │───┼──► UI/前端
│ │ (文本流、工具执行进度) │ │ 实时展示
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ Control Channel │───┼──► 审批服务
│ │ (工具审批、人机交互) │ │ 安全网关
│ └─────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────┐ │
│ │ Monitor Channel │───┼──► 监控系统
│ │ (错误、审计、性能指标) │ │ 日志平台
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
| 通道 | 用途 | 典型订阅者 | 事件频率 | 关键性 |
|---|---|---|---|---|
| Progress | 实时进度展示 | 前端UI、聊天界面 | 高(毫秒级) | UI体验 |
| Control | 审批决策 | 审批服务、安全网关 | 低(按需) | 安全控制 |
| Monitor | 监控审计 | 监控系统、日志平台 | 中(事件级) | 可观测性 |
Progress通道用于实时展示Agent的执行进度,是用户界面的主要数据源。
流式文本输出:
type ProgressTextChunkEvent struct {
Delta string // 增量文本
}
使用场景:实时显示AI回复
eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
for envelope := range eventCh {
if e, ok := envelope.Event.(*types.ProgressTextChunkEvent); ok {
fmt.Print(e.Delta) // 流式输出
}
}
工具调用开始:
type ProgressToolStartEvent struct {
Call ToolCall // 工具调用信息
}
type ToolCall struct {
ID string // 调用ID
Name string // 工具名称
Input map[string]interface{} // 输入参数
}
使用场景:显示"正在执行XXX工具"
case *types.ProgressToolStartEvent:
fmt.Printf("\n[🔧 Tool] %s\n", e.Call.Name)
if desc, ok := toolDescriptions[e.Call.Name]; ok {
fmt.Printf(" %s\n", desc)
}
工具调用结束:
type ProgressToolEndEvent struct {
CallID string // 调用ID
Result map[string]interface{} // 执行结果
Error error // 错误(如有)
}
使用场景:显示工具执行结果
case *types.ProgressToolEndEvent:
if e.Error != nil {
fmt.Printf(" ❌ 失败: %v\n", e.Error)
} else {
fmt.Printf(" ✅ 完成\n")
}
对话完成:
type ProgressCompleteEvent struct {
FinalText string // 最终文本
StopReason string // 停止原因
}
停止原因:
"end_turn": 正常结束"max_tokens": 达到最大token数"stop_sequence": 匹配停止序列"tool_use": 需要工具执行(内部使用)package main
import (
"fmt"
"github.com/astercloud/aster/pkg/agent"
"github.com/astercloud/aster/pkg/types"
)
func subscribeProgress(ag *agent.Agent) {
eventCh := ag.Subscribe([]types.AgentChannel{
types.ChannelProgress,
}, nil)
for envelope := range eventCh {
switch e := envelope.Event.(type) {
case *types.ProgressTextChunkEvent:
// 流式输出文本
fmt.Print(e.Delta)
case *types.ProgressToolStartEvent:
// 工具开始
fmt.Printf("\n\n🔧 [Tool] %s\n", e.Call.Name)
fmt.Printf(" Input: %v\n", e.Call.Input)
case *types.ProgressToolEndEvent:
// 工具结束
if e.Error != nil {
fmt.Printf(" ❌ Error: %v\n", e.Error)
} else {
fmt.Printf(" ✅ Success\n")
}
case *types.ProgressCompleteEvent:
// 对话完成
fmt.Printf("\n\n✅ 完成 (Reason: %s)\n", e.StopReason)
}
}
}
Control通道用于人机交互和审批流程,实现安全控制。
工具审批请求:
type ControlToolApprovalRequest struct {
ToolCall ToolCall // 待审批的工具调用
Reason string // 需要审批的原因
}
使用场景:敏感操作审批
eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelControl}, nil)
for envelope := range eventCh {
if e, ok := envelope.Event.(*types.ControlToolApprovalRequest); ok {
// 显示审批请求
fmt.Printf("⚠️ 需要审批: %s\n", e.ToolCall.Name)
fmt.Printf(" 原因: %s\n", e.Reason)
// 获取用户确认
fmt.Print(" 是否允许? (y/n): ")
var answer string
fmt.Scanln(&answer)
// 响应审批
if answer == "y" {
ag.ApproveToolCall(envelope.ID, true)
} else {
ag.ApproveToolCall(envelope.ID, false)
}
}
}
请求用户输入:
type ControlUserInputRequest struct {
Prompt string // 提示信息
Options []string // 可选项(如有)
}
使用场景:需要用户提供额外信息
case *types.ControlUserInputRequest:
fmt.Printf("💭 Agent请求输入: %s\n", e.Prompt)
if len(e.Options) > 0 {
for i, opt := range e.Options {
fmt.Printf(" %d. %s\n", i+1, opt)
}
}
// 获取输入
var input string
fmt.Scanln(&input)
// 发送响应
ag.RespondToInputRequest(envelope.ID, input)
// 配置工具权限
config := &types.AgentConfig{
ToolPermissions: map[string]types.PermissionPolicy{
"Write": types.PermissionAsk, // 需要审批
"fs_delete": types.PermissionDeny, // 拒绝
"Read": types.PermissionAllow, // 允许
},
}
权限策略:
PermissionAllow: 自动允许PermissionAsk: 需要审批PermissionDeny: 自动拒绝Monitor通道用于监控、审计和性能分析。
错误事件:
type MonitorErrorEvent struct {
Error error // 错误对象
Context string // 错误上下文
AgentID string // Agent ID
}
使用场景:错误监控告警
case *types.MonitorErrorEvent:
log.Printf("[ERROR] Agent %s: %v (Context: %s)",
e.AgentID, e.Error, e.Context)
// 发送告警
alerting.SendAlert("AgentError", e.Error.Error())
工具执行审计:
type MonitorToolExecutionEvent struct {
ToolName string // 工具名称
Duration time.Duration // 执行时长
Success bool // 是否成功
InputSize int // 输入大小
OutputSize int // 输出大小
}
使用场景:性能监控和审计日志
case *types.MonitorToolExecutionEvent:
log.Printf("[AUDIT] Tool=%s Duration=%v Success=%v",
e.ToolName, e.Duration, e.Success)
// 记录到审计数据库
auditDB.LogToolExecution(e)
// 性能指标
metrics.RecordToolDuration(e.ToolName, e.Duration)
模型调用监控:
type MonitorModelCallEvent struct {
Model string // 模型名称
InputTokens int // 输入token数
OutputTokens int // 输出token数
Duration time.Duration // 调用时长
Cost float64 // 成本(美元)
}
使用场景:成本和性能监控
case *types.MonitorModelCallEvent:
// 记录token使用
metrics.RecordTokenUsage(e.Model, e.InputTokens, e.OutputTokens)
// 成本统计
totalCost += e.Cost
fmt.Printf("本次调用成本: $%.4f (累计: $%.4f)\n", e.Cost, totalCost)
// 性能监控
if e.Duration > 30*time.Second {
log.Printf("[WARN] 模型调用耗时过长: %v", e.Duration)
}
func setupMonitoring(ag *agent.Agent) {
monitorCh := ag.Subscribe([]types.AgentChannel{
types.ChannelMonitor,
}, nil)
go func() {
var stats struct {
TotalCost float64
ToolCalls int
ToolErrors int
AvgDuration time.Duration
}
for envelope := range monitorCh {
switch e := envelope.Event.(type) {
case *types.MonitorErrorEvent:
log.Printf("❌ Error: %v", e.Error)
stats.ToolErrors++
case *types.MonitorToolExecutionEvent:
stats.ToolCalls++
stats.AvgDuration = (stats.AvgDuration + e.Duration) / 2
case *types.MonitorModelCallEvent:
stats.TotalCost += e.Cost
log.Printf("💰 Cost: $%.4f (Total: $%.4f)",
e.Cost, stats.TotalCost)
}
}
}()
}
// 订阅多个通道
eventCh := ag.Subscribe([]types.AgentChannel{
types.ChannelProgress,
types.ChannelMonitor,
}, nil)
for envelope := range eventCh {
// 处理来自两个通道的事件
}
// 只订阅特定事件类型
eventCh := ag.Subscribe(
[]types.AgentChannel{types.ChannelProgress},
&types.EventFilter{
EventTypes: []string{
"progress.text_chunk",
"progress.complete",
},
},
)
// 订阅者1:UI展示
uiCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
go handleUI(uiCh)
// 订阅者2:日志记录
logCh := ag.Subscribe([]types.AgentChannel{types.ChannelMonitor}, nil)
go handleLogs(logCh)
// 订阅者3:审批处理
controlCh := ag.Subscribe([]types.AgentChannel{types.ChannelControl}, nil)
go handleApprovals(controlCh)
// ✅ 推荐:每个通道一个处理器
go handleProgress(ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil))
go handleControl(ag.Subscribe([]types.AgentChannel{types.ChannelControl}, nil))
go handleMonitor(ag.Subscribe([]types.AgentChannel{types.ChannelMonitor}, nil))
// ❌ 不推荐:混在一起处理
eventCh := ag.Subscribe(allChannels, nil)
// 一个goroutine处理所有类型的事件
// ✅ 推荐:异步处理事件
go func() {
for event := range eventCh {
processEvent(event)
}
}()
// ❌ 不推荐:同步阻塞主线程
for event := range eventCh {
processEvent(event) // 阻塞主线程
}
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("事件处理panic: %v", r)
}
}()
for event := range eventCh {
if err := processEvent(event); err != nil {
log.Printf("处理事件失败: %v", err)
continue // 继续处理下一个事件
}
}
}()
eventCh := ag.Subscribe(channels, nil)
defer func() {
// 关闭订阅
ag.Unsubscribe(eventCh)
}()
// 使用带缓冲的channel
eventCh := ag.Subscribe(channels, &types.SubscribeOptions{
BufferSize: 100, // 缓冲100个事件
})
// 或使用非阻塞读取
select {
case event := <-eventCh:
processEvent(event)
case <-time.After(100 * time.Millisecond):
// 超时,跳过
}
| 事件类型 | 典型频率 | 处理时间要求 |
|---|---|---|
| ProgressTextChunk | 10-50/秒 | <10ms |
| ProgressToolStart/End | 1-5/对话 | <100ms |
| MonitorEvent | 按需 | <50ms |
| ControlEvent | 罕见 | 人工决策 |
// 批量处理文本chunk
var buffer strings.Builder
ticker := time.NewTicker(100 * time.Millisecond)
for {
select {
case event := <-eventCh:
if e, ok := event.(*types.ProgressTextChunkEvent); ok {
buffer.WriteString(e.Delta)
}
case <-ticker.C:
if buffer.Len() > 0 {
displayText(buffer.String())
buffer.Reset()
}
}
}