import asyncio
import json  # bugfix: json.loads was called below without importing json


async def main():
    """Load config.json via the Read tool, falling back to a default.

    Any read failure is reported and the built-in default configuration
    is used instead; the parsed configuration is printed either way.
    """
    try:
        # Try to read the file through the sandbox Read tool.
        content = await Read(path="config.json")
    except Exception as e:
        print(f"读取失败: {e}")
        # Fall back to the default configuration.
        content = '{"default": true}'
    # Continue processing regardless of which source supplied the content.
    data = json.loads(content)
    print(f"配置: {data}")


asyncio.run(main())
async def retry_read(path, max_retries=3):
for attempt in range(max_retries):
try:
return await Read(path=path)
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(0.5 * (2 ** attempt))
content = await retry_read("important.txt")
# Elapsed: ~3 seconds (1 second × 3) — sequential awaits run one after another
file1 = await Read(path="a.txt")
file2 = await Read(path="b.txt")
file3 = await Read(path="c.txt")

# Elapsed: ~1 second — asyncio.gather runs the three reads concurrently
import asyncio
files = await asyncio.gather(
    Read(path="a.txt"),
    Read(path="b.txt"),
    Read(path="c.txt"),
)
async def process_files(pattern):
# 1. 查找所有文件
files = await Glob(pattern=pattern)
# 2. 并发读取 (限制并发数)
sem = asyncio.Semaphore(10) # 最多10个并发
async def read_with_limit(path):
async with sem:
return await Read(path=path)
contents = await asyncio.gather(
*[read_with_limit(f) for f in files]
)
# 3. 处理结果
return contents
# 使用
results = await process_files("*.go")
_cache = {}
async def cached_read(path):
"""带缓存的文件读取"""
if path not in _cache:
_cache[path] = await Read(path=path)
return _cache[path]
# 多次调用只会读取一次
content1 = await cached_read("data.json")
content2 = await cached_read("data.json") # 从缓存返回
async def pipeline():
# 1. 数据源
files = await Glob(pattern="*.log")
# 2. 转换
async def extract_errors(file):
content = await Read(path=file)
return [
line for line in content.split('\n')
if 'ERROR' in line
]
# 3. 聚合
all_errors = []
for file in files:
errors = await extract_errors(file)
all_errors.extend(errors)
# 4. 输出
report = '\n'.join(all_errors)
await Write(path="error_report.txt", content=report)
return len(all_errors)
error_count = await pipeline()
print(f"发现 {error_count} 个错误")
async def mapreduce():
    """Count non-blank lines across data/*.csv with a map/reduce split.

    Returns a dict with the file count, total line count, and average
    lines per file (0 when no files match).
    """
    # Map phase: one concurrent counting task per CSV file.
    csv_files = await Glob(pattern="data/*.csv")

    async def count_lines(csv_path):
        text = await Read(path=csv_path)
        # A line counts only if it contains something besides whitespace.
        return sum(1 for row in text.split('\n') if row.strip())

    per_file = await asyncio.gather(
        *(count_lines(name) for name in csv_files)
    )

    # Reduce phase: fold per-file counts into summary statistics.
    total = sum(per_file)
    n_files = len(csv_files)
    return {
        'files': n_files,
        'total_lines': total,
        'avg_lines': total / n_files if n_files else 0
    }
async def analyze_codebase():
    """Combine Glob, Grep, Read and Bash results into ANALYSIS.md."""
    # 1. Use Glob to find every Go file.
    go_files = await Glob(pattern="**/*.go", path=".")
    # 2. Use Grep to search for TODO/FIXME comments.
    todos = await Grep(
        pattern="TODO|FIXME",
        path=".",
        glob="*.go"
    )
    # 3. Use Read to load the key source file.
    main_content = await Read(path="main.go")
    # 4. Use Bash to fetch the latest Git commit info.
    git_info = await Bash(command="git log -1 --oneline")
    # 5. Combine the results into a report (string kept at column 0 so
    #    the emitted markdown has no leading indentation).
    report = f"""
# 代码库分析报告
- Go 文件数: {len(go_files)}
- 待办事项: {len(todos)} 个
- 最新提交: {git_info}
- 主文件大小: {len(main_content)} 字节
"""
    # 6. Persist the report with Write.
    await Write(path="ANALYSIS.md", content=report)
// Go side: register a custom tool.
type MyCustomTool struct{}

// Name returns the tool identifier exposed to callers.
func (t *MyCustomTool) Name() string {
	return "CustomAnalyze"
}

// Execute runs the custom analysis on input["data"] and returns the
// result wrapped in a map.
// NOTE(review): the unchecked type assertion panics when "data" is
// missing or not a string — confirm inputs are validated upstream.
func (t *MyCustomTool) Execute(ctx context.Context, input map[string]any, tc *tools.ToolContext) (any, error) {
	// Custom logic.
	data := input["data"].(string)
	result := analyze(data)
	return map[string]any{
		"result": result,
	}, nil
}

// Register the tool factory with the registry.
registry.Register("CustomAnalyze", func(config map[string]any) (tools.Tool, error) {
	return &MyCustomTool{}, nil
})

// Configure AllowedCallers so both the LLM ("direct") and the Python
// code-execution runtime may invoke this tool.
toolSchema := provider.ToolSchema{
	Name:           "CustomAnalyze",
	AllowedCallers: []string{"direct", "code_execution_20250825"},
	// ...
}
# Python side: call the custom tool exactly like a built-in one.
result = await CustomAnalyze(data="sample input")
print(result)
import "log"
// 在 Go 程序中
log.SetFlags(log.LstdFlags | log.Lshortfile)
// 会输出:
// - HTTP 服务器启动信息
// - 工具调用详情
// - Python 执行日志
import sys
async def debug_wrapper():
print("=== 调试信息 ===", file=sys.stderr)
# 打印可用工具
print(f"可用工具: {globals().keys()}", file=sys.stderr)
# 执行主逻辑
try:
result = await Read(path="test.txt")
print(f"读取成功: {len(result)} 字节", file=sys.stderr)
return result
except Exception as e:
print(f"错误: {e}", file=sys.stderr)
raise
await debug_wrapper()
# Check server health
curl http://localhost:8080/health

# List the available tools
curl http://localhost:8080/tools/list

# Fetch a tool's schema
curl "http://localhost:8080/tools/schema?name=Read"

# Invoke a tool
curl -X POST http://localhost:8080/tools/call \
  -H "Content-Type: application/json" \
  -d '{"tool": "Read", "input": {"path": "README.md"}}'
// Only safe, read-only tools may be called from within Python.
safeTools := []string{"Read", "Glob", "Grep"}
for _, toolName := range safeTools {
	// NOTE(review): Create's error is discarded and tool is unused
	// here — confirm this is intentional in the full listing.
	tool, _ := registry.Create(toolName, nil)
	toolSchemas = append(toolSchemas, provider.ToolSchema{
		Name:           toolName,
		AllowedCallers: []string{"direct", "code_execution_20250825"},
		// ...
	})
}

// Dangerous tools are restricted to direct LLM invocation.
dangerousTools := []string{"Write", "Bash"}
for _, toolName := range dangerousTools {
	tool, _ := registry.Create(toolName, nil)
	toolSchemas = append(toolSchemas, provider.ToolSchema{
		Name:           toolName,
		AllowedCallers: []string{"direct"}, // not callable from Python
		// ...
	})
}
// Run the Python code inside a Docker container.
config := &bridge.RuntimeConfig{
	WorkDir: "/tmp/sandbox",
	Env: map[string]string{
		"PYTHONPATH": "/app",
	},
}
runtime := bridge.NewPythonRuntime(config)

// Set a strict timeout.  (Separate snippet — note it redeclares
// `config`; in one scope this would not compile as-is.)
config := &bridge.RuntimeConfig{
	Timeout: 10 * time.Second, // Python execution timeout
}
// HTTP calls also carry a 60-second timeout (built into the SDK).
// HTTP 调用也有 60秒 超时(SDK 内置)
// metrics accumulates tool-call counts and cumulative latency.
// mu guards both counters; every reader AND writer must hold it.
type metrics struct {
	toolCalls    int64
	totalLatency time.Duration
	mu           sync.Mutex
}

var m metrics

// Record the latency around each tool call.
start := time.Now()
result, _ := tool.Execute(ctx, input, tc)
latency := time.Since(start)

m.mu.Lock()
m.toolCalls++
m.totalLatency += latency
// Compute the average while still holding the lock: the original read
// m.totalLatency and m.toolCalls after Unlock(), which is a data race
// when several goroutines record metrics concurrently.
avg := m.totalLatency / time.Duration(m.toolCalls)
m.mu.Unlock()
log.Printf("平均延迟: %v", avg)
title: Anthropic PTC 文档 icon: i-lucide-external-link to: https://docs.anthropic.com/en/docs/build-with-claude/tool-use#programmatic-tool-use-beta
官方协议规范
title: Python asyncio 文档 icon: i-lucide-external-link to: https://docs.python.org/3/library/asyncio.html
异步编程指南
title: aiohttp 文档 icon: i-lucide-external-link to: https://docs.aiohttp.org/
HTTP 客户端库
::
::