本指南将带您在5分钟内创建一个功能完整的AI Agent。
确保已完成:
go get github.com/astercloud/astermkdir my-first-agent
cd my-first-agent
go mod init my-first-agent
go get github.com/astercloud/aster
创建 main.go:
package main
import (
"context"
"fmt"
"log"
"os"
"github.com/astercloud/aster/pkg/agent"
"github.com/astercloud/aster/pkg/provider"
"github.com/astercloud/aster/pkg/sandbox"
"github.com/astercloud/aster/pkg/store"
"github.com/astercloud/aster/pkg/tools"
"github.com/astercloud/aster/pkg/tools/builtin"
"github.com/astercloud/aster/pkg/types"
)
func main() {
// 1. 创建工具注册表并注册内置工具
toolRegistry := tools.NewRegistry()
builtin.RegisterAll(toolRegistry)
// 2. 创建依赖
jsonStore, _ := store.NewJSONStore("./.aster")
deps := &agent.Dependencies{
Store: jsonStore,
SandboxFactory: sandbox.NewFactory(),
ToolRegistry: toolRegistry,
ProviderFactory: &provider.AnthropicFactory{},
TemplateRegistry: agent.NewTemplateRegistry(),
}
// 3. 注册Agent模板
deps.TemplateRegistry.Register(&types.AgentTemplateDefinition{
ID: "assistant",
SystemPrompt: "You are a helpful assistant with file and bash access.",
Model: "claude-sonnet-4-5",
Tools: []interface{}{"Read", "Write", "Bash"},
})
// 4. 创建Agent
ag, err := agent.Create(context.Background(), &types.AgentConfig{
TemplateID: "assistant",
ModelConfig: &types.ModelConfig{
Provider: "anthropic",
Model: "claude-sonnet-4-5",
APIKey: os.Getenv("ANTHROPIC_API_KEY"),
},
Sandbox: &types.SandboxConfig{
Kind: types.SandboxKindLocal,
WorkDir: "./workspace",
},
}, deps)
if err != nil {
log.Fatal(err)
}
defer ag.Close()
// 5. 订阅事件并实时输出
eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
go func() {
for envelope := range eventCh {
if evt, ok := envelope.Event.(types.EventType); ok {
switch e := evt.(type) {
case *types.ProgressTextChunkEvent:
fmt.Print(e.Delta) // 实时输出AI回复
case *types.ProgressToolStartEvent:
fmt.Printf("\n[Tool] %s\n", e.Call.Name)
}
}
}
}()
// 6. 同步对话
result, err := ag.Chat(context.Background(),
"请创建一个 hello.txt 文件,内容是 'Hello aster!'")
if err != nil {
log.Fatal(err)
}
fmt.Printf("\n\n✅ 完成: %s\n", result.Text)
}
export ANTHROPIC_API_KEY="sk-ant-xxxxx"
go run main.go
预期输出:
[Tool] Write
我已经创建了 hello.txt 文件,内容为 'Hello aster!'
✅ 完成: 文件创建成功
管理所有可用工具:
toolRegistry := tools.NewRegistry()
// 注册所有内置工具
builtin.RegisterAll(toolRegistry)
// 或选择性注册
toolRegistry.Register(builtin.NewFileSystemTool())
toolRegistry.Register(builtin.NewBashTool())
预定义Agent配置,便于复用:
deps.TemplateRegistry.Register(&types.AgentTemplateDefinition{
ID: "coder",
SystemPrompt: "You are an expert programmer.",
Model: "claude-sonnet-4-5",
Tools: []interface{}{"fs_*", "Bash", "Grep"},
})
// 使用模板创建Agent
ag, _ := agent.Create(ctx, &types.AgentConfig{
TemplateID: "coder",
}, deps)
三种事件通道分离关注点:
// Progress: UI展示
eventCh := ag.Subscribe([]types.AgentChannel{
types.ChannelProgress, // 流式文本、工具执行
}, nil)
// Monitor: 监控告警
monitorCh := ag.Subscribe([]types.AgentChannel{
types.ChannelMonitor, // 错误、审计日志
}, nil)
// Control: 人机交互
controlCh := ag.Subscribe([]types.AgentChannel{
types.ChannelControl, // 工具审批请求
}, nil)
安全执行代码:
// 本地沙箱(开发)
Sandbox: &types.SandboxConfig{
Kind: types.SandboxKindLocal,
WorkDir: "./workspace",
}
// 阿里云沙箱(生产)
Sandbox: &types.SandboxConfig{
Kind: types.SandboxKindAliyun,
WorkDir: "/workspace",
}
// 模板:文件助手
deps.TemplateRegistry.Register(&types.AgentTemplateDefinition{
ID: "file-helper",
SystemPrompt: "You are a file processing expert.",
Tools: []interface{}{
"Read", "Write", "Edit",
"Ls", "Glob", "Grep",
},
})
// 使用
ag, _ := agent.Create(ctx, &types.AgentConfig{
TemplateID: "file-helper",
}, deps)
ag.Chat(ctx, "列出所有.go文件并统计行数")
// 模板:代码执行器
deps.TemplateRegistry.Register(&types.AgentTemplateDefinition{
ID: "executor",
SystemPrompt: "You can execute bash commands safely.",
Tools: []interface{}{"Bash", "Read"},
})
// 使用
ag, _ := agent.Create(ctx, &types.AgentConfig{
TemplateID: "executor",
Sandbox: &types.SandboxConfig{
Kind: types.SandboxKindLocal,
},
}, deps)
ag.Chat(ctx, "运行 go test ./... 并显示结果")
// 创建Agent时启用中间件
ag, _ := agent.Create(ctx, &types.AgentConfig{
TemplateID: "assistant",
Middlewares: []string{
"filesystem", // 文件系统操作
"summarization", // 自动上下文总结
},
}, deps)
// 中间件会自动:
// 1. 提供额外工具(如 fs_* 工具)
// 2. 管理上下文(自动总结长对话)
// 3. 拦截和增强调用
aster提供丰富的内置工具:
| 类别 | 工具名称 | 功能描述 |
|---|---|---|
| 文件系统 | Read | 读取文件内容(支持分页) |
Write | 写入文件 | |
Edit | 精确编辑(字符串替换) | |
Ls | 列出目录 | |
Glob | Glob模式匹配 | |
Grep | 正则搜索(显示行号) | |
| 命令执行 | Bash | 执行Bash命令 |
| 任务管理 | todo_list | 列出待办事项 |
todo_add | 添加待办 | |
todo_update | 更新待办状态 | |
| 网络 | http_fetch | HTTP请求 |
WebSearch | 网络搜索 |
注册所有工具:
builtin.RegisterAll(toolRegistry)
选择性注册:
toolRegistry.Register(builtin.NewFileSystemTool())
toolRegistry.Register(builtin.NewBashTool())
// 启动对话(非阻塞)
eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
ag.Send(ctx, "你好")
// 处理事件流
for envelope := range eventCh {
// 处理事件
}
// 阻塞等待完成
result, err := ag.Chat(ctx, "你好")
if err != nil {
log.Fatal(err)
}
fmt.Println(result.Text)
优先使用异步订阅方式:
// ✅ 推荐:流式处理
eventCh := ag.Subscribe([]types.AgentChannel{types.ChannelProgress}, nil)
go func() {
for envelope := range eventCh {
// 实时处理
}
}()
// ❌ 不推荐:阻塞等待
result, _ := ag.Chat(ctx, message) // 等待完整响应
// 创建多个Agent并发处理
agents := make([]*agent.Agent, 5)
for i := range agents {
agents[i], _ = agent.Create(ctx, config, deps)
}
// 并发发送任务
var wg sync.WaitGroup
for i, ag := range agents {
wg.Add(1)
go func(a *agent.Agent, taskID int) {
defer wg.Done()
a.Chat(ctx, fmt.Sprintf("任务 %d", taskID))
}(ag, i)
}
wg.Wait()
monitorCh := ag.Subscribe([]types.AgentChannel{
types.ChannelProgress,
types.ChannelMonitor, // 监控事件
}, nil)
for envelope := range monitorCh {
switch e := envelope.Event.(type) {
case *types.MonitorErrorEvent:
log.Printf("错误: %v", e.Error)
case *types.MonitorToolExecutionEvent:
log.Printf("工具: %s 耗时: %v", e.ToolName, e.Duration)
}
}
case *types.ProgressToolStartEvent:
fmt.Printf("[Tool Start] %s(%v)\n", e.Call.Name, e.Call.Input)
case *types.ProgressToolEndEvent:
fmt.Printf("[Tool End] %s -> %v\n", e.Call.Name, e.Result)
ModelConfig: &types.ModelConfig{
Provider: "openai", // 改为openai
Model: "gpt-4-turbo", // 改为gpt-4
APIKey: os.Getenv("OPENAI_API_KEY"),
}
// 只允许特定工具
Tools: []interface{}{"Read", "Write"} // 不包含Bash
// 或使用权限系统
// 参见:/guides/permission-control
// 启用自动总结中间件
Middlewares: []string{"summarization"}
// 当上下文超过170k tokens时自动总结
// 实现Tool接口
type MyTool struct{}
func (t *MyTool) Name() string { return "my_tool" }
func (t *MyTool) Description() string { return "My custom tool" }
func (t *MyTool) InputSchema() map[string]interface{} {
return map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"input": map[string]string{"type": "string"},
},
}
}
func (t *MyTool) Execute(ctx context.Context, input map[string]interface{}) (interface{}, error) {
// 你的逻辑
return "结果", nil
}
// 注册
toolRegistry.Register(&MyTool{})
import "github.com/astercloud/aster/pkg/agent/workflow"
// 创建顺序工作流(数据收集 → 分析 → 报告)
sequential, _ := workflow.NewSequentialAgent(workflow.SequentialConfig{
Name: "DataPipeline",
SubAgents: []workflow.Agent{
NewDataCollectorAgent(),
NewAnalyzerAgent(),
NewReporterAgent(),
},
})
// 执行工作流
reader := sequential.Execute(ctx, "处理用户数据")
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
fmt.Printf("步骤 %d/%d: %s\n",
event.Metadata["sequential_step"],
event.Metadata["total_steps"],
event.Content.Content)
}
// 创建并行工作流(同时运行多个算法)
parallel, _ := workflow.NewParallelAgent(workflow.ParallelConfig{
Name: "MultiAlgorithm",
SubAgents: []workflow.Agent{
NewAlgorithmAgent("Fast"),
NewAlgorithmAgent("Accurate"),
NewAlgorithmAgent("Balanced"),
},
})
// 并发执行,收集所有结果
reader := parallel.Execute(ctx, "求解问题")
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
fmt.Printf("方案 %s: %s\n", event.AgentID, event.Content.Content)
}
// 创建循环工作流(迭代优化直到质量达标)
loop, _ := workflow.NewLoopAgent(workflow.LoopConfig{
Name: "OptimizationLoop",
SubAgents: []workflow.Agent{critic, improver},
MaxIterations: 5,
StopCondition: func(event *session.Event) bool {
// 质量达到90分时停止
return event.Metadata["quality_score"].(int) >= 90
},
})
reader := loop.Execute(ctx, "优化代码")
for {
event, err := reader.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
continue
}
fmt.Printf("迭代 %d: %s\n",
event.Metadata["loop_iteration"],
event.Content.Content)
}
import "github.com/astercloud/aster/pkg/session/postgres"
// 创建 PostgreSQL Session 服务
sessionService, _ := postgres.NewService(&postgres.Config{
DSN: "host=localhost port=5432 user=postgres dbname=aster",
AutoMigrate: true,
})
defer sessionService.Close()
// 创建 Session
sess, _ := sessionService.Create(ctx, &session.CreateRequest{
AppName: "my-app",
UserID: "user-001",
AgentID: "agent-001",
})
// 追加事件
event := &session.Event{
ID: "evt-001",
AgentID: "agent-001",
Content: types.Message{Role: types.RoleUser, Content: "Hello"},
}
sessionService.AppendEvent(ctx, sess.ID, event)
// 查询事件
events, _ := sessionService.GetEvents(ctx, sess.ID, nil)
fmt.Printf("查询到 %d 个事件\n", len(events))
import "github.com/astercloud/aster/pkg/session/mysql"
// MySQL 使用方式与 PostgreSQL 相同
mysqlService, _ := mysql.NewService(&mysql.Config{
DSN: "root:password@tcp(127.0.0.1:3306)/aster?charset=utf8mb4",
AutoMigrate: true,
})
// 使用方式与 PostgreSQL 相同
查看完整示例: