Advanced Session Compression 提供 LLM 驱动的智能会话压缩功能,支持消息级、对话轮次级和会话级的多层次压缩,有效管理长会话的上下文窗口。
package main
import (
"context"
"github.com/astercloud/aster/pkg/memory"
agentext "github.com/astercloud/aster/pkg/context"
)
func main() {
// 创建 LLM 总结器配置
config := memory.DefaultLLMSummarizerConfig()
config.Model = "gpt-4o-mini"
config.Level = memory.CompressionLevelModerate
config.MaxSummaryWords = 300
// 创建总结器
summarizer := memory.NewLLMSummarizer(config)
// 准备会话消息
messages := []agentext.Message{
{Role: "user", Content: "What's the weather like today?"},
{Role: "assistant", Content: "I don't have access to real-time weather data..."},
{Role: "user", Content: "Can you tell me about quantum computing?"},
{Role: "assistant", Content: "Quantum computing is a revolutionary field..."},
}
// 生成会话总结
summary, err := summarizer.SummarizeSession(context.Background(), messages)
if err != nil {
panic(err)
}
fmt.Println("会话总结:", summary)
}
// 创建三个层级的总结器
messageLevelConfig := memory.DefaultLLMSummarizerConfig()
messageLevelConfig.Level = memory.CompressionLevelLight
turnLevelConfig := memory.DefaultLLMSummarizerConfig()
turnLevelConfig.Level = memory.CompressionLevelModerate
sessionLevelConfig := memory.DefaultLLMSummarizerConfig()
sessionLevelConfig.Level = memory.CompressionLevelAggressive
// 创建压缩器
compressor := memory.NewMultiLevelCompressor(
memory.NewLLMSummarizer(messageLevelConfig),
memory.NewLLMSummarizer(turnLevelConfig),
memory.NewLLMSummarizer(sessionLevelConfig),
memory.DefaultMultiLevelCompressorConfig(),
)
// 压缩长会话
compressed, err := compressor.CompressMessages(ctx, longMessages)
if err != nil {
panic(err)
}
// 查看压缩统计
stats := compressor.GetCompressionStats()
fmt.Printf("原始消息: %d → 压缩后: %d\n", stats.OriginalMessages, stats.CompressedMessages)
fmt.Printf("压缩比率: %.2f%%\n", stats.CompressionRatio * 100)
保留大部分细节和具体信息,适合需要保持上下文完整性的场景。
config := memory.DefaultLLMSummarizerConfig()
config.Level = memory.CompressionLevelLight
config.MaxSummaryWords = 800
summarizer := memory.NewLLMSummarizer(config)
特点:
保留关键信息和重要细节,平衡压缩率和信息保留。
config := memory.DefaultLLMSummarizerConfig()
config.Level = memory.CompressionLevelModerate // 默认级别
config.MaxSummaryWords = 500
summarizer := memory.NewLLMSummarizer(config)
特点:
只保留核心要点和结论,最大化压缩效果。
config := memory.DefaultLLMSummarizerConfig()
config.Level = memory.CompressionLevelAggressive
config.MaxSummaryWords = 200
summarizer := memory.NewLLMSummarizer(config)
特点:
对单条消息进行压缩,适合处理冗长的单个消息。
config := memory.DefaultMultiLevelCompressorConfig()
config.EnableMessageLevel = true
config.MessageThreshold = 100 // 超过 100 条消息启用
// 当会话有 100+ 条消息时,自动启用消息级压缩
将多轮对话(user + assistant)压缩为总结,适合长对话。
config := memory.DefaultMultiLevelCompressorConfig()
config.EnableTurnLevel = true
config.TurnThreshold = 20 // 超过 20 轮对话启用
// 示例:3 轮对话被压缩为 1 条总结
// 原始:
// [user] Question 1
// [assistant] Answer 1
// [user] Question 2
// [assistant] Answer 2
// [user] Question 3
// [assistant] Answer 3
//
// 压缩后:
// [assistant] [对话轮次总结] 讨论了关于...的三个问题,结论是...
对整个会话生成高层次总结,适合极长会话。
config := memory.DefaultMultiLevelCompressorConfig()
config.EnableSessionLevel = true
config.SessionThreshold = 50 // 超过 50 条消息启用
// 整个会话被压缩为单一总结
config := memory.LLMSummarizerConfig{
// === LLM 配置 ===
Provider: "openai", // LLM 提供商
Model: "gpt-4o-mini", // 模型名称
APIKey: "sk-xxx", // API 密钥
BaseURL: "", // 自定义 API URL(可选)
// === 总结配置 ===
Level: memory.CompressionLevelModerate, // 压缩级别
MaxSummaryWords: 500, // 最大字数
PreserveContext: true, // 保留上下文信息
Language: "zh", // 总结语言
// === Token 配置 ===
TokenCounter: agentext.NewGPT4Counter(), // Token 计数器
}
summarizer := memory.NewLLMSummarizer(config)
config := memory.MultiLevelCompressorConfig{
// === 消息级压缩 ===
EnableMessageLevel: true,
MessageThreshold: 100, // 超过 100 条消息启用
// === 轮次级压缩 ===
EnableTurnLevel: true,
TurnThreshold: 20, // 超过 20 轮对话启用
// === 会话级压缩 ===
EnableSessionLevel: true,
SessionThreshold: 50, // 超过 50 条消息启用
// === Token 预算 ===
TokenBudget: agentext.DefaultTokenBudget(),
}
compressor := memory.NewMultiLevelCompressor(
messageLevel,
turnLevel,
sessionLevel,
config,
)
summary, err := summarizer.SummarizeSession(ctx, messages)
if err != nil {
log.Fatal(err)
}
fmt.Println("会话总结:", summary)
// 将多条消息压缩为更少的消息
compressed, err := summarizer.CompressMessages(ctx, messages)
if err != nil {
log.Fatal(err)
}
// compressed[0] 包含整个会话的总结
fmt.Println("压缩后的消息:", compressed[0].Content)
stats := summarizer.GetCompressionStats()
fmt.Printf("原始消息数: %d\n", stats.OriginalMessages)
fmt.Printf("压缩后消息数: %d\n", stats.CompressedMessages)
fmt.Printf("原始 Token 数: %d\n", stats.OriginalTokens)
fmt.Printf("压缩后 Token 数: %d\n", stats.CompressedTokens)
fmt.Printf("压缩比率: %.2f%%\n", stats.CompressionRatio * 100)
fmt.Printf("压缩耗时: %v\n", stats.Duration)
// 当前 callLLM 是占位符实现
// 在生产环境中,你可以自定义总结逻辑:
type CustomSummarizer struct {
*memory.LLMSummarizer
}
func (s *CustomSummarizer) buildSummaryPrompt(messages []Message) string {
// 自定义提示词生成逻辑
prompt := "请用以下格式总结会话:\n"
prompt += "1. 主要议题\n"
prompt += "2. 关键决策\n"
prompt += "3. 待办事项\n\n"
for _, msg := range messages {
prompt += fmt.Sprintf("[%s]: %s\n", msg.Role, msg.Content)
}
return prompt
}
import (
"github.com/astercloud/aster/pkg/context"
"github.com/astercloud/aster/pkg/memory"
)
// 创建上下文窗口管理器
wmConfig := context.DefaultWindowManagerConfig()
wmConfig.AutoCompress = true
wmConfig.Budget = context.TokenBudget{
MaxTokens: 128000,
ReservedTokens: 4096,
}
windowManager := context.NewContextWindowManager(wmConfig)
// 创建会话压缩器
summarizer := memory.NewLLMSummarizer(memory.DefaultLLMSummarizerConfig())
// 当窗口满时,使用压缩器
if windowManager.ShouldCompress() {
messages := windowManager.GetAllMessages()
compressed, _ := summarizer.CompressMessages(ctx, messages)
// 用压缩后的消息替换原始消息
windowManager.Clear()
for _, msg := range compressed {
windowManager.AddMessage(ctx, msg)
}
}
// 对于超长会话,分段压缩更有效
func compressLongSession(messages []agentext.Message, chunkSize int) []agentext.Message {
summarizer := memory.NewLLMSummarizer(memory.DefaultLLMSummarizerConfig())
compressed := []agentext.Message{}
// 分块处理
for i := 0; i < len(messages); i += chunkSize {
end := i + chunkSize
if end > len(messages) {
end = len(messages)
}
chunk := messages[i:end]
chunkSummary, _ := summarizer.SummarizeSession(ctx, chunk)
compressed = append(compressed, agentext.Message{
Role: "system",
Content: fmt.Sprintf("[第 %d 段总结] %s", i/chunkSize+1, chunkSummary),
})
}
return compressed
}
// 使用
compressed := compressLongSession(messages, 50) // 每 50 条消息压缩一次
// 在压缩时保留标记为"重要"的消息
func compressWithImportantMessages(
messages []agentext.Message,
summarizer memory.SessionCompressor,
) []agentext.Message {
important := []agentext.Message{}
regular := []agentext.Message{}
// 分离重要消息和普通消息
for _, msg := range messages {
if isImportant(msg) {
important = append(important, msg)
} else {
regular = append(regular, msg)
}
}
// 只压缩普通消息
compressed, _ := summarizer.CompressMessages(ctx, regular)
// 合并结果:重要消息 + 压缩后的普通消息
result := append(important, compressed...)
return result
}
func isImportant(msg agentext.Message) bool {
// 自定义重要性判断逻辑
return msg.Role == "system" ||
strings.Contains(msg.Content, "IMPORTANT") ||
strings.Contains(msg.Content, "TODO")
}
// ✅ 技术讨论、需要细节 → Light
config.Level = memory.CompressionLevelLight
// ✅ 一般对话、平衡压缩 → Moderate
config.Level = memory.CompressionLevelModerate
// ✅ 历史归档、最大压缩 → Aggressive
config.Level = memory.CompressionLevelAggressive
config := memory.DefaultMultiLevelCompressorConfig()
// 短会话(<20 条消息)→ 不压缩
// 中等会话(20-50 条)→ 轮次级压缩
config.TurnThreshold = 20
// 长会话(>50 条)→ 会话级压缩
config.SessionThreshold = 50
stats := compressor.GetCompressionStats()
// 检查压缩比率是否合理
if stats.CompressionRatio > 0.8 {
log.Warn("压缩效果不明显,可能需要调整压缩级别")
}
// 记录压缩耗时
if stats.Duration > 5*time.Second {
log.Warn("压缩耗时较长,考虑使用更快的模型")
}
// 避免阻塞主流程
go func() {
compressed, err := summarizer.CompressMessages(ctx, messages)
if err != nil {
log.Error("压缩失败:", err)
return
}
// 异步保存压缩结果
saveCompressedSession(compressed)
}()
// 客服会话通常很长,需要智能压缩
summarizer := memory.NewLLMSummarizer(memory.DefaultLLMSummarizerConfig())
// 每 10 轮对话压缩一次
ticker := time.NewTicker(10 * time.Minute)
go func() {
for range ticker.C {
messages := getSessionMessages()
if len(messages) > 20 {
compressed, _ := summarizer.CompressMessages(ctx, messages)
updateSessionMessages(compressed)
}
}
}()
// 编程会话需要保留代码细节
config := memory.DefaultLLMSummarizerConfig()
config.Level = memory.CompressionLevelLight // 轻度压缩
config.PreserveContext = true // 保留上下文
summarizer := memory.NewLLMSummarizer(config)
// 在达到 Token 限制前压缩
if currentTokens > maxTokens * 0.8 {
compressed, _ := summarizer.CompressMessages(ctx, messages)
messages = compressed
}
// 会议结束后生成总结
config := memory.DefaultLLMSummarizerConfig()
config.Level = memory.CompressionLevelModerate
config.MaxSummaryWords = 500
config.PreserveContext = true // 保留关键决策和待办事项
summarizer := memory.NewLLMSummarizer(config)
// 生成会议纪要
summary, _ := summarizer.SummarizeSession(ctx, meetingMessages)
saveMeetingMinutes(summary)
// 收集多个会话,批量压缩
sessions := [][]agentext.Message{session1, session2, session3}
results := make([]string, len(sessions))
var wg sync.WaitGroup
for i, session := range sessions {
wg.Add(1)
go func(idx int, msgs []agentext.Message) {
defer wg.Done()
summary, _ := summarizer.SummarizeSession(ctx, msgs)
results[idx] = summary
}(i, session)
}
wg.Wait()
var compressionCache sync.Map
func getCachedSummary(sessionID string, messages []agentext.Message) (string, bool) {
key := fmt.Sprintf("%s:%d", sessionID, len(messages))
if cached, ok := compressionCache.Load(key); ok {
return cached.(string), true
}
return "", false
}
func cacheSummary(sessionID string, messages []agentext.Message, summary string) {
key := fmt.Sprintf("%s:%d", sessionID, len(messages))
compressionCache.Store(key, summary)
}
// 只压缩新增的消息,而不是整个会话
type IncrementalCompressor struct {
lastCompressedIndex int
existingSummary string
summarizer memory.SessionCompressor
}
func (ic *IncrementalCompressor) CompressIncremental(
messages []agentext.Message,
) (string, error) {
// 只处理新消息
newMessages := messages[ic.lastCompressedIndex:]
if len(newMessages) < 10 {
return ic.existingSummary, nil // 新消息太少,不压缩
}
// 压缩新消息
newSummary, err := ic.summarizer.SummarizeSession(ctx, newMessages)
if err != nil {
return "", err
}
// 合并旧总结和新总结
combined := ic.existingSummary + "\n" + newSummary
// 更新状态
ic.lastCompressedIndex = len(messages)
ic.existingSummary = combined
return combined, nil
}
运行会话压缩器测试:
# 运行所有压缩器测试
go test ./pkg/memory/... -run Compress -v
# 运行 LLM 总结器测试
go test ./pkg/memory/... -run TestLLMSummarizer -v
# 运行多层次压缩器测试
go test ./pkg/memory/... -run TestMultiLevel -v