核心概念

Session 持久化

使用 PostgreSQL 和 MySQL 持久化 Agent 会话和事件历史

Session 持久化

Session 持久化是 aster Phase 7 引入的关键功能,提供了完整的会话状态和事件历史持久化能力,支持 PostgreSQL 和 MySQL 8.0+。

🎯 为什么需要 Session 持久化?

在生产环境中,Agent 的会话和历史记录需要持久化存储:

  • 会话恢复: 应用重启后能够恢复用户会话
  • 历史查询: 查看完整的对话历史和工具调用记录
  • 审计合规: 满足安全审计和合规要求
  • 数据分析: 分析用户行为和 Agent 性能
  • 调试优化: 追踪问题和优化 Agent 表现

📊 架构设计

graph TB
    Agent[Agent] -->|创建会话| SessionService[Session Service]
    Agent -->|追加事件| SessionService
    Agent -->|查询历史| SessionService

    SessionService --> Interface{Session Interface}

    Interface -->|开发/测试| Memory[Memory Store<br/>内存存储]
    Interface -->|生产环境| PostgreSQL[PostgreSQL<br/>JSONB支持]
    Interface -->|生产环境| MySQL[MySQL 8.0+<br/>JSON支持]

    PostgreSQL --> PGTables[(sessions<br/>session_events)]
    MySQL --> MySQLTables[(sessions<br/>session_events)]

    style SessionService fill:#3b82f6
    style PostgreSQL fill:#10b981
    style MySQL fill:#f59e0b

五种存储实现

存储类型用途数据持久化JSON 支持适用场景
Memory开发/测试❌ 无✅ 原生本地开发、单元测试
SQLite桌面应用✅ 持久化✅ JSON单机、桌面应用
Redis分布式/高性能✅ 持久化✅ 原生多节点部署、高并发场景
PostgreSQL生产推荐✅ 持久化✅ JSONB复杂查询、全文搜索
MySQL 8.0+生产可选✅ 持久化✅ JSON已有 MySQL 基础设施
📱 桌面应用推荐: 对于桌面应用场景,推荐使用 SQLite 会话存储,无需外部数据库服务。

📐 数据模型

Session 表

存储会话元数据:

erDiagram
    SESSION {
        string id PK "会话ID"
        string app_name "应用名称"
        string user_id "用户ID"
        string agent_id "AgentID"
        string status "状态(active/completed/failed)"
        jsonb metadata "元数据(JSONB/JSON)"
        timestamp created_at "创建时间"
        timestamp updated_at "更新时间"
    }

    SESSION ||--o{ EVENT : has

    EVENT {
        string id PK "事件ID"
        string session_id FK "会话ID"
        string invocation_id "调用ID"
        string agent_id "AgentID"
        string author "作者(user/agent)"
        jsonb content "消息内容(JSONB/JSON)"
        jsonb actions "事件动作(JSONB/JSON)"
        jsonb metadata "元数据(JSONB/JSON)"
        string branch "工作流分支"
        timestamp timestamp "时间戳"
    }

Session 字段说明:

字段类型说明示例
idstring会话唯一ID"sess-20250113-abc123"
app_namestring应用名称"my-chatbot"
user_idstring用户ID"user-001"
agent_idstringAgent ID"agent-assistant"
statusstring会话状态"active", "completed", "failed"
metadatajsonb/json自定义元数据{"version": "1.0", "env": "prod"}

Event 字段说明:

字段类型说明示例
idstring事件唯一ID"evt-001"
session_idstring所属会话ID"sess-20250113-abc123"
invocation_idstring调用ID(工作流追踪)"inv-001"
agent_idstring产生事件的 Agent ID"agent-001"
authorstring事件作者"user", "agent"
contentjsonb/json消息内容{"role": "user", "content": "Hello"}
actionsjsonb/json事件动作{"escalate": false}
metadatajsonb/json事件元数据{"loop_iteration": 1}
branchstring工作流分支路径"Pipeline.Analyzer"

索引优化

为高性能查询创建的索引:

-- Session 表索引
CREATE INDEX idx_sessions_user_id ON sessions(user_id);
CREATE INDEX idx_sessions_agent_id ON sessions(agent_id);
CREATE INDEX idx_sessions_status ON sessions(status);
CREATE INDEX idx_sessions_created_at ON sessions(created_at);

-- Event 表索引
CREATE INDEX idx_events_session_id ON session_events(session_id);
CREATE INDEX idx_events_timestamp ON session_events(timestamp);
CREATE INDEX idx_events_invocation_id ON session_events(invocation_id);
CREATE INDEX idx_events_agent_id ON session_events(agent_id);

🔄 生命周期

Session 状态机

graph LR
    Active[active<br/>活跃] -->|正常完成| Completed[completed<br/>已完成]
    Active -->|发生错误| Failed[failed<br/>失败]
    Active -->|超时| Failed

    Completed -->|重新激活| Active
    Failed -->|重试| Active

    style Active fill:#3b82f6
    style Completed fill:#10b981
    style Failed fill:#ef4444

完整流程

sequenceDiagram
    participant App as 应用
    participant Agent
    participant SessionService
    participant DB as 数据库

    App->>SessionService: Create(req)
    SessionService->>DB: INSERT INTO sessions
    DB->>SessionService: session_id
    SessionService->>App: Session对象

    loop Agent 执行
        App->>Agent: Chat(ctx, message)
        Agent->>SessionService: AppendEvent(event)
        SessionService->>DB: INSERT INTO session_events
        Agent->>App: 流式响应
    end

    App->>SessionService: Update(session)
    SessionService->>DB: UPDATE sessions SET status
    SessionService->>App: 更新成功

    App->>SessionService: GetEvents(session_id, filter)
    SessionService->>DB: SELECT * FROM session_events WHERE...
    DB->>SessionService: events[]
    SessionService->>App: 事件列表

🔧 服务接口

核心接口

type SessionService interface {
    // Session 管理
    Create(ctx context.Context, req *CreateRequest) (*Session, error)
    Get(ctx context.Context, id string) (*Session, error)
    Update(ctx context.Context, session *Session) error
    Delete(ctx context.Context, id string) error
    List(ctx context.Context, filter *ListFilter) ([]*Session, error)

    // Event 管理
    AppendEvent(ctx context.Context, sessionID string, event *Event) error
    AppendEvents(ctx context.Context, sessionID string, events []*Event) error
    GetEvents(ctx context.Context, sessionID string, filter *EventFilter) ([]*Event, error)

    // 批量操作
    DeleteByUser(ctx context.Context, userID string) error
    DeleteByApp(ctx context.Context, appName string) error

    // 资源清理
    Close() error
}

过滤器

ListFilter - Session 列表查询:

type ListFilter struct {
    AppName   string    // 按应用名称过滤
    UserID    string    // 按用户ID过滤
    AgentID   string    // 按AgentID过滤
    Status    string    // 按状态过滤
    StartTime time.Time // 时间范围开始
    EndTime   time.Time // 时间范围结束
    Limit     int       // 返回数量限制
    Offset    int       // 偏移量(分页)
}

EventFilter - Event 查询:

type EventFilter struct {
    InvocationID string    // 按调用ID过滤
    AgentID      string    // 按AgentID过滤
    Author       string    // 按作者过滤
    StartTime    time.Time // 时间范围开始
    EndTime      time.Time // 时间范围结束
    Limit        int       // 返回数量限制
    Offset       int       // 偏移量(分页)
}

🚀 性能优化

1. 批量操作

// ✅ 推荐:批量插入事件
events := []*session.Event{ /* ... */ }
service.AppendEvents(ctx, sess.ID, events)  // 单个事务

// ❌ 避免:逐条插入
for _, event := range events {
    service.AppendEvent(ctx, sess.ID, event)  // 多个事务,慢
}

性能对比:

  • 批量插入:~10ms(100条事件)
  • 逐条插入:~1000ms(100条事件,每条10ms)

2. 连接池调优

// 生产环境推荐配置
config := &postgres.Config{
    MaxOpenConns: 50,              // 最大连接数
    MaxIdleConns: 10,              // 最大空闲连接
    MaxLifetime:  5 * time.Minute, // 连接最大生命周期
}

3. 查询优化

// ✅ 推荐:使用索引字段查询
filter := &session.ListFilter{
    UserID:    "user-001",  // 有索引
    StartTime: yesterday,   // 有索引
    Limit:     100,
}

// ❌ 避免:LIKE 查询或全表扫描
// 使用 metadata 字段做复杂查询可能较慢

4. 分页最佳实践

// 游标分页(推荐)
filter := &session.EventFilter{
    StartTime: lastEventTime,  // 上次查询的最后一个事件时间
    Limit:     100,
}

// Offset 分页(简单但慢)
filter := &session.EventFilter{
    Limit:  100,
    Offset: 200,  // 第3页,跳过前200条
}

🔐 安全最佳实践

1. 数据库权限最小化

-- PostgreSQL: 创建专用用户
CREATE USER aster_app WITH PASSWORD 'strong_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON sessions, session_events TO aster_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO aster_app;

-- MySQL: 创建专用用户
CREATE USER 'aster_app'@'%' IDENTIFIED BY 'strong_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON aster.* TO 'aster_app'@'%';
FLUSH PRIVILEGES;

2. SSL/TLS 连接

// PostgreSQL SSL
DSN: "host=db.example.com port=5432 user=user dbname=db sslmode=require"

// MySQL SSL
DSN: "user:pwd@tcp(db.example.com:3306)/db?tls=custom"

3. 敏感数据加密

// 存储前加密敏感内容
event := &session.Event{
    Content: types.Message{
        Role:    types.RoleUser,
        Content: encrypt(sensitiveContent),  // 加密
    },
    Metadata: map[string]interface{}{
        "ip": hashIP(clientIP),  // 哈希化
    },
}

4. 数据保留策略

// 定期清理历史数据
func cleanupOldSessions(service SessionService) {
    cutoffTime := time.Now().Add(-30 * 24 * time.Hour) // 30天前

    sessions, _ := service.List(ctx, &session.ListFilter{
        EndTime: cutoffTime,
        Status:  session.StatusCompleted,
        Limit:   1000,
    })

    for _, sess := range sessions {
        service.Delete(ctx, sess.ID)
    }
}

📊 数据库选择指南

PostgreSQL vs MySQL

特性PostgreSQLMySQL 8.0+说明
JSON 查询✅ 优秀 (JSONB)✅ 良好 (JSON)PG 的 JSONB 性能更好
全文搜索✅ 内置⚠️ 需配置PG 开箱即用
复杂查询✅ 强大✅ 良好PG 支持更多高级特性
并发性能✅ MVCC✅ InnoDB都支持高并发
生态成熟度✅ 活跃✅ 活跃两者都很成熟
云服务支持✅ 广泛✅ 广泛AWS、GCP、Azure 都支持
部署成本💰 中💰 低MySQL 部署稍简单

推荐决策:

  • 选择 PostgreSQL: 需要复杂 JSON 查询、全文搜索、高级分析
  • 选择 MySQL: 已有 MySQL 基础设施、简单查询为主、成本敏感
  • 选择 Redis: 多节点部署、需要分布式状态共享、高并发场景 (1000+ QPS)

Redis Store 分布式存储

核心特性:

  • 高性能: 内存级读写速度 (< 5ms)
  • 🔄 分布式: 支持多节点共享状态
  • 🔒 原子操作: WATCH/MULTI 事务保证数据一致性
  • 自动过期: 支持 TTL 自动清理过期数据
  • 📈 高可用: 支持 Redis Cluster/Sentinel

使用场景:

import "github.com/astercloud/aster/pkg/store"

// 创建 Redis Store
redisStore, err := store.NewRedisStore(store.RedisConfig{
    Addr:     "localhost:6379",
    Password: "",
    DB:       0,
    Prefix:   "aster:",
    TTL:      7 * 24 * time.Hour, // 7天过期
})
if err != nil {
    panic(err)
}
defer redisStore.Close()

// 或使用工厂模式
st, err := store.NewStore(store.Config{
    Type:          store.StoreTypeRedis,
    RedisAddr:     "localhost:6379",
    RedisPassword: "",
    RedisDB:       0,
    RedisPrefix:   "aster:",
    RedisTTL:      7 * 24 * time.Hour,
})

分布式场景示例:

// 节点 1: 创建 Agent
agent1, _ := agent.Create(ctx, &types.AgentConfig{
    AgentID: "agt-shared-001",
    // ...
}, &agent.Dependencies{
    Store: redisStore,  // 使用 Redis Store
    // ...
})

agent1.Chat(ctx, "你好,我是节点 1")

// 节点 2: 创建相同 ID 的 Agent (不同服务器)
agent2, _ := agent.Create(ctx, &types.AgentConfig{
    AgentID: "agt-shared-001",  // 相同 ID
    // ...
}, &agent.Dependencies{
    Store: redisStore,  // 共享同一个 Redis
    // ...
})

// 节点 2 能够读取节点 1 的对话历史
agent2.Chat(ctx, "继续之前的对话")

性能指标:

操作延迟QPS
SaveMessages< 5ms50K+
LoadMessages< 3ms100K+
TrimMessages< 2ms30K+

高可用配置:

// Redis Sentinel (生产环境推荐)
import "github.com/redis/go-redis/v9"

client := redis.NewFailoverClient(&redis.FailoverOptions{
    MasterName:    "mymaster",
    SentinelAddrs: []string{
        "sentinel1:26379",
        "sentinel2:26379",
        "sentinel3:26379",
    },
})

// Redis Cluster (大规模部署)
client := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs: []string{
        "cluster-node1:6379",
        "cluster-node2:6379",
        "cluster-node3:6379",
    },
})

最佳实践:

  1. 会话保持: 使用用户 ID 或会话 ID 作为 AgentID
    agentID := fmt.Sprintf("agt-user-%s", userID)
    
  2. TTL 设置:
    • 开发环境: 1 * time.Hour
    • 生产环境: 7 * 24 * time.Hour
    • 永久存储: 0 (不过期)
  3. 负载均衡: 配合 Nginx/HAProxy 实现多节点部署
    upstream aster_backend {
        least_conn;
        server node1:8080;
        server node2:8080;
        server node3:8080;
    }
    
  4. 监控: 使用 Redis INFO 命令监控性能
    redis-cli INFO stats
    redis-cli INFO memory
    

详细文档请参考: Redis Store 使用指南

🔗 与工作流 Agent 集成

Session 持久化与工作流 Agent 无缝集成:

// 创建 Session
sess, _ := sessionService.Create(ctx, &session.CreateRequest{
    AppName: "workflow-demo",
    UserID:  userID,
    AgentID: "pipeline-agent",
})

// 执行工作流,自动持久化事件
sequential, _ := workflow.NewSequentialAgent(workflow.SequentialConfig{
    Name: "DataPipeline",
    SubAgents: []workflow.Agent{collector, analyzer, reporter},
})

reader := sequential.Execute(ctx, "处理数据")
for {
    event, err := reader.Recv()
    if err != nil {
        if errors.Is(err, io.EOF) {
            break
        }
        continue
    }

    // 追加事件到数据库
    sessionService.AppendEvent(ctx, sess.ID, event)

    // 事件包含丰富的工作流元数据
    step := event.Metadata["sequential_step"]
    branch := event.Branch  // "DataPipeline.Analyzer"
}

// 更新 Session 状态
sess.Status = session.StatusCompleted
sessionService.Update(ctx, sess)

📚 相关资源

❓ 常见问题

Q1: 如何从内存存储迁移到 PostgreSQL?

A: 使用批量操作迁移数据:

// 从内存读取
memSessions := memStore.ListSessions()

// 批量写入 PostgreSQL
for _, sess := range memSessions {
    pgService.Create(ctx, sess)

    events := memStore.GetEvents(sess.ID)
    pgService.AppendEvents(ctx, sess.ID, events)
}

Q2: 如何处理大量事件的查询?

A: 使用流式分页查询:

offset := 0
limit := 1000

for {
    events, _ := service.GetEvents(ctx, sess.ID, &session.EventFilter{
        Limit:  limit,
        Offset: offset,
    })

    if len(events) == 0 {
        break
    }

    processEvents(events)
    offset += limit
}

Q3: 支持事务吗?

A: 是的,批量操作自动使用事务:

// AppendEvents 内部使用事务
service.AppendEvents(ctx, sess.ID, events)  // 全部成功或全部失败

Q4: 如何监控数据库性能?

A: 检查连接池状态:

stats := service.DB().Stats()
fmt.Printf("Open=%d Idle=%d InUse=%d WaitCount=%d\n",
    stats.OpenConnections,
    stats.Idle,
    stats.InUse,
    stats.WaitCount)

Q5: 数据如何备份?

# PostgreSQL
pg_dump -h localhost -U postgres aster > backup-$(date +%Y%m%d).sql

# MySQL
mysqldump -h 127.0.0.1 -u root -p aster > backup-$(date +%Y%m%d).sql

🚀 下一步