Session 持久化是 aster Phase 7 引入的关键功能,提供了完整的会话状态和事件历史持久化能力,支持 PostgreSQL 和 MySQL 8.0+。
在生产环境中,Agent 的会话和历史记录需要持久化存储:
graph TB
Agent[Agent] -->|创建会话| SessionService[Session Service]
Agent -->|追加事件| SessionService
Agent -->|查询历史| SessionService
SessionService --> Interface{Session Interface}
Interface -->|开发/测试| Memory[Memory Store<br/>内存存储]
Interface -->|生产环境| PostgreSQL[PostgreSQL<br/>JSONB支持]
Interface -->|生产环境| MySQL[MySQL 8.0+<br/>JSON支持]
PostgreSQL --> PGTables[(sessions<br/>session_events)]
MySQL --> MySQLTables[(sessions<br/>session_events)]
style SessionService fill:#3b82f6
style PostgreSQL fill:#10b981
style MySQL fill:#f59e0b
| 存储类型 | 用途 | 数据持久化 | JSON 支持 | 适用场景 |
|---|---|---|---|---|
| Memory | 开发/测试 | ❌ 无 | ✅ 原生 | 本地开发、单元测试 |
| SQLite | 桌面应用 | ✅ 持久化 | ✅ JSON | 单机、桌面应用 |
| Redis | 分布式/高性能 | ✅ 持久化 | ✅ 原生 | 多节点部署、高并发场景 |
| PostgreSQL | 生产推荐 | ✅ 持久化 | ✅ JSONB | 复杂查询、全文搜索 |
| MySQL 8.0+ | 生产可选 | ✅ 持久化 | ✅ JSON | 已有 MySQL 基础设施 |
存储会话元数据:
erDiagram
SESSION {
string id PK "会话ID"
string app_name "应用名称"
string user_id "用户ID"
string agent_id "AgentID"
string status "状态(active/completed/failed)"
jsonb metadata "元数据(JSONB/JSON)"
timestamp created_at "创建时间"
timestamp updated_at "更新时间"
}
SESSION ||--o{ EVENT : has
EVENT {
string id PK "事件ID"
string session_id FK "会话ID"
string invocation_id "调用ID"
string agent_id "AgentID"
string author "作者(user/agent)"
jsonb content "消息内容(JSONB/JSON)"
jsonb actions "事件动作(JSONB/JSON)"
jsonb metadata "元数据(JSONB/JSON)"
string branch "工作流分支"
timestamp timestamp "时间戳"
}
Session 字段说明:
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
id | string | 会话唯一ID | "sess-20250113-abc123" |
app_name | string | 应用名称 | "my-chatbot" |
user_id | string | 用户ID | "user-001" |
agent_id | string | Agent ID | "agent-assistant" |
status | string | 会话状态 | "active", "completed", "failed" |
metadata | jsonb/json | 自定义元数据 | {"version": "1.0", "env": "prod"} |
Event 字段说明:
| 字段 | 类型 | 说明 | 示例 |
|---|---|---|---|
id | string | 事件唯一ID | "evt-001" |
session_id | string | 所属会话ID | "sess-20250113-abc123" |
invocation_id | string | 调用ID(工作流追踪) | "inv-001" |
agent_id | string | 产生事件的 Agent ID | "agent-001" |
author | string | 事件作者 | "user", "agent" |
content | jsonb/json | 消息内容 | {"role": "user", "content": "Hello"} |
actions | jsonb/json | 事件动作 | {"escalate": false} |
metadata | jsonb/json | 事件元数据 | {"loop_iteration": 1} |
branch | string | 工作流分支路径 | "Pipeline.Analyzer" |
为高性能查询创建的索引:
-- Session 表索引
CREATE INDEX idx_sessions_user_id ON sessions(user_id);
CREATE INDEX idx_sessions_agent_id ON sessions(agent_id);
CREATE INDEX idx_sessions_status ON sessions(status);
CREATE INDEX idx_sessions_created_at ON sessions(created_at);
-- Event 表索引
CREATE INDEX idx_events_session_id ON session_events(session_id);
CREATE INDEX idx_events_timestamp ON session_events(timestamp);
CREATE INDEX idx_events_invocation_id ON session_events(invocation_id);
CREATE INDEX idx_events_agent_id ON session_events(agent_id);
graph LR
Active[active<br/>活跃] -->|正常完成| Completed[completed<br/>已完成]
Active -->|发生错误| Failed[failed<br/>失败]
Active -->|超时| Failed
Completed -->|重新激活| Active
Failed -->|重试| Active
style Active fill:#3b82f6
style Completed fill:#10b981
style Failed fill:#ef4444
sequenceDiagram
participant App as 应用
participant Agent
participant SessionService
participant DB as 数据库
App->>SessionService: Create(req)
SessionService->>DB: INSERT INTO sessions
DB->>SessionService: session_id
SessionService->>App: Session对象
loop Agent 执行
App->>Agent: Chat(ctx, message)
Agent->>SessionService: AppendEvent(event)
SessionService->>DB: INSERT INTO session_events
Agent->>App: 流式响应
end
App->>SessionService: Update(session)
SessionService->>DB: UPDATE sessions SET status
SessionService->>App: 更新成功
App->>SessionService: GetEvents(session_id, filter)
SessionService->>DB: SELECT * FROM session_events WHERE...
DB->>SessionService: events[]
SessionService->>App: 事件列表
type SessionService interface {
// Session 管理
Create(ctx context.Context, req *CreateRequest) (*Session, error)
Get(ctx context.Context, id string) (*Session, error)
Update(ctx context.Context, session *Session) error
Delete(ctx context.Context, id string) error
List(ctx context.Context, filter *ListFilter) ([]*Session, error)
// Event 管理
AppendEvent(ctx context.Context, sessionID string, event *Event) error
AppendEvents(ctx context.Context, sessionID string, events []*Event) error
GetEvents(ctx context.Context, sessionID string, filter *EventFilter) ([]*Event, error)
// 批量操作
DeleteByUser(ctx context.Context, userID string) error
DeleteByApp(ctx context.Context, appName string) error
// 资源清理
Close() error
}
ListFilter - Session 列表查询:
type ListFilter struct {
AppName string // 按应用名称过滤
UserID string // 按用户ID过滤
AgentID string // 按AgentID过滤
Status string // 按状态过滤
StartTime time.Time // 时间范围开始
EndTime time.Time // 时间范围结束
Limit int // 返回数量限制
Offset int // 偏移量(分页)
}
EventFilter - Event 查询:
type EventFilter struct {
InvocationID string // 按调用ID过滤
AgentID string // 按AgentID过滤
Author string // 按作者过滤
StartTime time.Time // 时间范围开始
EndTime time.Time // 时间范围结束
Limit int // 返回数量限制
Offset int // 偏移量(分页)
}
// ✅ 推荐:批量插入事件
events := []*session.Event{ /* ... */ }
service.AppendEvents(ctx, sess.ID, events) // 单个事务
// ❌ 避免:逐条插入
for _, event := range events {
service.AppendEvent(ctx, sess.ID, event) // 多个事务,慢
}
性能对比:
// 生产环境推荐配置
config := &postgres.Config{
MaxOpenConns: 50, // 最大连接数
MaxIdleConns: 10, // 最大空闲连接
MaxLifetime: 5 * time.Minute, // 连接最大生命周期
}
// ✅ 推荐:使用索引字段查询
filter := &session.ListFilter{
UserID: "user-001", // 有索引
StartTime: yesterday, // 有索引
Limit: 100,
}
// ❌ 避免:LIKE 查询或全表扫描
// 使用 metadata 字段做复杂查询可能较慢
// 游标分页(推荐)
filter := &session.EventFilter{
StartTime: lastEventTime, // 上次查询的最后一个事件时间
Limit: 100,
}
// Offset 分页(简单但慢)
filter := &session.EventFilter{
Limit: 100,
Offset: 200, // 第3页,跳过前200条
}
-- PostgreSQL: 创建专用用户
CREATE USER aster_app WITH PASSWORD 'strong_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON sessions, session_events TO aster_app;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO aster_app;
-- MySQL: 创建专用用户
CREATE USER 'aster_app'@'%' IDENTIFIED BY 'strong_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON aster.* TO 'aster_app'@'%';
FLUSH PRIVILEGES;
// PostgreSQL SSL
DSN: "host=db.example.com port=5432 user=user dbname=db sslmode=require"
// MySQL SSL
DSN: "user:pwd@tcp(db.example.com:3306)/db?tls=custom"
// 存储前加密敏感内容
event := &session.Event{
Content: types.Message{
Role: types.RoleUser,
Content: encrypt(sensitiveContent), // 加密
},
Metadata: map[string]interface{}{
"ip": hashIP(clientIP), // 哈希化
},
}
// 定期清理历史数据
func cleanupOldSessions(service SessionService) {
cutoffTime := time.Now().Add(-30 * 24 * time.Hour) // 30天前
sessions, _ := service.List(ctx, &session.ListFilter{
EndTime: cutoffTime,
Status: session.StatusCompleted,
Limit: 1000,
})
for _, sess := range sessions {
service.Delete(ctx, sess.ID)
}
}
| 特性 | PostgreSQL | MySQL 8.0+ | 说明 |
|---|---|---|---|
| JSON 查询 | ✅ 优秀 (JSONB) | ✅ 良好 (JSON) | PG 的 JSONB 性能更好 |
| 全文搜索 | ✅ 内置 | ⚠️ 需配置 | PG 开箱即用 |
| 复杂查询 | ✅ 强大 | ✅ 良好 | PG 支持更多高级特性 |
| 并发性能 | ✅ MVCC | ✅ InnoDB | 都支持高并发 |
| 生态成熟度 | ✅ 活跃 | ✅ 活跃 | 两者都很成熟 |
| 云服务支持 | ✅ 广泛 | ✅ 广泛 | AWS、GCP、Azure 都支持 |
| 部署成本 | 💰 中 | 💰 低 | MySQL 部署稍简单 |
推荐决策:
核心特性:
使用场景:
import "github.com/astercloud/aster/pkg/store"
// 创建 Redis Store
redisStore, err := store.NewRedisStore(store.RedisConfig{
Addr: "localhost:6379",
Password: "",
DB: 0,
Prefix: "aster:",
TTL: 7 * 24 * time.Hour, // 7天过期
})
if err != nil {
panic(err)
}
defer redisStore.Close()
// 或使用工厂模式
st, err := store.NewStore(store.Config{
Type: store.StoreTypeRedis,
RedisAddr: "localhost:6379",
RedisPassword: "",
RedisDB: 0,
RedisPrefix: "aster:",
RedisTTL: 7 * 24 * time.Hour,
})
分布式场景示例:
// 节点 1: 创建 Agent
agent1, _ := agent.Create(ctx, &types.AgentConfig{
AgentID: "agt-shared-001",
// ...
}, &agent.Dependencies{
Store: redisStore, // 使用 Redis Store
// ...
})
agent1.Chat(ctx, "你好,我是节点 1")
// 节点 2: 创建相同 ID 的 Agent (不同服务器)
agent2, _ := agent.Create(ctx, &types.AgentConfig{
AgentID: "agt-shared-001", // 相同 ID
// ...
}, &agent.Dependencies{
Store: redisStore, // 共享同一个 Redis
// ...
})
// 节点 2 能够读取节点 1 的对话历史
agent2.Chat(ctx, "继续之前的对话")
性能指标:
| 操作 | 延迟 | QPS |
|---|---|---|
| SaveMessages | < 5ms | 50K+ |
| LoadMessages | < 3ms | 100K+ |
| TrimMessages | < 2ms | 30K+ |
高可用配置:
// Redis Sentinel (生产环境推荐)
import "github.com/redis/go-redis/v9"
client := redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: "mymaster",
SentinelAddrs: []string{
"sentinel1:26379",
"sentinel2:26379",
"sentinel3:26379",
},
})
// Redis Cluster (大规模部署)
client := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"cluster-node1:6379",
"cluster-node2:6379",
"cluster-node3:6379",
},
})
最佳实践:
agentID := fmt.Sprintf("agt-user-%s", userID)
1 * time.Hour7 * 24 * time.Hour0 (不过期)upstream aster_backend {
least_conn;
server node1:8080;
server node2:8080;
server node3:8080;
}
redis-cli INFO stats
redis-cli INFO memory
详细文档请参考: Redis Store 使用指南
Session 持久化与工作流 Agent 无缝集成:
// 创建 Session
sess, _ := sessionService.Create(ctx, &session.CreateRequest{
AppName: "workflow-demo",
UserID: userID,
AgentID: "pipeline-agent",
})
// 执行工作流,自动持久化事件
sequential, _ := workflow.NewSequentialAgent(workflow.SequentialConfig{
Name: "DataPipeline",
SubAgents: []workflow.Agent{collector, analyzer, reporter},
})
reader := sequential.Execute(ctx, "处理数据")
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
// 追加事件到数据库
sessionService.AppendEvent(ctx, sess.ID, event)
// 事件包含丰富的工作流元数据
step := event.Metadata["sequential_step"]
branch := event.Branch // "DataPipeline.Analyzer"
}
// 更新 Session 状态
sess.Status = session.StatusCompleted
sessionService.Update(ctx, sess)
A: 使用批量操作迁移数据:
// 从内存读取
memSessions := memStore.ListSessions()
// 批量写入 PostgreSQL
for _, sess := range memSessions {
pgService.Create(ctx, sess)
events := memStore.GetEvents(sess.ID)
pgService.AppendEvents(ctx, sess.ID, events)
}
A: 使用流式分页查询:
offset := 0
limit := 1000
for {
events, _ := service.GetEvents(ctx, sess.ID, &session.EventFilter{
Limit: limit,
Offset: offset,
})
if len(events) == 0 {
break
}
processEvents(events)
offset += limit
}
A: 是的,批量操作自动使用事务:
// AppendEvents 内部使用事务
service.AppendEvents(ctx, sess.ID, events) // 全部成功或全部失败
A: 检查连接池状态:
stats := service.DB().Stats()
fmt.Printf("Open=%d Idle=%d InUse=%d WaitCount=%d\n",
stats.OpenConnections,
stats.Idle,
stats.InUse,
stats.WaitCount)
# PostgreSQL
pg_dump -h localhost -U postgres aster > backup-$(date +%Y%m%d).sql
# MySQL
mysqldump -h 127.0.0.1 -u root -p aster > backup-$(date +%Y%m%d).sql