In the previous examples we used SemanticMemory to give an Agent semantic-retrieval capability. This example goes one step further and shows how to combine semantic retrieval with an LLM call inside a Workflow Agent, building a simple RAG pipeline.
Goals:

- Index a set of knowledge snippets with SemanticMemory;
- Use the workflow.Agent interface to integrate this logic as a composable step inside a larger workflow.

Sample code location: examples/workflow-semantic/main.go

The example uses an in-memory VectorStore plus a MockEmbedder, so it can run locally with no external services:
```go
ctx := context.Background()

// 1. Initialize semantic memory: MemoryStore + MockEmbedder
store := vector.NewMemoryStore()
embedder := vector.NewMockEmbedder(16)
semMem := memory.NewSemanticMemory(memory.SemanticMemoryConfig{
	Store:          store,
	Embedder:       embedder,
	NamespaceScope: "resource",
	TopK:           3,
})
```
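The MockEmbedder is what makes the example runnable offline. Its real implementation lives in the vector package; conceptually, a mock embedder only needs to map equal texts to equal vectors deterministically so that similarity search behaves consistently. A hash-based sketch of that idea (`mockEmbed` is a hypothetical standalone function for illustration, not the library's API):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// mockEmbed maps text to a deterministic pseudo-embedding of dim dimensions.
// Equal inputs always produce equal vectors, which is all an offline
// similarity-search demo needs.
func mockEmbed(text string, dim int) []float32 {
	h := fnv.New64a()
	h.Write([]byte(text))
	seed := h.Sum64()
	vec := make([]float32, dim)
	for i := range vec {
		// Step a simple LCG to spread the hash across dimensions.
		seed = seed*6364136223846793005 + 1442695040888963407
		vec[i] = float32(seed%1000)/1000.0 - 0.5
	}
	return vec
}

func main() {
	a := mockEmbed("Paris", 16)
	b := mockEmbed("Paris", 16)
	fmt.Println(len(a) == len(b) && a[0] == b[0]) // prints: true
}
```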
Then index a few pieces of "world knowledge":
```go
docs := []struct {
	id   string
	text string
	meta map[string]interface{}
}{
	{
		id:   "doc-paris",
		text: "Paris is the capital and most populous city of France.",
		meta: map[string]interface{}{"user_id": "alice", "resource_id": "world-facts"},
	},
	{
		id:   "doc-berlin",
		text: "Berlin is the capital city of Germany.",
		meta: map[string]interface{}{"user_id": "alice", "resource_id": "world-facts"},
	},
}

for _, d := range docs {
	if err := semMem.Index(ctx, d.id, d.text, d.meta); err != nil {
		log.Fatalf("index %s: %v", d.id, err)
	}
}
```
The example reuses the same dependency-injection setup as examples/server-http:
```go
toolRegistry := tools.NewRegistry()
builtin.RegisterAll(toolRegistry)

memStore := storepkg() // JSON Store that persists Agent state

deps := &agent.Dependencies{
	Store:          memStore,
	SandboxFactory: sandbox.NewFactory(),
	ToolRegistry:   toolRegistry,
	// Multi-provider factory; defaults to Anthropic and requires ANTHROPIC_API_KEY
	ProviderFactory: provider.NewMultiProviderFactory(),
	TemplateRegistry: func() *agent.TemplateRegistry {
		tr := agent.NewTemplateRegistry()
		tr.Register(&types.AgentTemplateDefinition{
			ID:    "semantic-qa",
			Model: "claude-sonnet-4-5",
			SystemPrompt: "You are a helpful assistant. Use the provided context if available " +
				"and answer the user's question concisely.",
			Tools: []interface{}{"Read", "Write"},
		})
		return tr
	}(),
}
```
Note: for the example to actually call the hosted model, set the ANTHROPIC_API_KEY environment variable before running; otherwise agent.Create returns an error.
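In a POSIX shell that looks like this (the key value below is a placeholder; substitute your real key):

```shell
export ANTHROPIC_API_KEY="your-api-key"
# Confirm the variable is visible to child processes such as `go run`.
echo "${ANTHROPIC_API_KEY:+set}"
```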
The core of the example is a struct that implements the workflow.Agent interface:

```go
type SemanticQAWorkflowAgent struct {
	name       string
	templateID string
	semMem     *memory.SemanticMemory
	deps       *agent.Dependencies
}

func (a *SemanticQAWorkflowAgent) Name() string {
	return a.name
}
```
Execution happens in two steps:

1. Run a semantic search with SemanticMemory and emit the retrieved context as an Event;
2. Create an agent.Agent, inject the context into the prompt, and emit the final answer as an Event.

```go
func (a *SemanticQAWorkflowAgent) Execute(
	ctx context.Context,
	message string,
) *stream.Reader[*session.Event] {
	reader, writer := stream.Pipe[*session.Event](10)

	go func() {
		defer writer.Close()

		// 1) Semantic retrieval
		meta := map[string]interface{}{
			"user_id":     "alice",
			"resource_id": "world-facts",
		}
		hits, err := a.semMem.Search(ctx, message, meta, 3)
		if err != nil {
			writer.Send(nil, fmt.Errorf("semantic search: %w", err))
			return
		}
		contextText := buildContext(hits) // join the matched snippets into one context block

		// Emit the "context event"; stop if the consumer has gone away
		if !writer.Send(&session.Event{
			ID:        "...-context",
			Timestamp: time.Now(),
			AgentID:   a.name,
			Author:    "system",
			Content: types.Message{
				Role:    types.RoleSystem,
				Content: "Semantic context:\n\n" + contextText,
			},
		}, nil) {
			return
		}

		// 2) Create the underlying Agent and call the LLM
		ag, err := agent.Create(ctx, &types.AgentConfig{
			TemplateID: a.templateID,
			Metadata: map[string]interface{}{
				"workflow_step": a.name,
			},
		}, a.deps)
		if err != nil {
			writer.Send(nil, fmt.Errorf("create agent: %w", err))
			return
		}
		defer ag.Close()

		prompt := fmt.Sprintf(
			"Use the following context if it is helpful:\n\n%s\n\nQuestion: %s",
			contextText, message,
		)
		res, err := ag.Chat(ctx, prompt)
		if err != nil {
			writer.Send(nil, fmt.Errorf("chat: %w", err))
			return
		}

		// Emit the "answer event"
		writer.Send(&session.Event{
			ID:        "...-answer",
			Timestamp: time.Now(),
			AgentID:   a.name,
			Author:    "assistant",
			Content: types.Message{
				Role:    types.RoleAssistant,
				Content: res.Text,
			},
		}, nil)
	}()

	return reader
}
```

Note two fixes over the naive version: the context-event send is checked with `!writer.Send(...)` so the goroutine only bails out when the send fails, and the `ag.Chat` error is handled before `res.Text` is read, avoiding a nil dereference when the call fails.
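The buildContext helper is elided in the listing above; its only job is to join the retrieved snippets into the "[DOC n] ..." format seen in the sample output. A minimal self-contained sketch, where `searchHit` is a stand-in for whatever hit type your version of memory.SemanticMemory.Search actually returns:

```go
package main

import (
	"fmt"
	"strings"
)

// searchHit stands in for the library's search-result type;
// only the matched text is needed here.
type searchHit struct {
	Text string
}

// buildContext joins retrieved snippets into one context block,
// matching the "[DOC n] ..." format shown in the sample output.
func buildContext(hits []searchHit) string {
	var b strings.Builder
	for i, h := range hits {
		fmt.Fprintf(&b, "[DOC %d] %s\n", i+1, h.Text)
	}
	return strings.TrimSpace(b.String())
}

func main() {
	fmt.Println(buildContext([]searchHit{
		{Text: "Paris is the capital of France."},
	}))
	// prints: [DOC 1] Paris is the capital of France.
}
```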
To demonstrate the Workflow interface, the example wraps SemanticQAWorkflowAgent in a sequential workflow:
```go
semanticQA := NewSemanticQAWorkflowAgent("SemanticQA", "semantic-qa", semMem, deps)

seq, err := workflow.NewSequentialAgent(workflow.SequentialConfig{
	Name: "SemanticQAPipeline",
	SubAgents: []workflow.Agent{
		semanticQA,
	},
})
if err != nil {
	log.Fatalf("create sequential workflow: %v", err)
}

question := "What is the capital of France?"
reader := seq.Execute(ctx, question)
for {
	ev, err := reader.Recv()
	if err != nil {
		if errors.Is(err, io.EOF) {
			break
		}
		log.Fatalf("workflow error: %v", err)
	}
	if ev == nil {
		continue
	}
	fmt.Printf("[%s] %s\n", ev.AgentID, ev.Content.Content)
}
```
The terminal output will look similar to:
```
=== Workflow: Semantic QA ===
Question: What is the capital of France?

[SemanticQA] Semantic context:

[DOC 1] Paris is the capital and most populous city of France.

[SemanticQA] Paris is the capital and most populous city of France.
```
Run it from the repository root:
```shell
cd examples
go run ./workflow-semantic
```
aster.yaml configures adapters such as pgvector and OpenAI; by reusing the same SemanticMemory configuration in your own project, you can turn this example's Workflow Agent into a production-grade RAG step.