aster的中间件系统采用洋葱模型(Onion Model)架构,实现了高度可扩展的功能组合机制。
Request → M1 → M2 → M3 → Handler → M3 → M2 → M1 → Response
↓ ↓ ↓ ↑ ↑ ↑
Before Before Before After After After
每个请求和响应都会依次通过多层中间件:
优势:
type Middleware interface {
// 基础信息
Name() string // 中间件名称
Priority() int // 优先级(数值越小越靠近核心)
// 工具提供
Tools() []tools.Tool // 此中间件提供的工具
// 双向拦截
WrapModelCall(ctx context.Context, req *ModelRequest,
next ModelCallHandler) (*ModelResponse, error)
WrapToolCall(ctx context.Context, req *ToolCallRequest,
next ToolCallHandler) (*ToolCallResponse, error)
// 生命周期钩子
OnAgentStart(ctx context.Context, agentID string) error
OnAgentStop(ctx context.Context, agentID string) error
}
const (
// 0-100: 系统级中间件
PrioritySystem = 0
// 100-500: 功能级中间件
PrioritySummarization = 40 // 总结中间件
PriorityFilesystem = 100 // 文件系统
PrioritySubAgent = 200 // 子Agent
// 500-1000: 业务级中间件
PriorityBusiness = 500
PriorityCustom = 1000
)
规则:
功能:自动管理上下文长度
优先级:40(非常接近核心)
工作原理:
┌─────────────────────────────────────┐
│ 监控消息历史 token 数 │
│ │ │
│ ▼ │
│ 超过阈值? (170k tokens) │
│ No ─┐ │ Yes │
│ │ ▼ │
│ │ 1. 保留最近6条消息 │
│ │ 2. 总结更早的消息 │
│ │ 3. 替换旧历史 │
│ │ │ │
│ └──┘ │
└─────────────────────────────────────┘
配置示例:
// 使用默认配置
config := &types.AgentConfig{
Middlewares: []string{"summarization"},
}
// 自定义配置
summaryMiddleware := middleware.NewSummarizationMiddleware(&middleware.SummarizationConfig{
TokenThreshold: 150000, // 150k tokens触发
KeepRecentCount: 10, // 保留最近10条
})
适用场景:
功能:提供文件系统操作工具
优先级:100
提供的工具:
Read - 读取文件Write - 写入文件Edit - 编辑文件Ls - 列出目录Glob - Glob匹配Grep - 正则搜索特性:
// 当工具结果超过20k tokens时自动驱逐
config := &middleware.FilesystemMiddlewareConfig{
Backend: backend,
EnableEviction: true,
TokenLimit: 20000,
}
// 支持多种存储后端
fsMiddleware := middleware.NewFilesystemMiddleware(&middleware.FilesystemMiddlewareConfig{
Backend: backends.NewCompositeBackend(
backends.NewStateBackend(),
[]backends.RouteConfig{
{Prefix: "/workspace/", Backend: realFS},
},
),
})
使用示例:
// 启用文件系统中间件
config := &types.AgentConfig{
Middlewares: []string{"filesystem"},
}
// Agent现在可以使用fs_*工具
ag.Chat(ctx, "读取README.md文件")
功能:基于普通文件+搜索的长期记忆中间件。
核心特点:
/agent.md,将其作为 <agent_memory>...</agent_memory> 注入 System Prompt,提供“人格/长期指令”。backends.BackendProtocol 在指定 MemoryPath 下管理记忆文件(默认为 /memories/)。memory_search / memory_write 工具,配合 fs_* 工具构成纯文本的长期记忆系统。典型用法:
// Backend 示例: /memories/ 映射到本地目录
memoryBackend := backends.NewCompositeBackend(
backends.NewStateBackend(),
[]backends.RouteConfig{
{
Prefix: "/memories/",
Backend: backends.NewLocalBackend("./memories"),
},
},
)
memoryMW, _ := middleware.NewAgentMemoryMiddleware(&middleware.AgentMemoryMiddlewareConfig{
Backend: memoryBackend,
MemoryPath: "/memories/",
})
提示:在实际 AgentConfig 中,通常会同时启用
filesystem中间件,让 Agent 既能操作项目文件,又能直接查看/memories/中的记忆文件。
功能:任务委派到子Agent
优先级:200
提供的工具:
task - 委派任务到子Agent工作流程:
Main Agent
│
▼ task("researcher", "研究Go性能优化")
┌─────────────────────┐
│ SubAgentMiddleware │
└──────────┬──────────┘
│
▼ 创建子Agent
┌─────────────────────┐
│ Researcher Agent │ 独立上下文
│ (隔离执行) │
└──────────┬──────────┘
│
▼ 返回结果
Main Agent继续
配置示例:
// 定义子Agent规格
specs := []middleware.SubAgentSpec{
{
Name: "researcher",
Description: "深度研究和分析专家",
Prompt: "你是一个研究专家,提供详细的分析。",
},
{
Name: "coder",
Description: "代码编写专家",
Prompt: "你是专业程序员,编写清晰的代码。",
},
}
// 创建工厂函数
factory := func(ctx context.Context, spec middleware.SubAgentSpec) (middleware.SubAgent, error) {
// 创建实际的Agent实例
return agent.Create(ctx, &types.AgentConfig{
SystemPrompt: spec.Prompt,
Model: "claude-sonnet-4-5",
}, deps)
}
// 创建中间件
subagentMiddleware := middleware.NewSubAgentMiddleware(&middleware.SubAgentMiddlewareConfig{
Specs: specs,
Factory: factory,
EnableParallel: true, // 允许并发执行
})
使用示例:
// Main Agent委派任务
ag.Chat(ctx, `
使用researcher子Agent研究"Go并发模型的优势",
然后基于研究结果编写代码示例。
`)
package mymiddleware
import (
"context"
"github.com/astercloud/aster/pkg/middleware"
"github.com/astercloud/aster/pkg/tools"
)
type LoggingMiddleware struct {
*middleware.BaseMiddleware
}
func NewLoggingMiddleware() *LoggingMiddleware {
return &LoggingMiddleware{
BaseMiddleware: middleware.NewBaseMiddleware(
"logging", // 名称
300, // 优先级
),
}
}
// 提供工具(可选)
func (m *LoggingMiddleware) Tools() []tools.Tool {
return nil // 不提供工具
}
// 拦截模型调用
func (m *LoggingMiddleware) WrapModelCall(
ctx context.Context,
req *types.ModelRequest,
next middleware.ModelCallHandler,
) (*types.ModelResponse, error) {
// 前置处理
log.Printf("[Before ModelCall] Messages: %d", len(req.Messages))
start := time.Now()
// 调用下一层
resp, err := next(ctx, req)
// 后置处理
duration := time.Since(start)
log.Printf("[After ModelCall] Duration: %v, Error: %v", duration, err)
return resp, err
}
// 拦截工具调用
func (m *LoggingMiddleware) WrapToolCall(
ctx context.Context,
req *types.ToolCallRequest,
next middleware.ToolCallHandler,
) (*types.ToolCallResponse, error) {
// 前置处理
log.Printf("[Before ToolCall] Tool: %s, Input: %v",
req.ToolName, req.Input)
// 调用下一层
resp, err := next(ctx, req)
// 后置处理
log.Printf("[After ToolCall] Tool: %s, Success: %v",
req.ToolName, err == nil)
return resp, err
}
// 生命周期钩子
func (m *LoggingMiddleware) OnAgentStart(ctx context.Context, agentID string) error {
log.Printf("[Agent Start] %s", agentID)
return nil
}
func (m *LoggingMiddleware) OnAgentStop(ctx context.Context, agentID string) error {
log.Printf("[Agent Stop] %s", agentID)
return nil
}
type CachingMiddleware struct {
*middleware.BaseMiddleware
cache map[string]interface{}
}
func NewCachingMiddleware() *CachingMiddleware {
return &CachingMiddleware{
BaseMiddleware: middleware.NewBaseMiddleware("caching", 250),
cache: make(map[string]interface{}),
}
}
// 提供缓存工具
func (m *CachingMiddleware) Tools() []tools.Tool {
return []tools.Tool{
&CacheGetTool{cache: m.cache},
&CacheSetTool{cache: m.cache},
&CacheClearTool{cache: m.cache},
}
}
// 实现工具
type CacheGetTool struct {
cache map[string]interface{}
}
func (t *CacheGetTool) Name() string { return "cache_get" }
func (t *CacheGetTool) Description() string {
return "从缓存中获取值"
}
func (t *CacheGetTool) InputSchema() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"key": map[string]string{"type": "string"},
},
"required": []string{"key"},
}
}
func (t *CacheGetTool) Execute(ctx context.Context, input map[string]interface{}) (interface{}, error) {
key := input["key"].(string)
if value, ok := t.cache[key]; ok {
return map[string]interface{}{
"ok": true,
"value": value,
}, nil
}
return map[string]interface{}{
"ok": false,
"error": "key not found",
}, nil
}
// 创建中间件实例
fsMiddleware := middleware.NewFilesystemMiddleware(fsConfig)
subagentMiddleware := middleware.NewSubAgentMiddleware(subagentConfig)
loggingMiddleware := NewLoggingMiddleware()
// 创建栈(自动按优先级排序)
stack := middleware.NewStack([]middleware.Middleware{
loggingMiddleware, // priority: 300
fsMiddleware, // priority: 100
subagentMiddleware, // priority: 200
})
// 实际顺序:fs(100) → subagent(200) → logging(300)
// 收集所有中间件提供的工具
tools := stack.Tools()
fmt.Printf("可用工具: %d个\n", len(tools))
for _, tool := range tools {
fmt.Printf("- %s: %s\n", tool.Name(), tool.Description())
}
// 模型调用经过整个栈
response, err := stack.WrapModelCall(ctx, request, func(ctx context.Context, req *types.ModelRequest) (*types.ModelResponse, error) {
// 最终的处理函数
return provider.StreamMessages(ctx, req)
})
// 工具调用经过整个栈
toolResponse, err := stack.WrapToolCall(ctx, toolRequest, func(ctx context.Context, req *types.ToolCallRequest) (*types.ToolCallResponse, error) {
// 最终的工具执行
return executor.Execute(ctx, req)
})
type MetricsMiddleware struct {
*middleware.BaseMiddleware
metrics *prometheus.Registry
}
func (m *MetricsMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
start := time.Now()
resp, err := next(ctx, req)
// 记录指标
m.metrics.RecordLatency("model_call", time.Since(start))
m.metrics.IncrementCounter("model_calls_total")
if err != nil {
m.metrics.IncrementCounter("model_calls_errors")
}
return resp, err
}
type AuditMiddleware struct {
*middleware.BaseMiddleware
auditLog AuditLogger
}
func (m *AuditMiddleware) WrapToolCall(ctx, req, next) (*types.ToolCallResponse, error) {
// 记录审计日志
m.auditLog.Log(&AuditEntry{
Timestamp: time.Now(),
AgentID: req.AgentID,
ToolName: req.ToolName,
Input: req.Input,
})
resp, err := next(ctx, req)
// 记录结果
m.auditLog.Log(&AuditEntry{
Type: "tool_result",
Success: err == nil,
Output: resp.Result,
})
return resp, err
}
type RetryMiddleware struct {
*middleware.BaseMiddleware
maxRetries int
}
func (m *RetryMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
var resp *types.ModelResponse
var err error
for i := 0; i < m.maxRetries; i++ {
resp, err = next(ctx, req)
if err == nil {
return resp, nil
}
// 指数退避
time.Sleep(time.Duration(1<<uint(i)) * time.Second)
}
return nil, fmt.Errorf("failed after %d retries: %w", m.maxRetries, err)
}
type TransformMiddleware struct {
*middleware.BaseMiddleware
}
func (m *TransformMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
// 请求转换
newReq := m.transformRequest(req)
// 调用下一层
resp, err := next(ctx, newReq)
// 响应转换
return m.transformResponse(resp), err
}
func (m *TransformMiddleware) transformRequest(req *types.ModelRequest) *types.ModelRequest {
// 例如:添加额外的系统消息
newMessages := append([]types.Message{
{Role: "system", Content: "额外的指令"},
}, req.Messages...)
return &types.ModelRequest{
Messages: newMessages,
Model: req.Model,
}
}
// 组合多个中间件实现复杂功能
type CompositeMiddleware struct {
middlewares []middleware.Middleware
}
func (c *CompositeMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
// 链式调用
handler := next
for i := len(c.middlewares) - 1; i >= 0; i-- {
m := c.middlewares[i]
prevHandler := handler
handler = func(ctx context.Context, r *types.ModelRequest) (*types.ModelResponse, error) {
return m.WrapModelCall(ctx, r, prevHandler)
}
}
return handler(ctx, req)
}
type ConditionalMiddleware struct {
*middleware.BaseMiddleware
condition func(*types.ModelRequest) bool
wrapped middleware.Middleware
}
func (c *ConditionalMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
if c.condition(req) {
// 条件满足,使用wrapped中间件
return c.wrapped.WrapModelCall(ctx, req, next)
}
// 否则直接pass through
return next(ctx, req)
}
type StatefulMiddleware struct {
*middleware.BaseMiddleware
state sync.Map // 线程安全的状态存储
}
func (s *StatefulMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
// 读取状态
callCount, _ := s.state.LoadOrStore("call_count", 0)
// 更新状态
s.state.Store("call_count", callCount.(int)+1)
return next(ctx, req)
}
// ✅ 推荐:每个中间件只做一件事
type LoggingMiddleware struct { ... } // 只负责日志
type MetricsMiddleware struct { ... } // 只负责指标
type CachingMiddleware struct { ... } // 只负责缓存
// ❌ 不推荐:一个中间件做太多事
type EverythingMiddleware struct { ... } // 日志+指标+缓存+...
// 按照功能特性设置优先级
SummarizationMiddleware // 40 - 影响消息历史
FilesystemMiddleware // 100 - 提供基础工具
SubAgentMiddleware // 200 - 依赖基础工具
LoggingMiddleware // 300 - 外层监控
AuthMiddleware // 10 - 最优先(安全检查)
func (m *MyMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
defer func() {
if r := recover(); r != nil {
log.Printf("中间件panic: %v", r)
}
}()
resp, err := next(ctx, req)
if err != nil {
// 决定是否传播错误
log.Printf("下游错误: %v", err)
return nil, err // 或者尝试恢复
}
return resp, nil
}
// ✅ 推荐:快速路径
func (m *MyMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
if !m.enabled {
return next(ctx, req) // 快速跳过
}
// 实际处理
return m.process(ctx, req, next)
}
// ❌ 不推荐:总是执行昂贵操作
func (m *MyMiddleware) WrapModelCall(ctx, req, next) (*types.ModelResponse, error) {
m.expensiveOperation() // 即使不需要也执行
return next(ctx, req)
}