Context Window Manager 是 aster 的核心组件,负责管理与 LLM 的对话上下文窗口。它提供自动 Token 计数、智能压缩和多种压缩策略,确保对话始终在 Token 预算内运行。
package main
import (
"context"
"fmt"
"github.com/astercloud/aster/pkg/context"
)
func main() {
// 创建 Token 计数器
tokenCounter := context.NewGPT4Counter()
// 创建压缩策略(滑动窗口)
strategy := context.NewSlidingWindowStrategy(10) // 保留 10 条消息
// 创建 Window Manager
config := context.DefaultWindowManagerConfig()
manager := context.NewContextWindowManager(config, tokenCounter, strategy)
// 添加消息
err := manager.AddMessage(context.Background(), context.Message{
Role: "user",
Content: "Hello, how are you?",
})
if err != nil {
panic(err)
}
// 获取统计信息
stats := manager.GetStats()
fmt.Printf("当前消息数: %d\n", stats.CurrentMessages)
fmt.Printf("Token 使用: %d (%.1f%%)\n",
stats.CurrentTokens, stats.UsagePercentage)
}
config := context.WindowManagerConfig{
Budget: context.TokenBudget{
MaxTokens: 128000, // GPT-4 Turbo 上下文窗口
ReservedTokens: 4096, // 预留给输出
WarningThreshold: 0.8, // 80% 时发出警告
},
AutoCompress: true,
CompressionThreshold: 0.85, // 85% 时自动压缩
MinMessagesToKeep: 3,
AlwaysKeepSystem: true,
AlwaysKeepRecent: 2,
}
aster 提供多种预配置的 Token 计数器:
// GPT 系列
gpt4Counter := context.NewGPT4Counter()
gpt4Counter := context.NewSimpleTokenCounter(context.GPT4Config)
gpt35Counter := context.NewSimpleTokenCounter(context.GPT35TurboConfig)
// Claude 系列
claudeCounter := context.NewClaudeCounter()
claudeCounter := context.NewSimpleTokenCounter(context.ClaudeOpusConfig)
// 自定义配置
customCounter := context.NewSimpleTokenCounter(context.ModelConfig{
Name: "custom-model",
CharsPerToken: 4.0,
BaseTokenOverhead: 3,
RoleTokenCost: 1,
})
// 1. 单个文本计数
count, err := counter.Count(ctx, "Hello, world!")
// 2. 批量计数
texts := []string{"Message 1", "Message 2", "Message 3"}
counts, err := counter.CountBatch(ctx, texts)
// 3. 消息列表计数
messages := []context.Message{
{Role: "system", Content: "You are helpful."},
{Role: "user", Content: "Hello"},
}
totalTokens, err := counter.EstimateMessages(ctx, messages)
// 创建多模型计数器
multi := context.NewMultiModelTokenCounter()
// 注册不同模型
multi.RegisterCounter("gpt-4", context.NewGPT4Counter())
multi.RegisterCounter("claude", context.NewClaudeCounter())
// 为特定模型计数
count, err := multi.CountForModel(ctx, "gpt-4-turbo", text)
// 自动模糊匹配
count, err := multi.CountForModel(ctx, "gpt-4-turbo-preview", text)
// 会匹配到 "gpt-4"
detailedCounter := context.NewDetailedTokenCounter(baseCounter)
estimate, err := detailedCounter.EstimateMessagesDetailed(ctx, messages)
fmt.Printf("总 Token: %d\n", estimate.TotalTokens)
fmt.Printf("内容 Token: %d\n", estimate.Breakdown["content"])
fmt.Printf("开销 Token: %d\n", estimate.Breakdown["overhead"])
for i, tokens := range estimate.MessageTokens {
fmt.Printf("消息 %d: %d tokens\n", i, tokens)
}
保留最近的 N 条消息,删除旧消息:
strategy := context.NewSlidingWindowStrategy(10)
// 特点:
// - 简单高效
// - 保证消息顺序
// - 适合一般对话场景
根据消息优先级智能保留重要消息:
calc := context.NewDefaultPriorityCalculator()
strategy := context.NewPriorityBasedStrategy(10, calc)
// 优先级计算考虑:
// - 最近度(越新越重要)
// - 角色(system > user > assistant)
// - 长度(较长消息可能包含更多信息)
自定义优先级计算器:
type MyPriorityCalculator struct{}
func (c *MyPriorityCalculator) CalculatePriority(
ctx context.Context,
msg context.Message,
position int,
totalMessages int,
) (context.MessagePriority, float64) {
// 自定义逻辑
if strings.Contains(msg.Content, "重要") {
return context.PriorityCritical, 1.0
}
return context.PriorityMedium, 0.5
}
strategy := context.NewPriorityBasedStrategy(10, &MyPriorityCalculator{})
从旧消息开始删除,直到满足 Token 预算:
strategy := context.NewTokenBasedStrategy(tokenCounter, 0.7) // 目标 70%
// 特点:
// - 精确控制 Token 使用
// - 自动平衡消息数量和 Token 预算
// - 适合严格 Token 限制的场景
结合多种策略的优点:
strategies := []context.CompressionStrategy{
context.NewSlidingWindowStrategy(10),
context.NewPriorityBasedStrategy(10, calc),
context.NewTokenBasedStrategy(counter, 0.7),
}
weights := []float64{0.3, 0.5, 0.2} // 各策略权重
strategy := context.NewHybridStrategy(strategies, weights)
// 特点:
// - 综合多种策略优势
// - 更智能的消息选择
// - 适合复杂场景
config := context.DefaultWindowManagerConfig()
config.AutoCompress = true
config.CompressionThreshold = 0.85 // 85% 触发压缩
manager := context.NewContextWindowManager(config, counter, strategy)
// 添加消息时自动检查并压缩
for _, msg := range messages {
_ = manager.AddMessage(ctx, msg)
// 当 Token 使用超过 85% 时自动触发压缩
}
// 手动触发压缩
err := manager.Compress(ctx)
if err != nil {
log.Printf("压缩失败: %v", err)
}
// 查看压缩历史
history := manager.GetCompressionHistory()
for _, event := range history {
fmt.Printf("压缩时间: %s\n", event.Timestamp)
fmt.Printf("消息: %d -> %d\n", event.BeforeMessages, event.AfterMessages)
fmt.Printf("Token: %d -> %d\n", event.BeforeTokens, event.AfterTokens)
fmt.Printf("压缩率: %.2f%%\n", event.CompressionRatio*100)
}
// 检查是否在预算内
if !manager.IsWithinBudget() {
log.Println("警告:超出 Token 预算!")
}
// 检查是否应该警告
if manager.ShouldWarn() {
usage := manager.GetUsagePercentage()
log.Printf("警告:Token 使用率达到 %.1f%%\n", usage)
}
// 获取详细统计
stats := manager.GetStats()
fmt.Printf("当前消息数: %d\n", stats.CurrentMessages)
fmt.Printf("累计消息数: %d\n", stats.TotalMessages)
fmt.Printf("当前 Token: %d\n", stats.CurrentTokens)
fmt.Printf("剩余 Token: %d\n", stats.RemainingTokens)
fmt.Printf("使用百分比: %.1f%%\n", stats.UsagePercentage)
fmt.Printf("压缩次数: %d\n", stats.CompressionCount)
calc := context.NewDefaultPriorityCalculator()
// 计算所有消息的优先级
messagesWithPriority := context.CalculateMessagePriorities(ctx, messages, calc)
// 按优先级排序
context.SortMessagesByPriority(messagesWithPriority, true) // 降序
// 过滤低优先级消息
highPriorityMessages := context.FilterMessagesByPriority(
messagesWithPriority,
context.PriorityHigh,
)
for _, msg := range highPriorityMessages {
fmt.Printf("[%d] %.2f - %s: %s\n",
msg.Priority, msg.Score, msg.Message.Role, msg.Message.Content)
}
// 清空消息(保留历史)
manager.Clear()
// 完全重置(包括历史)
manager.Reset()
SlidingWindowStrategyPriorityBasedStrategyTokenBasedStrategyHybridStrategy// 生产环境建议配置
config.Budget.MaxTokens = 128000 // 根据模型调整
config.Budget.ReservedTokens = 4096 // 为输出预留充足空间
config.Budget.WarningThreshold = 0.8 // 提前警告
config.CompressionThreshold = 0.85 // 留有缓冲空间
config.AlwaysKeepSystem = true // 始终保留 system prompt
config.AlwaysKeepRecent = 2 // 保留最近 2 条消息(上下文连贯)
config.MinMessagesToKeep = 3 // 最少保留 3 条消息
// 定期检查使用情况
ticker := time.NewTicker(1 * time.Minute)
go func() {
for range ticker.C {
stats := manager.GetStats()
if stats.ShouldWarn {
log.Printf("警告:Token 使用率 %.1f%%\n", stats.UsagePercentage)
}
}
}()
// 记录压缩事件
history := manager.GetCompressionHistory()
if len(history) > 0 {
last := history[len(history)-1]
log.Printf("最近压缩: 消息 %d->%d, Token %d->%d, 策略: %s\n",
last.BeforeMessages, last.AfterMessages,
last.BeforeTokens, last.AfterTokens,
last.Strategy)
}
type WindowManagerConfig struct {
Budget TokenBudget // Token 预算配置
AutoCompress bool // 是否自动压缩
CompressionThreshold float64 // 压缩触发阈值 (0.0-1.0)
MinMessagesToKeep int // 最少保留消息数
AlwaysKeepSystem bool // 始终保留 system 消息
AlwaysKeepRecent int // 始终保留最近 N 条消息
EnablePrioritization bool // 是否启用优先级
}
type TokenBudget struct {
MaxTokens int // 最大 Token 数
ReservedTokens int // 预留 Token 数
WarningThreshold float64 // 警告阈值 (0.0-1.0)
}
// 常用配置
DefaultTokenBudget() // GPT-4 Turbo 默认配置 (128K)
type ModelConfig struct {
Name string // 模型名称
CharsPerToken float64 // 平均字符/Token
BaseTokenOverhead int // 每条消息基础开销
RoleTokenCost int // Role 标签开销
}
// 预定义配置
GPT4Config, GPT4TurboConfig, GPT35TurboConfig
ClaudeConfig, ClaudeOpusConfig, ClaudeSonnetConfig
DefaultConfig
// Token 计数是轻量级的,但可以缓存结果
type CachedCounter struct {
base context.TokenCounter
cache map[string]int
mu sync.RWMutex
}
// 使用 AddMessages 代替多次 AddMessage
manager.AddMessages(ctx, messages) // 更高效
// 对于非关键路径,可以异步压缩
go func() {
if manager.ShouldWarn() {
_ = manager.Compress(context.Background())
}
}()
运行 Context Window Manager 测试:
go test ./pkg/context/... -v