记忆

Memory Consolidation

内存自动合并 - LLM驱动的智能记忆管理和优化

Memory Consolidation (内存合并)

概述

Memory Consolidation 是 aster 的智能内存管理功能,可以自动检测和合并冗余或冲突的记忆,保持记忆库的质量和一致性。

功能特性

  • 自动冗余检测: 识别内容相似的重复记忆
  • 冲突解决: 处理矛盾信息,选择最可信的来源
  • 智能总结: 将多条相关记忆总结为简洁表述
  • LLM 驱动: 使用大语言模型进行语义理解和合并
  • 溯源保留: 合并后保留完整的记忆谱系
  • 自动触发: 支持定期自动合并

核心概念

合并策略

1. Redundancy Strategy (冗余合并)

  • 检测高度相似的重复记忆
  • 合并为单条精炼的记忆
  • 适用场景:用户多次表达相同信息

2. Conflict Resolution Strategy (冲突解决)

  • 检测矛盾或不一致的信息
  • 根据置信度和新鲜度选择最佳版本
  • 适用场景:信息更新、纠错

3. Summarization Strategy (总结)

  • 将多条相关记忆总结为概括性表述
  • 减少记忆数量,提高检索效率
  • 适用场景:长期记忆管理

合并流程

1. 触发条件检查
   ├─ 定时触发(每24小时)
   ├─ 记忆数量阈值(超过10条)
   └─ 手动触发

2. 候选记忆查找
   ├─ 向量相似度计算
   ├─ 记忆聚类
   └─ 相似度阈值过滤

3. 策略判断
   ├─ 检查是否应该合并
   └─ 确定合并原因

4. LLM 合并
   ├─ 构建提示模板
   ├─ 调用 LLM API
   └─ 生成合并结果

5. 保存和清理
   ├─ 保存合并后的记忆
   ├─ 更新溯源链
   └─ 处理源记忆(删除/标记)

快速开始

1. 创建合并引擎

package main

import (
    "github.com/astercloud/aster/pkg/memory"
)

func main() {
    // 创建语义内存
    semanticMemory := memory.NewSemanticMemory(config)

    // 选择合并策略
    strategy := memory.NewRedundancyStrategy(0.85) // 85% 相似度阈值

    // 创建 LLM 提供者
    llmProvider := &YourLLMProvider{} // 实现 LLMProvider 接口

    // 配置合并引擎
    config := memory.DefaultConsolidationConfig()
    config.SimilarityThreshold = 0.85
    config.AutoConsolidateInterval = 24 * time.Hour

    // 创建合并引擎
    engine := memory.NewConsolidationEngine(
        semanticMemory,
        strategy,
        llmProvider,
        config,
    )
}

2. 手动触发合并

result, err := engine.Consolidate(ctx)
if err != nil {
    log.Fatalf("Consolidation failed: %v", err)
}

fmt.Printf("Merged %d memories into %d new memories\n",
    result.MergedCount, result.NewMemoryCount)
fmt.Printf("Duration: %v\n", result.Duration)

3. 自动合并

// 检查是否应该自动合并
if engine.ShouldAutoConsolidate() {
    result, err := engine.Consolidate(ctx)
    if err != nil {
        log.Printf("Auto-consolidation failed: %v", err)
    } else {
        log.Printf("Auto-consolidation completed: %d memories merged",
            result.MergedCount)
    }
}

配置选项

ConsolidationConfig

config := memory.ConsolidationConfig{
    // 相似度阈值(0.0-1.0)
    SimilarityThreshold: 0.85,

    // 冲突检测阈值
    ConflictThreshold: 0.75,

    // 最小记忆数量(触发合并的阈值)
    MinMemoryCount: 10,

    // 每批处理的记忆数量
    BatchSize: 50,

    // 自动合并间隔
    AutoConsolidateInterval: 24 * time.Hour,

    // 是否保留原始记忆(标记而非删除)
    PreserveOriginal: true,

    // LLM 模型
    LLMModel: "gpt-4",

    // 最大重试次数
    MaxRetries: 3,
}

合并策略详解

RedundancyStrategy - 冗余合并

合并高度相似的重复记忆:

strategy := memory.NewRedundancyStrategy(0.85)

// 示例:
// 记忆 1: "User prefers dark mode"
// 记忆 2: "User likes dark theme"
// 记忆 3: "User wants dark mode UI"
//
// 合并后: "User prefers dark mode theme for the UI"

特点

  • 保留所有重要信息
  • 消除重复表述
  • 提升记忆质量

ConflictResolutionStrategy - 冲突解决

处理矛盾信息:

calc := memory.NewConfidenceCalculator(config)
strategy := memory.NewConflictResolutionStrategy(0.75, calc)

// 示例:
// 记忆 1 (置信度 0.6): "User likes coffee"
// 记忆 2 (置信度 0.9): "User actually prefers tea"
//
// 解决后: "User prefers tea (previously mentioned liking coffee)"

特点

  • 基于置信度权衡
  • 考虑记忆新鲜度
  • 保留历史变化

SummarizationStrategy - 总结

总结多条相关记忆:

strategy := memory.NewSummarizationStrategy(5) // 每组最多 5 条

// 示例:
// 记忆 1: "User lives in New York"
// 记忆 2: "User works at Tech Corp"
// 记忆 3: "User has 5 years experience"
// 记忆 4: "User specializes in AI"
// 记忆 5: "User graduated from MIT"
//
// 总结后: "User is an AI specialist with 5 years of experience,
//          graduated from MIT, currently working at Tech Corp in New York"

特点

  • 压缩信息密度
  • 保持关键事实
  • 提高检索效率

LLM Provider 接口

实现自定义 LLM 提供者:

type LLMProvider interface {
    Complete(ctx context.Context, prompt string, options map[string]interface{}) (string, error)
}

// 示例实现
type OpenAIProvider struct {
    APIKey string
    Client *openai.Client
}

func (p *OpenAIProvider) Complete(ctx context.Context, prompt string, options map[string]interface{}) (string, error) {
    model := options["model"].(string)
    temperature := options["temperature"].(float64)

    resp, err := p.Client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
        Model: model,
        Messages: []openai.ChatCompletionMessage{
            {
                Role:    "user",
                Content: prompt,
            },
        },
        Temperature: float32(temperature),
    })

    if err != nil {
        return "", err
    }

    return resp.Choices[0].Message.Content, nil
}

高级用法

1. 自定义合并策略

type CustomStrategy struct {
    threshold float64
}

func (s *CustomStrategy) Name() string {
    return "custom"
}

func (s *CustomStrategy) ShouldConsolidate(ctx context.Context, memories []memory.MemoryWithScore) (bool, memory.ConsolidationReason) {
    // 自定义判断逻辑
    if len(memories) < 2 {
        return false, memory.ReasonNone
    }

    // 检查自定义条件
    // ...

    return true, memory.ConsolidationReason("custom-reason")
}

func (s *CustomStrategy) Consolidate(ctx context.Context, memories []memory.MemoryWithScore, llm memory.LLMProvider) (*memory.ConsolidatedMemory, error) {
    // 自定义合并逻辑
    // ...
}

2. 监控合并统计

stats := engine.GetStats()
fmt.Printf("Total consolidations: %d\n", stats.ConsolidationCount)
fmt.Printf("Total memories merged: %d\n", stats.MergedMemoriesCount)
fmt.Printf("Last consolidation: %v\n", stats.LastConsolidation)

3. 合并结果分析

result, err := engine.Consolidate(ctx)
if err != nil {
    log.Fatal(err)
}

// 检查每个合并组
for _, group := range result.MemoryGroups {
    fmt.Printf("Group consolidated: %d memories -> %s\n",
        len(group.Memories),
        group.ConsolidatedID)
    fmt.Printf("Reason: %s\n", group.Reason)

    // 访问源记忆
    for _, mem := range group.Memories {
        fmt.Printf("  - %s: %s\n", mem.DocID, mem.Text)
    }
}

// 检查错误
if len(result.Errors) > 0 {
    fmt.Println("Errors occurred:")
    for _, err := range result.Errors {
        fmt.Printf("  - %s\n", err)
    }
}

4. 保留与删除策略

// 保留原始记忆(仅标记)
config := memory.ConsolidationConfig{
    PreserveOriginal: true,
    // ...
}

// 源记忆会被标记为:
// metadata["consolidated"] = true
// metadata["consolidated_to"] = "新记忆ID"
// metadata["consolidated_at"] = "2024-01-15T10:30:00Z"

// 删除原始记忆
config := memory.ConsolidationConfig{
    PreserveOriginal: false,
    // ...
}
// 源记忆会被直接删除

LLM 提示工程

冗余合并提示模板

You are a memory consolidation assistant.
The following memory entries are redundant (saying similar things).
Please merge them into a single, concise memory that captures all the important information.

Redundant Memories:
1. User prefers dark mode
2. User likes dark theme
3. User wants dark UI

Instructions:
- Merge the information into one clear, concise statement
- Preserve all important details
- Remove redundancy
- Keep the same tone and style
- Output only the merged memory, without explanation

Merged Memory:

冲突解决提示模板

You are a memory conflict resolution assistant.
The following memory entries contain conflicting information.
Please analyze them and create a single, accurate memory.

Conflicting Memories:
1. User likes coffee (Confidence: 0.60)
2. User actually prefers tea (Confidence: 0.90)

Instructions:
- Analyze the conflicts carefully
- Prefer information from higher confidence sources
- If information is contradictory, indicate uncertainty
- Provide a balanced, objective statement
- Output only the resolved memory, without explanation

Resolved Memory:

最佳实践

1. 合并时机

  • 定期合并: 每24小时执行一次全面合并
  • 阈值触发: 当记忆数量超过1000条时触发
  • 空闲时段: 在系统负载较低时执行

2. 相似度阈值选择

  • 保守合并: 0.90+ 相似度(减少误合并)
  • 平衡合并: 0.85 相似度(推荐)
  • 激进合并: 0.75 相似度(最大化压缩)

3. 溯源管理

// 合并后的记忆保留完整溯源链
consolidated.Provenance.Sources = [
    "original-memory-1",
    "original-memory-2",
    "original-memory-3",
]
consolidated.Provenance.CorroborationCount = 3

4. 错误处理

result, err := engine.Consolidate(ctx)
if err != nil {
    log.Printf("Consolidation failed: %v", err)
    // 记录错误,稍后重试
    return
}

// 检查部分失败
if len(result.Errors) > 0 {
    log.Printf("Consolidation completed with %d errors", len(result.Errors))
    for _, e := range result.Errors {
        log.Printf("  Error: %s", e)
    }
}

5. 性能优化

  • 使用批处理(BatchSize: 50-100)
  • 并发处理不相关的记忆组
  • 缓存向量相似度计算结果
  • 使用更快的 LLM 模型(如 GPT-3.5-turbo)

集成示例

与 Agent 集成

type Agent struct {
    memory             *memory.SemanticMemory
    consolidationEngine *memory.ConsolidationEngine
}

func (a *Agent) Run(ctx context.Context) {
    // 定期检查并执行合并
    ticker := time.NewTicker(1 * time.Hour)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            if a.consolidationEngine.ShouldAutoConsolidate() {
                go a.autoConsolidate(ctx)
            }
        case <-ctx.Done():
            return
        }
    }
}

func (a *Agent) autoConsolidate(ctx context.Context) {
    result, err := a.consolidationEngine.Consolidate(ctx)
    if err != nil {
        log.Printf("Auto-consolidation failed: %v", err)
        return
    }

    log.Printf("Auto-consolidation completed: merged %d memories", result.MergedCount)
}

测试

运行 Memory Consolidation 测试:

go test ./pkg/memory/ -v -run Consolidation

相关文档

参考资源