Memory Consolidation(记忆合并)是 aster 的智能记忆管理功能,可以自动检测并合并冗余或相互冲突的记忆,从而保持记忆库的质量和一致性。
1. Redundancy Strategy (冗余合并)
2. Conflict Resolution Strategy (冲突解决)
3. Summarization Strategy (总结)
1. 触发条件检查
├─ 定时触发(每24小时)
├─ 记忆数量阈值(超过10条)
└─ 手动触发
2. 候选记忆查找
├─ 向量相似度计算
├─ 记忆聚类
└─ 相似度阈值过滤
3. 策略判断
├─ 检查是否应该合并
└─ 确定合并原因
4. LLM 合并
├─ 构建提示模板
├─ 调用 LLM API
└─ 生成合并结果
5. 保存和清理
├─ 保存合并后的记忆
├─ 更新溯源链
└─ 处理源记忆(删除/标记)
package main
import (
	"time"

	"github.com/astercloud/aster/pkg/memory"
)
// main wires a ConsolidationEngine together from a semantic memory, a
// consolidation strategy, an LLM provider and a consolidation config.
func main() {
	// Create the semantic memory. `config` is whatever configuration
	// NewSemanticMemory expects; this snippet does not define it.
	semanticMemory := memory.NewSemanticMemory(config)

	// Choose the consolidation strategy.
	strategy := memory.NewRedundancyStrategy(0.85) // 85% similarity threshold

	// Create the LLM provider.
	llmProvider := &YourLLMProvider{} // implements the LLMProvider interface

	// Configure the consolidation engine. The variable is deliberately not
	// called `config`, so it cannot be confused with (or shadow) the value
	// passed to NewSemanticMemory above.
	consolidationConfig := memory.DefaultConsolidationConfig()
	consolidationConfig.SimilarityThreshold = 0.85
	consolidationConfig.AutoConsolidateInterval = 24 * time.Hour

	// Create the consolidation engine.
	engine := memory.NewConsolidationEngine(
		semanticMemory,
		strategy,
		llmProvider,
		consolidationConfig,
	)

	// Go rejects declared-but-unused variables; a real program would go on
	// to call methods such as engine.Consolidate here.
	_ = engine
}
// Run a single consolidation pass; log.Fatalf exits the program on failure.
result, err := engine.Consolidate(ctx)
if err != nil {
log.Fatalf("Consolidation failed: %v", err)
}
// Print the merged count, the new-memory count and the duration from the result.
fmt.Printf("Merged %d memories into %d new memories\n",
result.MergedCount, result.NewMemoryCount)
fmt.Printf("Duration: %v\n", result.Duration)
// Check whether an automatic consolidation should run, and run it if so.
// A failure is only logged, so the program keeps going.
if engine.ShouldAutoConsolidate() {
	if result, err := engine.Consolidate(ctx); err != nil {
		log.Printf("Auto-consolidation failed: %v", err)
	} else {
		log.Printf("Auto-consolidation completed: %d memories merged",
			result.MergedCount)
	}
}
config := memory.ConsolidationConfig{
// Similarity threshold (0.0-1.0).
SimilarityThreshold: 0.85,
// Conflict detection threshold.
ConflictThreshold: 0.75,
// Minimum number of memories (the threshold that triggers consolidation).
MinMemoryCount: 10,
// Number of memories processed per batch.
BatchSize: 50,
// Interval between automatic consolidations.
AutoConsolidateInterval: 24 * time.Hour,
// Whether to keep the original memories (mark them instead of deleting them).
PreserveOriginal: true,
// LLM model.
LLMModel: "gpt-4",
// Maximum number of retries.
MaxRetries: 3,
}
合并高度相似的重复记忆:
strategy := memory.NewRedundancyStrategy(0.85)
// Example:
// Memory 1: "User prefers dark mode"
// Memory 2: "User likes dark theme"
// Memory 3: "User wants dark mode UI"
//
// After merging: "User prefers dark mode theme for the UI"
特点:
处理矛盾信息:
calc := memory.NewConfidenceCalculator(config)
strategy := memory.NewConflictResolutionStrategy(0.75, calc)
// Example:
// Memory 1 (confidence 0.6): "User likes coffee"
// Memory 2 (confidence 0.9): "User actually prefers tea"
//
// After resolution: "User prefers tea (previously mentioned liking coffee)"
特点:
总结多条相关记忆:
strategy := memory.NewSummarizationStrategy(5) // at most 5 memories per group
// Example:
// Memory 1: "User lives in New York"
// Memory 2: "User works at Tech Corp"
// Memory 3: "User has 5 years experience"
// Memory 4: "User specializes in AI"
// Memory 5: "User graduated from MIT"
//
// After summarization: "User is an AI specialist with 5 years of experience,
// graduated from MIT, currently working at Tech Corp in New York"
特点:
实现自定义 LLM 提供者:
// LLMProvider is the interface to implement for a custom LLM provider.
// Complete receives a context, a prompt and a map of options, and returns a
// string and an error.
type LLMProvider interface {
Complete(ctx context.Context, prompt string, options map[string]interface{}) (string, error)
}
// OpenAIProvider is an example LLMProvider implementation whose Complete
// method calls CreateChatCompletion on Client.
type OpenAIProvider struct {
APIKey string // not read by the Complete method shown in this example
Client *openai.Client // client used by Complete to send the chat completion request
}
// Complete sends prompt as a single "user" message through
// Client.CreateChatCompletion and returns the content of the first choice.
//
// options must contain "model" (string) and "temperature" (float64). A
// missing or mistyped entry is reported as an error rather than panicking on
// the type assertion, and a response without choices is reported as an error
// rather than panicking on the index.
func (p *OpenAIProvider) Complete(ctx context.Context, prompt string, options map[string]interface{}) (string, error) {
	model, ok := options["model"].(string)
	if !ok {
		return "", fmt.Errorf("option %q: want string, got %T", "model", options["model"])
	}
	temperature, ok := options["temperature"].(float64)
	if !ok {
		return "", fmt.Errorf("option %q: want float64, got %T", "temperature", options["temperature"])
	}

	resp, err := p.Client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
		Model: model,
		Messages: []openai.ChatCompletionMessage{
			{
				Role:    "user",
				Content: prompt,
			},
		},
		// The options map carries a float64; the request field is float32.
		Temperature: float32(temperature),
	})
	if err != nil {
		return "", err
	}
	if len(resp.Choices) == 0 {
		return "", fmt.Errorf("chat completion for model %q returned no choices", model)
	}
	return resp.Choices[0].Message.Content, nil
}
// CustomStrategy is an example of a user-defined consolidation strategy.
type CustomStrategy struct {
threshold float64 // not read by any method shown in this example
}
// Name returns the string "custom".
func (s *CustomStrategy) Name() string {
return "custom"
}
// ShouldConsolidate reports whether the given memories should be consolidated
// and, if so, why. Fewer than two memories are never consolidated; otherwise
// this example always answers yes with the reason "custom-reason".
func (s *CustomStrategy) ShouldConsolidate(ctx context.Context, memories []memory.MemoryWithScore) (bool, memory.ConsolidationReason) {
	const minMemories = 2

	// Custom decision logic: a group below the minimum size has nothing to merge.
	if len(memories) < minMemories {
		return false, memory.ReasonNone
	}

	// Check custom conditions here.
	// ...

	reason := memory.ConsolidationReason("custom-reason")
	return true, reason
}
// Consolidate is the hook for the custom merge logic. It receives the
// candidate memories and an LLM provider and returns the consolidated memory
// or an error.
//
// This is a skeleton: replace the body with the real implementation. Until
// then it returns an error, so the example compiles (a Go function with
// results must end in a return) and callers never see a nil memory together
// with a nil error.
func (s *CustomStrategy) Consolidate(ctx context.Context, memories []memory.MemoryWithScore, llm memory.LLMProvider) (*memory.ConsolidatedMemory, error) {
	// Custom consolidation logic goes here.
	// ...
	return nil, fmt.Errorf("custom strategy: consolidation not implemented")
}
// Fetch the engine statistics and print the three fields read below.
stats := engine.GetStats()
fmt.Printf("Total consolidations: %d\n", stats.ConsolidationCount)
fmt.Printf("Total memories merged: %d\n", stats.MergedMemoriesCount)
fmt.Printf("Last consolidation: %v\n", stats.LastConsolidation)
// Run a consolidation pass; log.Fatal exits the program on failure.
result, err := engine.Consolidate(ctx)
if err != nil {
	log.Fatal(err)
}

// Inspect every consolidated group.
for _, group := range result.MemoryGroups {
	fmt.Printf("Group consolidated: %d memories -> %s\n",
		len(group.Memories),
		group.ConsolidatedID)
	fmt.Printf("Reason: %s\n", group.Reason)

	// Walk the source memories of the group.
	for _, mem := range group.Memories {
		fmt.Printf(" - %s: %s\n", mem.DocID, mem.Text)
	}
}

// Report the errors collected in the result. The loop variable is named e so
// that it does not shadow the err returned by Consolidate above.
if len(result.Errors) > 0 {
	fmt.Println("Errors occurred:")
	for _, e := range result.Errors {
		fmt.Printf(" - %s\n", e)
	}
}
// Keep the original memories (mark them only).
config := memory.ConsolidationConfig{
PreserveOriginal: true,
// ...
}
// The source memories are marked with:
// metadata["consolidated"] = true
// metadata["consolidated_to"] = "<new memory ID>"
// metadata["consolidated_at"] = "2024-01-15T10:30:00Z"
// Delete the original memories.
config := memory.ConsolidationConfig{
PreserveOriginal: false,
// ...
}
// The source memories are deleted outright.
You are a memory consolidation assistant.
The following memory entries are redundant (saying similar things).
Please merge them into a single, concise memory that captures all the important information.
Redundant Memories:
1. User prefers dark mode
2. User likes dark theme
3. User wants dark UI
Instructions:
- Merge the information into one clear, concise statement
- Preserve all important details
- Remove redundancy
- Keep the same tone and style
- Output only the merged memory, without explanation
Merged Memory:
You are a memory conflict resolution assistant.
The following memory entries contain conflicting information.
Please analyze them and create a single, accurate memory.
Conflicting Memories:
1. User likes coffee (Confidence: 0.60)
2. User actually prefers tea (Confidence: 0.90)
Instructions:
- Analyze the conflicts carefully
- Prefer information from higher confidence sources
- If information is contradictory, indicate uncertainty
- Provide a balanced, objective statement
- Output only the resolved memory, without explanation
Resolved Memory:
// The consolidated memory keeps the full provenance chain.
// A Go slice literal needs an element type and braces; `[ ... ]` on its own
// is not valid Go. NOTE(review): assumes Sources is a []string, as the string
// elements suggest — confirm against the Provenance type.
consolidated.Provenance.Sources = []string{
	"original-memory-1",
	"original-memory-2",
	"original-memory-3",
}
consolidated.Provenance.CorroborationCount = 3
result, err := engine.Consolidate(ctx)
if err != nil {
log.Printf("Consolidation failed: %v", err)
// Log the error and retry later.
return
}
// Check for partial failures.
if len(result.Errors) > 0 {
log.Printf("Consolidation completed with %d errors", len(result.Errors))
for _, e := range result.Errors {
log.Printf(" Error: %s", e)
}
}
// Agent holds a semantic memory and a consolidation engine.
type Agent struct {
memory *memory.SemanticMemory
consolidationEngine *memory.ConsolidationEngine // used by Run and autoConsolidate
}
// Run checks once per hour whether an automatic consolidation should run and,
// if so, starts it in a separate goroutine. It returns when ctx is done.
func (a *Agent) Run(ctx context.Context) {
	// Periodic check; the ticker is stopped when Run returns.
	tick := time.NewTicker(time.Hour)
	defer tick.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
			if !a.consolidationEngine.ShouldAutoConsolidate() {
				continue
			}
			go a.autoConsolidate(ctx)
		}
	}
}
// autoConsolidate runs one consolidation pass and logs either the failure or
// the number of merged memories.
func (a *Agent) autoConsolidate(ctx context.Context) {
	if result, err := a.consolidationEngine.Consolidate(ctx); err != nil {
		log.Printf("Auto-consolidation failed: %v", err)
	} else {
		log.Printf("Auto-consolidation completed: merged %d memories", result.MergedCount)
	}
}
运行 Memory Consolidation 测试:
go test ./pkg/memory/ -v -run Consolidation