Architecture

客户端 SDK 架构 (client-sdks)

客户端 SDK 架构 (client-sdks)

设计原则: 事件驱动、类型安全、框架集成 最后更新: 2024年11月17日


架构概览

client-sdks/
├── client-js/              # 核心 TypeScript 客户端
│   ├── src/
│   │   ├── index.ts
│   │   ├── client.ts       # AgentsdkClient 主类
│   │   ├── resources/      # 15 个资源类
│   │   ├── events/         # 事件系统
│   │   ├── transport/      # 传输层
│   │   ├── errors/         # 错误类
│   │   └── utils/          # 工具函数
│   └── tests/
│
├── react/                  # React 集成
│   └── src/
│       ├── provider.tsx    # AgentsdkProvider
│       ├── hooks/          # React hooks
│       └── components/     # UI 组件
│
├── ai-sdk/                 # Vercel AI SDK 集成
│   └── src/
│       ├── routes.ts       # createAgentChatRoute
│       └── adapters.ts     # AI SDK adapters
│
└── vue/                    # Vue 集成(未来)
    └── src/
        └── composables/    # Vue composables

核心设计

1. 15 个资源模块

按功能域划分,而非简单实体:

class AgentsdkClient {
  // 核心业务
  agents: AgentResource; // Agent 管理和 Chat
  memory: MemoryResource; // 三层记忆系统 ⭐
  workflows: WorkflowResource; // 工作流编排
  sessions: SessionResource; // 会话和断点恢复

  // 开发工具
  skills: SkillResource; // 技能管理
  evals: EvalResource; // 评估系统
  tools: ToolResource; // 工具执行

  // 生态集成
  mcp: MCPResource; // MCP 协议
  router: RouterResource; // 模型路由
  sandbox: SandboxResource; // 沙箱管理

  // 基础设施
  middleware: MiddlewareResource; // 中间件配置
  telemetry: TelemetryResource; // 可观测性
  provider: ProviderResource; // Provider 管理

  // 管理功能
  template: TemplateResource; // 模板管理
  security: SecurityResource; // 安全系统
}

2. 事件驱动架构

三通道设计是核心特性:

// 订阅事件
const subscription = await client.agents.subscribe([Channel.Progress, Channel.Control, Channel.Monitor], {
  agentId: "agent-123",
  eventTypes: ["thinking", "text_chunk", "tool_start"],
});

// 处理事件
for await (const event of subscription) {
  switch (event.channel) {
    case Channel.Progress:
      // 数据流:思考、文本、工具执行
      if (event.type === "text_chunk") {
        console.log(event.data.delta);
      }
      break;

    case Channel.Control:
      // 审批流:工具审批、暂停/恢复
      if (event.type === "tool_approval_request") {
        await client.security.approveToolCall(event.data.approvalId);
      }
      break;

    case Channel.Monitor:
      // 治理流:Token、成本、合规
      if (event.type === "token_usage") {
        console.log("Tokens:", event.data.tokens);
      }
      break;
  }
}

20+ 事件类型:

Progress Channel:

  • thinking - 思考过程
  • text_chunk - 流式文本
  • tool_start / tool_end - 工具执行
  • done / error - 完成/错误

Control Channel:

  • tool_approval_request / tool_approval_response - 工具审批
  • pause / resume - 暂停/恢复

Monitor Channel:

  • token_usage - Token 使用
  • latency - 延迟
  • cost - 成本
  • compliance - 合规检查

3. BaseResource 设计

所有资源类继承自 BaseResource

class BaseResource {
  protected readonly options: ClientOptions;
  protected readonly httpClient: HttpClient;
  protected readonly wsClient: WebSocketClient;

  constructor(options: ClientOptions) {
    this.options = options;
    this.httpClient = new HttpClient(options);
    this.wsClient = new WebSocketClient(options);
  }

  // 统一的请求方法
  protected async request<T>(path: string, options?: RequestOptions): Promise<T> {
    // 1. Retry 机制(exponential backoff)
    // 2. 错误处理和恢复
    // 3. Request Context 传递
    // 4. 超时控制
    // 5. 日志和追踪
    // 6. 指标收集
  }

  // 流式请求
  protected async *streamRequest<T>(path: string, options?: RequestOptions): AsyncGenerator<T> {
    // SSE 解析和 AsyncGenerator
  }

  // 事件订阅
  protected subscribe(path: string, options?: SubscribeOptions): EventSubscription {
    // WebSocket 长连接
  }
}

特性:

  • ✅ Retry 机制(指数退避,最多 3 次)
  • ✅ 错误处理(统一的错误类型)
  • ✅ Request Context(Base64 编码传递)
  • ✅ 超时控制(全局 + 请求级别)
  • ✅ 日志追踪(可配置级别)
  • ✅ 指标收集(与 Telemetry 集成)

核心资源详解

1. Agent 资源

class AgentResource extends BaseResource {
  // Chat 方法
  async chat(params: ChatParams): Promise<ChatResponse>;
  async *stream(params: ChatParams): AsyncGenerator<StreamEvent>;

  // 事件订阅(核心!)
  subscribe(channels: Channel[], filter?: EventFilter): EventSubscription;

  // Agent 管理
  async create(config: AgentConfig): Promise<Agent>;
  async get(agentId: string): Promise<AgentInfo>;
  async list(filter?: AgentFilter): Promise<PaginatedResponse<AgentInfo>>;
  async delete(agentId: string): Promise<void>;

  // 统计
  async getStats(agentId: string): Promise<AgentStats>;
}

API 端点:

POST/GET/DELETE  /v1/agents
POST             /v1/agents/chat
POST             /v1/agents/chat/stream
POST             /v1/agents/{id}/subscribe
GET              /v1/agents/{id}/stats

2. Memory 资源 ⭐ 核心特性

class MemoryResource extends BaseResource {
  // Working Memory(LLM 可主动更新!)
  working: {
    async get(key: string, scope?: 'thread' | 'resource'): Promise<any>
    async set(key: string, value: any, options?: {
      scope?: 'thread' | 'resource'
      ttl?: number           // TTL 过期
      schema?: JSONSchema    // JSON Schema 验证
    }): Promise<void>
    async delete(key: string, scope?: 'thread' | 'resource'): Promise<void>
    async list(scope?: 'thread' | 'resource'): Promise<Record<string, any>>
  }

  // Semantic Memory
  semantic: {
    async search(query: string, options?: SearchOptions): Promise<MemoryChunk[]>
    async store(content: string, metadata?: Record<string, any>): Promise<string>
    async delete(chunkId: string): Promise<void>
  }

  // 高级功能
  async getProvenance(memoryId: string): Promise<Provenance>
  async consolidate(options?: ConsolidateOptions): Promise<ConsolidationResult>
}

特性:

  • 双作用域(thread / resource)
  • LLM 可主动更新(通过 Tool)
  • 自动加载到 system prompt
  • JSON Schema 验证
  • TTL 过期机制

传输层设计

HTTP Client

class HttpClient {
  // 基础请求
  async request(path: string, options: RequestOptions): Promise<Response>;

  // Retry 逻辑
  private async retryRequest(fn: () => Promise<Response>): Promise<Response>;

  // 错误处理
  private handleError(error: any): never;
}

SSE Parser

class SSEParser {
  // 解析 SSE 流
  async *parse(stream: ReadableStream): AsyncGenerator<SSEEvent>;

  // 心跳处理
  private handleHeartbeat(): void;

  // 重连逻辑
  private async reconnect(): Promise<void>;
}

WebSocket Client

class WebSocketClient {
  // 建立连接
  async connect(url: string): Promise<WebSocket>;

  // 订阅事件
  subscribe(channels: Channel[], filter?: EventFilter): EventSubscription;

  // 自动重连
  private async reconnect(): Promise<void>;
}

类型系统

完整的 TypeScript 类型

// 1. 事件类型(20+)
type Event = ProgressEvent | ControlEvent | MonitorEvent;

// 2. 资源类型(15 个资源 × 平均 5-10 个方法)
interface AgentResource { ... }
interface MemoryResource { ... }
// ... 15 个资源接口

// 3. 配置类型
interface ClientOptions { ... }
interface AgentConfig { ... }
interface SessionConfig { ... }
// ... 所有配置类型

// 4. 响应类型
interface ChatResponse { ... }
interface WorkflowRun { ... }
// ... 所有响应类型

// 5. 错误类型
class AgentsdkError extends Error { ... }
class NetworkError extends AgentsdkError { ... }
// ... 错误类层次

错误处理策略

// 错误类层次
class AgentsdkError extends Error {
  constructor(
    message: string,
    public readonly code?: string,
    public readonly statusCode?: number,
    public readonly details?: unknown
  )
}

class NetworkError extends AgentsdkError {}
class APIError extends AgentsdkError {}
class ValidationError extends AgentsdkError {}
class StreamError extends AgentsdkError {}
class SubscriptionError extends AgentsdkError {}

// 错误处理
try {
  await client.agents.chat({ input: '...' });
} catch (error) {
  if (error instanceof NetworkError) {
    // 网络错误:重试
  } else if (error instanceof APIError) {
    // API 错误:根据状态码处理
  } else if (error instanceof ValidationError) {
    // 验证错误:修正参数
  }
}

React 集成

Provider

import { AgentsdkProvider } from '@aster/react';

function App() {
  return (
    <AgentsdkProvider
      baseURL="http://localhost:8080"
      apiKey="your-api-key"
    >
      <YourApp />
    </AgentsdkProvider>
  );
}

Hooks

// useAgent - Agent Chat
const { chat, loading, error } = useAgent({
  agentId: "agent-123",
});

const handleSubmit = async (input) => {
  const response = await chat({ input });
  console.log(response.text);
};

// useAgentStream - 流式响应
const { stream, events } = useAgentStream({
  agentId: "agent-123",
});

useEffect(() => {
  for await (const event of stream({ input: "..." })) {
    console.log(event);
  }
}, []);

// useWorkflow - 工作流
const { execute, status } = useWorkflow({
  workflowId: "workflow-123",
});

// useMemory - 记忆管理
const { get, set, search } = useMemory();

Vercel AI SDK 集成

import { createAgentChatRoute } from "@aster/ai-sdk";

// Next.js App Router
export const POST = createAgentChatRoute({
  baseURL: process.env.AGENTSDK_URL,
  apiKey: process.env.AGENTSDK_API_KEY,
  agentId: "agent-123",
});
// 客户端使用
import { useChat } from 'ai/react';

function Chat() {
  const { messages, input, handleSubmit } = useChat({
    api: '/api/chat'
  });

  return <ChatUI messages={messages} onSubmit={handleSubmit} />;
}

性能优化

1. 连接复用

  • HTTP Keep-Alive
  • WebSocket 长连接复用

2. 请求合并

  • 批量请求支持
  • 相同请求去重

3. 缓存策略

  • 响应缓存(可选)
  • TTL 配置

4. 背压控制

  • 流式响应背压
  • 事件流背压

使用示例

基础 Chat

import { AgentsdkClient } from "@aster/client-js";

const client = new AgentsdkClient({
  baseURL: "http://localhost:8080",
  apiKey: "your-api-key",
});

// 简单对话
const response = await client.agents.chat({
  agentId: "agent-123",
  input: "Hello, world!",
});
console.log(response.text);

流式响应

// 流式对话
for await (const event of client.agents.stream({
  agentId: "agent-123",
  input: "Tell me a story...",
})) {
  if (event.type === "text_chunk") {
    process.stdout.write(event.data.delta);
  }
}

事件订阅

// 订阅所有通道
const subscription = await client.agents.subscribe(["progress", "control", "monitor"], { agentId: "agent-123" });

for await (const event of subscription) {
  console.log(event.channel, event.type, event.data);
}

Working Memory

// 设置工作记忆
await client.memory.working.set("user_name", "Alice", {
  scope: "thread",
  ttl: 3600,
});

// 获取工作记忆
const name = await client.memory.working.get("user_name");

// LLM 会自动访问这些记忆

Workflow 执行

// 执行工作流
const run = await client.workflows.execute("workflow-123", {
  input: { task: "analyze data" },
});

// 监控执行状态
const status = await client.workflows.getRunDetails("workflow-123", run.id);

总结

核心设计点

  1. 15 个资源模块 - 按功能域划分
  2. 事件驱动架构 - 三通道设计
  3. BaseResource 基类 - 统一的通用功能
  4. 完整的类型系统 - 100+ 接口定义
  5. 框架集成 - React、Vercel AI SDK、Vue (未来)

关键特性

  • ✅ 完整的 100+ API 端点覆盖
  • ✅ 20+ 事件类型
  • ✅ Working Memory(LLM 可主动更新)
  • ✅ 断点恢复(7 段断点)
  • ✅ Workflow 编排(3 种模式)
  • ✅ MCP 协议(Server + Client)
  • ✅ OpenTelemetry 集成

相关文档


版本: v2.0 最后更新: 2024年11月17日