核心概念

中间件系统

深入理解洋葱模型中间件架构

中间件系统

aster的中间件系统采用洋葱模型(Onion Model)架构,实现了高度可扩展的功能组合机制。

🧅 洋葱模型

核心思想

Request → M1 → M2 → M3 → Handler → M3 → M2 → M1 → Response
          ↓    ↓    ↓              ↑    ↑    ↑
      Before Before Before       After After After

每个请求和响应都会依次通过多层中间件:

  • 请求阶段:从外到内(优先级从低到高)
  • 响应阶段:从内到外(优先级从高到低)

为什么是洋葱模型?

优势

  1. 清晰的层次:功能按优先级分层
  2. 双向拦截:可以在前后都处理
  3. 易于扩展:添加中间件不影响现有代码
  4. 可组合性:中间件可以自由组合

🔧 Middleware接口

完整接口定义

type Middleware interface {
    // 基础信息
    Name() string       // 中间件名称
    Priority() int      // 优先级(数值越小越靠近核心)

    // 工具提供
    Tools() []tools.Tool  // 此中间件提供的工具

    // 双向拦截
    WrapModelCall(ctx context.Context, req *ModelRequest,
                  next ModelCallHandler) (*ModelResponse, error)

    WrapToolCall(ctx context.Context, req *ToolCallRequest,
                 next ToolCallHandler) (*ToolCallResponse, error)

    // 生命周期钩子
    OnAgentStart(ctx context.Context, agentID string) error
    OnAgentStop(ctx context.Context, agentID string) error
}

优先级规范

const (
    // 0-100: 系统级中间件
    PrioritySystem = 0

    // 100-500: 功能级中间件
    PrioritySummarization = 40   // 总结中间件
    PriorityFilesystem    = 100  // 文件系统
    PrioritySubAgent      = 200  // 子Agent

    // 500-1000: 业务级中间件
    PriorityBusiness = 500
    PriorityCustom   = 1000
)

规则

  • 数值越小,优先级越高,越靠近核心
  • 在请求阶段,优先级高的先执行
  • 在响应阶段,优先级高的后执行

📦 内置中间件

1. SummarizationMiddleware

功能:自动管理上下文长度

优先级:40(非常接近核心)

工作原理

┌─────────────────────────────────────┐
│  监控消息历史 token 数               │
│         │                           │
│         ▼                           │
│  超过阈值? (170k tokens)             │
│    No ─┐  │ Yes                     │
│        │  ▼                         │
│        │ 1. 保留最近6条消息          │
│        │ 2. 总结更早的消息           │
│        │ 3. 替换旧历史               │
│        │  │                         │
│        └──┘                         │
└─────────────────────────────────────┘

配置示例

// 使用默认配置
config := &types.AgentConfig{
    Middlewares: []string{"summarization"},
}

// 自定义配置
summaryMiddleware := middleware.NewSummarizationMiddleware(&middleware.SummarizationConfig{
    TokenThreshold:  150000,  // 150k tokens触发
    KeepRecentCount: 10,      // 保留最近10条
})

适用场景

  • 长时间对话
  • 需要保持上下文但又担心token超限
  • 自动维护对话历史

2. FilesystemMiddleware

功能:提供文件系统操作工具

优先级:100

提供的工具

  • Read - 读取文件
  • Write - 写入文件
  • Edit - 编辑文件
  • Ls - 列出目录
  • Glob - Glob匹配
  • Grep - 正则搜索

特性

  1. 自动结果驱逐
// 当工具结果超过20k tokens时自动驱逐
config := &middleware.FilesystemMiddlewareConfig{
    Backend:        backend,
    EnableEviction: true,
    TokenLimit:     20000,
}
  1. Backend抽象
// 支持多种存储后端
fsMiddleware := middleware.NewFilesystemMiddleware(&middleware.FilesystemMiddlewareConfig{
    Backend: backends.NewCompositeBackend(
        backends.NewStateBackend(),
        []backends.RouteConfig{
            {Prefix: "/workspace/", Backend: realFS},
        },
    ),
})

使用示例

// 启用文件系统中间件
config := &types.AgentConfig{
    Middlewares: []string{"filesystem"},
}

// Agent现在可以使用fs_*工具
ag.Chat(ctx, "读取README.md文件")

3. AgentMemoryMiddleware

功能:基于普通文件+搜索的长期记忆中间件。

核心特点

  • 从后端读取 /agent.md,将其作为 <agent_memory>...</agent_memory> 注入 System Prompt,提供“人格/长期指令”。
  • 基于 backends.BackendProtocol 在指定 MemoryPath 下管理记忆文件(默认为 /memories/)。
  • 自动注入 memory_search / memory_write 工具,配合 fs_* 工具构成纯文本的长期记忆系统。

典型用法

// Backend 示例: /memories/ 映射到本地目录
memoryBackend := backends.NewCompositeBackend(
    backends.NewStateBackend(),
    []backends.RouteConfig{
        {
            Prefix:  "/memories/",
            Backend: backends.NewLocalBackend("./memories"),
        },
    },
)

memoryMW, _ := middleware.NewAgentMemoryMiddleware(&middleware.AgentMemoryMiddlewareConfig{
    Backend:    memoryBackend,
    MemoryPath: "/memories/",
})

提示:在实际 AgentConfig 中,通常会同时启用 filesystem 中间件,让 Agent 既能操作项目文件,又能直接查看 /memories/ 中的记忆文件。

4. SubAgentMiddleware

功能:任务委派到子Agent

优先级:200

提供的工具

  • task - 委派任务到子Agent

工作流程

Main Agent
    │
    ▼ task("researcher", "研究Go性能优化")
┌─────────────────────┐
│  SubAgentMiddleware │
└──────────┬──────────┘
           │
           ▼ 创建子Agent
┌─────────────────────┐
│  Researcher Agent   │  独立上下文
│  (隔离执行)         │
└──────────┬──────────┘
           │
           ▼ 返回结果
Main Agent继续

配置示例

// 定义子Agent规格
specs := []middleware.SubAgentSpec{
    {
        Name:        "researcher",
        Description: "深度研究和分析专家",
        Prompt:      "你是一个研究专家,提供详细的分析。",
    },
    {
        Name:        "coder",
        Description: "代码编写专家",
        Prompt:      "你是专业程序员,编写清晰的代码。",
    },
}

// 创建工厂函数
factory := func(ctx context.Context, spec middleware.SubAgentSpec) (middleware.SubAgent, error) {
    // 创建实际的Agent实例
    return agent.Create(ctx, &types.AgentConfig{
        SystemPrompt: spec.Prompt,
        Model:        "claude-sonnet-4-5",
    }, deps)
}

// 创建中间件
subagentMiddleware := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
    Specs:          specs,
    Factory:        factory,
    EnableParallel: true,  // 允许并发执行
})

使用示例

// Main Agent委派任务
ag.Chat(ctx, `
使用researcher子Agent研究"Go并发模型的优势",
然后基于研究结果编写代码示例。
`)

🏗️ 创建自定义中间件

基础中间件

package mymiddleware

import (
    "context"
    "github.com/astercloud/aster/pkg/middleware"
    "github.com/astercloud/aster/pkg/tools"
)

type LoggingMiddleware struct {
    *middleware.BaseMiddleware
}

func NewLoggingMiddleware() *LoggingMiddleware {
    return &LoggingMiddleware{
        BaseMiddleware: middleware.NewBaseMiddleware(
            "logging",  // 名称
            300,        // 优先级
        ),
    }
}

// 提供工具(可选)
func (m *LoggingMiddleware) Tools() []tools.Tool {
    return nil  // 不提供工具
}

// 拦截模型调用
func (m *LoggingMiddleware) WrapModelCall(
    ctx context.Context,
    req *types.ModelRequest,
    next middleware.ModelCallHandler,
) (*types.ModelResponse, error) {
    // 前置处理
    log.Printf("[Before ModelCall] Messages: %d", len(req.Messages))
    start := time.Now()

    // 调用下一层
    resp, err := next(ctx, req)

    // 后置处理
    duration := time.Since(start)
    log.Printf("[After ModelCall] Duration: %v, Error: %v", duration, err)

    return resp, err
}

// 拦截工具调用
func (m *LoggingMiddleware) WrapToolCall(
    ctx context.Context,
    req *types.ToolCallRequest,
    next middleware.ToolCallHandler,
) (*types.ToolCallResponse, error) {
    // 前置处理
    log.Printf("[Before ToolCall] Tool: %s, Input: %v",
        req.ToolName, req.Input)

    // 调用下一层
    resp, err := next(ctx, req)

    // 后置处理
    log.Printf("[After ToolCall] Tool: %s, Success: %v",
        req.ToolName, err == nil)

    return resp, err
}

// 生命周期钩子
func (m *LoggingMiddleware) OnAgentStart(ctx context.Context, agentID string) error {
    log.Printf("[Agent Start] %s", agentID)
    return nil
}

func (m *LoggingMiddleware) OnAgentStop(ctx context.Context, agentID string) error {
    log.Printf("[Agent Stop] %s", agentID)
    return nil
}

带工具的中间件

type CachingMiddleware struct {
    *middleware.BaseMiddleware
    cache map[string]interface{}
}

func NewCachingMiddleware() *CachingMiddleware {
    return &CachingMiddleware{
        BaseMiddleware: middleware.NewBaseMiddleware("caching", 250),
        cache:          make(map[string]interface{}),
    }
}

// 提供缓存工具
func (m *CachingMiddleware) Tools() []tools.Tool {
    return []tools.Tool{
        &CacheGetTool{cache: m.cache},
        &CacheSetTool{cache: m.cache},
        &CacheClearTool{cache: m.cache},
    }
}

// 实现工具
type CacheGetTool struct {
    cache map[string]interface{}
}

func (t *CacheGetTool) Name() string { return "cache_get" }

func (t *CacheGetTool) Description() string {
    return "从缓存中获取值"
}

func (t *CacheGetTool) InputSchema() map[string]interface{} {
    return map[string]interface{}{
        "type": "object",
        "properties": map[string]interface{}{
            "key": map[string]string{"type": "string"},
        },
        "required": []string{"key"},
    }
}

func (t *CacheGetTool) Execute(ctx context.Context, input map[string]interface{}) (interface{}, error) {
    key := input["key"].(string)
    if value, ok := t.cache[key]; ok {
        return map[string]interface{}{
            "ok":    true,
            "value": value,
        }, nil
    }
    return map[string]interface{}{
        "ok":    false,
        "error": "key not found",
    }, nil
}

📚 中间件栈管理

创建栈

// 创建中间件实例
fsMiddleware := middleware.NewFilesystemMiddleware(fsConfig)
subagentMiddleware := middleware.NewSubAgentMiddleware(subagentConfig)
loggingMiddleware := NewLoggingMiddleware()

// 创建栈(自动按优先级排序)
stack := middleware.NewStack([]middleware.Middleware{
    loggingMiddleware,    // priority: 300
    fsMiddleware,         // priority: 100
    subagentMiddleware,   // priority: 200
})

// 实际顺序:fs(100) → subagent(200) → logging(300)

获取工具

// 收集所有中间件提供的工具
tools := stack.Tools()
fmt.Printf("可用工具: %d个\n", len(tools))

for _, tool := range tools {
    fmt.Printf("- %s: %s\n", tool.Name(), tool.Description())
}

执行调用

// 模型调用经过整个栈
response, err := stack.WrapModelCall(ctx, request, func(ctx context.Context, req *types.ModelRequest) (*types.ModelResponse, error) {
    // 最终的处理函数
    return provider.StreamMessages(ctx, req)
})

// 工具调用经过整个栈
toolResponse, err := stack.WrapToolCall(ctx, toolRequest, func(ctx context.Context, req *types.ToolCallRequest) (*types.ToolCallResponse, error) {
    // 最终的工具执行
    return executor.Execute(ctx, req)
})

🎯 使用场景

场景1:性能监控中间件

type MetricsMiddleware struct {
    *middleware.BaseMiddleware
    metrics *prometheus.Registry
}

func (m *MetricsMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    start := time.Now()
    resp, err := next(ctx, req)

    // 记录指标
    m.metrics.RecordLatency("model_call", time.Since(start))
    m.metrics.IncrementCounter("model_calls_total")
    if err != nil {
        m.metrics.IncrementCounter("model_calls_errors")
    }

    return resp, err
}

场景2:安全审计中间件

type AuditMiddleware struct {
    *middleware.BaseMiddleware
    auditLog AuditLogger
}

func (m *AuditMiddleware) WrapToolCall(ctx, req, next) (*types.ToolCallResponse, error) {
    // 记录审计日志
    m.auditLog.Log(&AuditEntry{
        Timestamp: time.Now(),
        AgentID:   req.AgentID,
        ToolName:  req.ToolName,
        Input:     req.Input,
    })

    resp, err := next(ctx, req)

    // 记录结果
    m.auditLog.Log(&AuditEntry{
        Type:    "tool_result",
        Success: err == nil,
        Output:  resp.Result,
    })

    return resp, err
}

场景3:重试中间件

type RetryMiddleware struct {
    *middleware.BaseMiddleware
    maxRetries int
}

func (m *RetryMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    var resp *types.ModelResponse
    var err error

    for i := 0; i < m.maxRetries; i++ {
        resp, err = next(ctx, req)
        if err == nil {
            return resp, nil
        }

        // 指数退避
        time.Sleep(time.Duration(1<<uint(i)) * time.Second)
    }

    return nil, fmt.Errorf("failed after %d retries: %w", m.maxRetries, err)
}

场景4:请求转换中间件

type TransformMiddleware struct {
    *middleware.BaseMiddleware
}

func (m *TransformMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    // 请求转换
    newReq := m.transformRequest(req)

    // 调用下一层
    resp, err := next(ctx, newReq)

    // 响应转换
    return m.transformResponse(resp), err
}

func (m *TransformMiddleware) transformRequest(req *types.ModelRequest) *types.ModelRequest {
    // 例如:添加额外的系统消息
    newMessages := append([]types.Message{
        {Role: "system", Content: "额外的指令"},
    }, req.Messages...)

    return &types.ModelRequest{
        Messages: newMessages,
        Model:    req.Model,
    }
}

🎨 高级模式

1. 中间件组合

// 组合多个中间件实现复杂功能
type CompositeMiddleware struct {
    middlewares []middleware.Middleware
}

func (c *CompositeMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    // 链式调用
    handler := next
    for i := len(c.middlewares) - 1; i >= 0; i-- {
        m := c.middlewares[i]
        prevHandler := handler
        handler = func(ctx context.Context, r *types.ModelRequest) (*types.ModelResponse, error) {
            return m.WrapModelCall(ctx, r, prevHandler)
        }
    }
    return handler(ctx, req)
}

2. 条件中间件

type ConditionalMiddleware struct {
    *middleware.BaseMiddleware
    condition func(*types.ModelRequest) bool
    wrapped   middleware.Middleware
}

func (c *ConditionalMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    if c.condition(req) {
        // 条件满足,使用wrapped中间件
        return c.wrapped.WrapModelCall(ctx, req, next)
    }
    // 否则直接pass through
    return next(ctx, req)
}

3. 状态共享中间件

type StatefulMiddleware struct {
    *middleware.BaseMiddleware
    state sync.Map  // 线程安全的状态存储
}

func (s *StatefulMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    // 读取状态
    callCount, _ := s.state.LoadOrStore("call_count", 0)

    // 更新状态
    s.state.Store("call_count", callCount.(int)+1)

    return next(ctx, req)
}

🎯 最佳实践

1. 单一职责

// ✅ 推荐:每个中间件只做一件事
type LoggingMiddleware struct { ... }    // 只负责日志
type MetricsMiddleware struct { ... }    // 只负责指标
type CachingMiddleware struct { ... }    // 只负责缓存

// ❌ 不推荐:一个中间件做太多事
type EverythingMiddleware struct { ... }  // 日志+指标+缓存+...

2. 合理设置优先级

// 按照功能特性设置优先级
SummarizationMiddleware  // 40  - 影响消息历史
FilesystemMiddleware     // 100 - 提供基础工具
SubAgentMiddleware       // 200 - 依赖基础工具
LoggingMiddleware        // 300 - 外层监控
AuthMiddleware           // 10  - 最优先(安全检查)

3. 错误处理

func (m *MyMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("中间件panic: %v", r)
        }
    }()

    resp, err := next(ctx, req)
    if err != nil {
        // 决定是否传播错误
        log.Printf("下游错误: %v", err)
        return nil, err  // 或者尝试恢复
    }

    return resp, nil
}

4. 性能考虑

// ✅ 推荐:快速路径
func (m *MyMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    if !m.enabled {
        return next(ctx, req)  // 快速跳过
    }

    // 实际处理
    return m.process(ctx, req, next)
}

// ❌ 不推荐:总是执行昂贵操作
func (m *MyMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
    m.expensiveOperation()  // 即使不需要也执行
    return next(ctx, req)
}

📚 下一步

🔗 相关资源