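Architecture Plan
# Agentic RAG System: Architecture Design Document
## 1. System Overview
### 1.1 Design Philosophy
> "Short-circuit whenever possible": avoid pointless retrieval and orchestration overhead.
Agentic RAG System uses an **agent-driven** architecture, in contrast to the fixed pipelines of Naive/Advanced RAG:
- **Dynamic routing**: intent recognition decides the retrieval path; there is no fixed flow.
- **Shortcuts**: three shortcuts (A/B/C) skip unnecessary stages, sparing an estimated 15-25% of traffic from the full pipeline.
- **Multi-channel parallelism**: KB vector retrieval and MCP tool calls run at the same time.
- **Post-processing pipeline**: a Dedup → RRF → ContextWindow → Diversity chain refines the results.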
### 1.2 Six-Layer Architecture
```
┌─────────────────────────────────────────────────┐
│ L1 Frontend (Next.js 16 + React 18 + TS)        │ ← User interface
├─────────────────────────────────────────────────┤
│ L2 API & Streaming (Next.js Route Handler +     │ ← API proxy + SSE
│    SSE)                                         │
├─────────────────────────────────────────────────┤
│ L3 RAG Pipeline (StreamChatPipeline, 8 stages)  │ ← Core engine
├─────────────────────────────────────────────────┤
│ L4 Platform Services (ETL + Memory + Router     │ ← Cross-cutting services
│    + Trace + RateLimit + Observability)         │
├─────────────────────────────────────────────────┤
│ L5 Infra-AI (z-ai CLI + LLM + Embedding +       │ ← AI capabilities
│    Rerank + MCP)                                │
├─────────────────────────────────────────────────┤
│ L6 Storage (JSON DB + Vector Store + Graph)     │ ← Data persistence
└─────────────────────────────────────────────────┘
```
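### 1.3 Four-Layer Module Structure
| Layer | Modules | Responsibility |
| ------------- | ----------------------------------------- | ------------------------------------------- |
| L1 Access layer | `frontend` + `api/` | User interface + API proxy (Next.js Route Handler) |
| L2 Application layer | `pipeline/` + `deepsearch/` + `graphrag/` | Business orchestration (StreamChatPipeline + Fusion) |
| L3 Infrastructure layer | `platform/` + `rag/` + `mcp/` + `lib/` | Shared framework + AI capabilities |
| L4 Resource layer | `data/` (JSON DB) | Persistent storage |
**Design principles**:
1. **One-way dependencies**: upper layers depend only on lower layers; reverse dependencies are forbidden.
2. **Interface isolation**: layers communicate through interfaces, so implementations can be swapped.
3. **Resource transparency**: business code never touches the DB/Storage directly.
4. **AI capabilities pushed down**: `lib/llm_client.py` encapsulates every model-call detail.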
## 2. StreamChatPipeline: Core Engine
### 2.1 Eight-Stage Pipeline
```
Stage 1 (Memory Load)   ─┐
                         ├── parallel (bounded by the slowest)
Stage 2 (Query Rewrite) ─┤
                         │
Stage 3 (Intent Parse)  ─┘
Stage 4 (Ambiguity Guide) ── Shortcut A?
Stage 5 (System Direct) ── Shortcut B?
Stage 6 (Multi-Channel) ── Shortcut C? KB ∥ MCP
Stage 7 (Prompt Assembly)
Stage 8 (Streaming Output) ── SSE Token-level
```
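### 2.2 Stage Details
| Stage | Name | Target latency | Function |
| ---- | ---------------- | --------- | --------------------------------------------- |
| S1 | Memory Load | <50ms | Load session history + summary |
| S2 | Query Rewrite | <500ms | LLM rewrite (coreference resolution + compound-question splitting + rewriting with dialogue history) |
| S3 | Intent Parse | <350ms | Rule fast path + LLM classification (KB/MCP/SYSTEM) |
| S4 | Ambiguity Guide | ~0ms | **Shortcut A**: confidence <0.7 or candidate gap <0.15 → ask a clarifying question |
| S5 | System Direct | ~0ms | **Shortcut B**: all intents are SYSTEM → answer directly |
| S6 | Multi-Channel | <2s | **Shortcut C**: KB + MCP in parallel + post-processing pipeline |
| S7 | Prompt Assembly | <50ms | 4 mode templates + dynamic temperature |
| S8 | Streaming Output | <800ms to first token | Token-level SSE streaming |
### 2.3 The Three Shortcuts
| Shortcut | Trigger condition | Skips | Share of traffic |
| ---- | ------------------------ | ----- | ---- |
| A | Confidence <0.7 or candidate gap <0.15 | S5-S7 | ~10% |
| B | All intents are SYSTEM | S6-S7 | ~10% |
| C | KB returns nothing + MCP has no matching tool | S7 | ~5% |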
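The trigger conditions in the table can be sketched as one decision function. This is a minimal illustration rather than the project's code: `Intent`, `pick_shortcut`, and the argument names are hypothetical, it assumes at least one intent is present, and it assumes the "candidate gap" is the difference between the two highest confidences. In the real pipeline Shortcut C can only be evaluated after Stage 6 has run, so the single function is a simplification.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Intent:
    kind: str          # "KB", "MCP", or "SYSTEM"
    confidence: float


def pick_shortcut(intents: list[Intent], kb_hits: int, mcp_tools: int) -> Optional[str]:
    """Return "A", "B", "C", or None when the full pipeline should run."""
    ranked = sorted(intents, key=lambda i: i.confidence, reverse=True)
    top = ranked[0].confidence
    # With a single candidate there is no runner-up, so the gap is treated as wide.
    gap = top - ranked[1].confidence if len(ranked) > 1 else 1.0
    if top < 0.7 or gap < 0.15:
        return "A"  # ambiguous: ask the user a clarifying question
    if all(i.kind == "SYSTEM" for i in ranked):
        return "B"  # pure system intent: answer directly
    if kb_hits == 0 and mcp_tools == 0:
        return "C"  # nothing retrieved and no tool matched
    return None
```

For example, `pick_shortcut([Intent("KB", 0.9)], kb_hits=2, mcp_tools=0)` returns `None`, so all eight stages run.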
### 2.4 Post-Processing Pipeline
```
KB Results → Dedup → RRF (k=60) → ContextWindow → Diversity → Truncate
```
## 3. GraphRAG
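### 3.1 Three-Stage Entity Disambiguation
1. **String Recall**: exact / containment / Jaccard / edit distance / abbreviation (threshold 0.3)
2. **Vector Rerank**: re-rank by embedding cosine similarity (threshold 0.75)
3. **NIL Detection**: score below 0.5 → new entity
### 3.2 Two-Level Retrieval
| Level | Method | Scenario |
| ------ | ----------------- | ------------------ |
| Local | Entity + neighbors + relations | "What is X?" |
| Global | Map-Reduce across communities | "What directions exist in field X?" |
## 4. DeepSearch
7 steps: decompose → retrieve → evidence → validate → gaps? → synthesize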
## 5. Fusion GraphRAG (Plan-Execute-Report)
```
Planner (Clarifier→Decomposer→Reviewer)
→ Executor (parallel execution, dependency-aware)
→ Reporter (consistency check)
```
## 6. Deployment Architecture
```
External user → Caddy (:81) → Next.js (:3000) → Route Handler → FastAPI (:5050)
                                                                       ↓
                                                                z-ai CLI (LLM)
```
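# Agentic RAG System: Technical Design Document
## 1. Technology Choices
### 1.1 Frontend
| Component | Choice | Rationale |
| -------- | -------------------------------- | ----------------------------------- |
| Framework | Next.js 16 (App Router) | Server-side rendering + Route Handler API proxy |
| Language | TypeScript 5 | Type safety |
| Styling | Tailwind CSS 4 + shadcn/ui | Fast development + consistency |
| State | React hooks (useState/useEffect) | Lightweight, no Redux/Zustand needed |
| Theming | next-themes | Dark mode via CSS variable switching |
| Icons | lucide-react | Single icon library |
| Markdown | Custom renderer | No external dependency, supports citation markers [N] |
### 1.2 Backend
| Component | Choice | Rationale |
| -------- | ------------------ | -------------------------- |
| Framework | FastAPI 0.128 | Async + automatic docs + SSE support |
| Server | uvicorn | ASGI, supports a dual-stack socket |
| Language | Python 3.12 | Mature AI ecosystem |
| Graph | networkx 3.6 | In-memory graph operations + community detection |
| Numerics | numpy 2.1 | Vector math |
| Validation | pydantic 2.12 | Request/response models |
| Storage | JSON files (custom) | Zero-dependency persistence |
### 1.3 AI
| Component | Choice | Rationale |
| --------- | ---------------------- | ----------------------------------- |
| LLM | z-ai-web-dev-sdk CLI | GLM-4 model, invoked as a subprocess |
| Embedding | Hash n-gram (1024-dim) | Zero dependencies, fast startup, replaceable with bge-m3 |
| Rerank | Custom 5-dimension weighting | No extra model needed, replaceable with BGE-Reranker |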
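## 2. Key Implementation Techniques
### 2.1 Dual-Stack Socket (IPv4 + IPv6)
**Problem**: Caddy resolves `localhost` to `::1` (IPv6), but uvicorn started with `--host 0.0.0.0` listens on IPv4 only.
**Solution**: `run_dualstack.py` creates a socket with `IPV6_V6ONLY=0`:
```python
import socket
import uvicorn

sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)  # key: also accept IPv4 clients
sock.bind(("::", 5050))
sock.listen(2048)
config = uvicorn.Config("app.main:app")  # assumed import path of the FastAPI app
server = uvicorn.Server(config)
server.run(sockets=[sock])
```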
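### 2.2 API Proxy (Next.js Route Handler)
**Problem**: the external FC gateway cannot reach port 5050 directly.
**Solution**: a Next.js catch-all Route Handler proxies `/api/*`:
```typescript
// src/app/api/[...path]/route.ts
import { NextRequest } from "next/server";
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
export async function GET(req: NextRequest) {
  // pathname already starts with /api/, so it is forwarded unchanged
  const { pathname, search } = req.nextUrl;
  const url = `http://127.0.0.1:5050${pathname}${search}`;
  for (let attempt = 0; attempt < 3; attempt++) {
    try { return await fetch(url); }
    catch { await sleep(1000); } // retry while the backend restarts
  }
  // All attempts failed: report the backend as unavailable
  return new Response("Backend unavailable", { status: 502 });
}
```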
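### 2.3 RRF Hybrid Search
**Problem**: vector-search scores and keyword-search scores are not comparable.
**Solution**: Reciprocal Rank Fusion, which works on ranks and needs no score normalization:
```python
from collections import defaultdict

RRF_K = 60
rrf_scores = defaultdict(float)  # chunk id -> fused score (assumes each result has an "id")
# A chunk found by both searches accumulates one 1 / (k + rank) term per list.
for results in (vector_results, keyword_results):
    for rank, result in enumerate(results):
        rrf_scores[result["id"]] += 1.0 / (RRF_K + rank + 1)
```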
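### 2.4 Cross-Language Query Expansion
**Problem**: the Chinese query "短路机制" (shortcut mechanism) cannot match an English document that says "Shortcut".
**Solution**: a mapping of 47 Chinese-English term pairs: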
```python
CROSS_LANG_TERMS = {"短路": "shortcut", "阶段": "stage", "检索": "retrieval", ...}
def _expand_query_cross_lang(query):
expansions = [query]
for cn, en in CROSS_LANG_TERMS.items():
if cn in query:
expansions.append(query.replace(cn, en))
return expansions
```
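### 2.5 Example Penalty
**Problem**: the few-shot examples in Prompt Engineering.md contain query text verbatim, which causes false retrieval hits.
**Solution**: detect example markers and cut the score by 50%: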
```python
EXAMPLE_MARKERS = ["examples:", "few-shot", "scenario 1:", "test command", "→ kb"]
if any(marker in chunk_text for marker in EXAMPLE_MARKERS):
final_score *= 0.5
```
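### 2.6 Query Rewrite Cache
**Problem**: identical queries trigger the LLM rewrite again each time (~1.2s).
**Solution**: a module-level cache whose key includes a hash of the dialogue history: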
```python
cache_key = f"{message.lower()}::{hash(history_str)}"
if cached and (time.time() - cached["ts"]) < 3600:
return {"ms": 0, "status": "cache_hit"}
```
### 2.7 LLM Rate-Limit Retry
**Problem**: the z-ai API returns 429 Too Many Requests.
**Solution**: up to 3 attempts with exponential backoff:
```python
for attempt in range(3):
result = await run_cli(args)
if "429" in error:
await asyncio.sleep(2 ** attempt + 1) # 2s, 3s, 5s
continue
```
### 2.8 Automatic Backend Restart
**Problem**: nothing restarts the backend process after a crash.
**Solution**: `run_with_restart.sh` runs an infinite loop:
```bash
while true; do
python run_dualstack.py
sleep 2
done
```
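## 3. Performance Optimization
### 3.1 Stage 1-3 Parallel Execution
**Before**: Stages 1-3 all ran in parallel (Stage 2 had no dialogue history).
**After**: Stage 1 runs first, then Stages 2 and 3 run in parallel (Stage 2 can use the history for query reformulation).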
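The revised ordering can be sketched with `asyncio`. The three stage functions below are simplified stand-ins with assumed signatures and fake delays, not the real ones in `app/pipeline/stages.py`; only the scheduling pattern is the point.

```python
import asyncio


async def stage1_memory(session_id):
    await asyncio.sleep(0.01)            # stand-in for loading history
    return {"history": ["earlier turn"]}


async def stage2_rewrite(message, history):
    await asyncio.sleep(0.02)            # stand-in for the LLM rewrite
    return f"{message} (with {len(history)} history turns)"


async def stage3_intent(message):
    await asyncio.sleep(0.02)            # stand-in for intent classification
    return "KB"


async def run_stages(session_id, message):
    memory = await stage1_memory(session_id)        # Stage 1 must finish first
    rewritten, intent = await asyncio.gather(       # Stages 2 and 3 then overlap
        stage2_rewrite(message, memory["history"]),
        stage3_intent(message),
    )
    return rewritten, intent
```

With this shape the wall-clock cost is roughly Stage 1 plus the slower of Stages 2 and 3, which is the price paid for giving the rewrite access to history.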
### 3.2 Caching
| Cache | TTL | Effect on a hit |
| --------------------- | ---- | -------------------- |
| Query Rewrite | 1h | Stage 2: ~1.2s → 0ms |
| Intent Classification | 1h | Stage 3: ~1s → 0ms |
| Embedding | 1h | Repeated text: ~5ms → 0ms |
### 3.3 Post-Processing Pipeline
Five processors run as a chain, with a total cost under 5ms:
- Dedup: O(n) hash-based deduplication
- RRF: O(n log n) sorting
- ContextWindow: O(n) neighbor lookup
- Diversity: O(n) counting
- Truncate: O(1) truncation
## 4. Security Design
### 4.1 Calculator Safety
```python
# Disable __builtins__
eval(sanitized, {"__builtins__": {}}, allowed_names)
# Whitelisted functions: sqrt, sin, cos, log, exp, pow, abs, round, floor, ceil
# Input sanitization: re.sub(r'[^0-9+\-*/().,\s\w]+', '', expr)
```
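Emptying `__builtins__` is generally not regarded as sufficient on its own, because the sanitizing pattern still lets identifiers, dots, and underscores through. One possible hardening, shown here as a sketch and not as the project's implementation, is to walk the parsed AST and accept only numbers, arithmetic operators, and calls to whitelisted functions. `safe_calc` and the three lookup tables are hypothetical names.

```python
import ast
import math
import operator

_BIN_OPS = {ast.Add: operator.add, ast.Sub: operator.sub, ast.Mult: operator.mul,
            ast.Div: operator.truediv, ast.Pow: operator.pow}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}
_FUNCS = {"sqrt": math.sqrt, "sin": math.sin, "cos": math.cos, "log": math.log,
          "exp": math.exp, "pow": pow, "abs": abs, "round": round,
          "floor": math.floor, "ceil": math.ceil}


def safe_calc(expr: str):
    """Evaluate an arithmetic expression; reject every other syntax element."""
    def ev(node):
        if isinstance(node, ast.Expression):
            return ev(node.body)
        if (isinstance(node, ast.Constant) and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](ev(node.left), ev(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](ev(node.operand))
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*(ev(arg) for arg in node.args))
        # Attribute access, subscripts, names, lambdas, etc. all end up here.
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    return ev(ast.parse(expr, mode="eval"))
```

Because attribute access never reaches an evaluator, an input such as `().__class__` is rejected with `ValueError` instead of being executed. The sketch does not bound the size of exponents, so a production version would likely also cap operand magnitudes.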
### 4.2 CORS
```python
app.add_middleware(CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"])
```
### 4.3 Rate Limiting
Three tiers of rate limiting guard against abuse: 1000 QPS globally / 10 QPS per user / 5 QPS per session.
## 5. Extensibility
### 5.1 Adding an MCP Tool
```python
# app/mcp/tools.py init()
"my_tool": {
"name": "my_tool",
"description": "description of the tool",
"params": {"param1": "string (required)"},
"handler": self._tool_my_tool,
}
# Implement the handler
async def _tool_my_tool(self, params): ...
# Add rule-based parameter extraction
def _rule_extract(self, tool_name, message):
if tool_name == "my_tool": ...
```
### 5.2 Adding a Post-Processor
```python
class MyPostProcessor(PostProcessor):
    name = "my_processor"

    def process(self, results, query=""):
        # Custom logic goes here
        return results

pipeline.add(MyPostProcessor())
```
### 5.3 Adding a Seed Document
```python
# app/rag/etl.py SEED_DOCUMENTS
{"name": "New Doc.md", "text": "content..."}
# Clear the data and restart: rm -rf data/*.json && bash start.sh
```
### 5.4 Replacing the Embedding Model
```python
# app/lib/llm_client.py embed()
# Current: hash n-gram (1024-dim)
# Production: replace with z-ai function embeddings or bge-m3
```
## 6. Deployment Configuration
### 6.1 start-all.sh
```bash
# 1. Backend (background, auto-restart)
bash run_with_restart.sh > /tmp/agentic-rag.log 2>&1 &
# 2. Frontend (foreground, replaces the shell via exec)
exec npx next dev -H 0.0.0.0 -p 3000
```
### 6.2 next.config.ts
```typescript
allowedDevOrigins: ["121.41.131.46", "0.0.0.0", "localhost", "*.space-z.ai"]
```
### 6.3 Caddyfile
```
:81 {
@transform_port_query { query XTransformPort=* }
handle @transform_port_query {
reverse_proxy 127.0.0.1:{query.XTransformPort}
}
handle {
reverse_proxy 127.0.0.1:3000
}
}
```
### 6.4 package.json
```json
{
"dev": "bash scripts/start-all.sh",
"dev:frontend": "bash scripts/start-all.sh",
"dev:backend": "bash scripts/start-all.sh",
"start": "bash scripts/start-all.sh",
"lint": "bash scripts/start-all.sh"
}
```
## 7. Mapping to Reference Projects
| Feature | Reference project | Implementation file |
| --------------------- | --------------- | --------------------------------- |
| Multi-channel retrieval + post-processing pipeline | ragent | `post_processors.py` |
| Three-state circuit breaker + first-packet probe | ragent | `model_router.py` |
| Node-orchestrated pipeline | ragent | `etl.py` (pipeline_nodes) |
| Plan-Execute-Report | graph-rag-agent | `multi_agent.py` |
| Entity disambiguation (3 stages) | graph-rag-agent | `entity_disambiguation.py` |
| Incremental update + deduplication | graph-rag-agent | `etl.py` (content_hash) |
| BM25+Semantic+RRF | RAGLight | `vector_store.py` (search_hybrid) |
| Query Reformulation | RAGLight | `stages.py` (stage2_rewrite) |
| Langfuse observability | RAGLight | `observability.py` |
| Streaming output | RAGLight | `sse.py` + `stream_chat.py` |
------------------------------------------------------------------------------
# Agentic RAG System: Detailed Design Document
## 1. StreamChatPipeline Detailed Design
### 1.1 Stage 1: Memory Load
**File**: `app/pipeline/stages.py` → `stage1_memory()`
```python
async def stage1_memory(session_id: str, user_message: str) -> dict:
history = await memory_manager.load_history(session_id, limit=20)
summary = await memory_manager.get_summary(session_id)
return {"context": {"history": history[-10:], "summary": summary}}
```
**Data flow**:
- Input: `session_id`, `user_message`
- Output: `{ms, status, context: {history, summary}, history_count}`
- Storage: `data/sessions.json` (20 turns + summary)
- Compression: exceeding 20 turns triggers an asynchronous LLM summary (10:1 compression ratio)
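### 1.2 Stage 2: Query Rewrite
**File**: `app/pipeline/stages.py` → `stage2_rewrite()`
**Core logic**:
1. **Cache check**: `cache_key = message::history_hash` (1h TTL)
2. **LLM rewrite**: coreference resolution + colloquial-to-formal rewriting + compound-question splitting
3. **Dialogue history injection**: the most recent 3 turns (6 messages) are injected into the system prompt
4. **Rule fallback**: when the LLM fails, split on "和/与/vs" ("and" / "with" / "vs")
5. **Cross-language expansion**: 47 Chinese-English term mappings (短路 → shortcut)
**Cache policy**: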
```python
_rewrite_cache: dict[str, dict] = {} # key → {rewrites, ts}
_REWRITE_CACHE_TTL = 3600 # 1 hour
```
### 1.3 Stage 3: Intent Parse
**File**: `app/pipeline/stages.py` → `stage3_intent()`
**Two-tier classification**:
1. **Rule fast path** (0ms): regular expressions match SYSTEM/MCP keywords
2. **LLM classification** (only when the rules return "default KB"): 8 tool hints
**Rule matching priority**:
```
SYSTEM: 你好/hello/hi/你是谁/帮助/谢谢
MCP-weather: 天气/气温
MCP-sales: 销售/营业额/Q1-Q4
MCP-ticket: 工单/ticket
MCP-calculator: 计算/算/math expressions
MCP-datetime: 几点/什么时间/今天日期
MCP-currency: 汇率/兑换/美元/人民币
MCP-system_metrics: QPS/延迟/CPU/内存
MCP-knowledge_lookup: 查询/搜索/查找
Default: KB (confidence=0.7)
```
**Cache**: `_intent_cache` (1h TTL); combined with the Stage 2 cache, a repeated query costs 0ms in both stages.
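### 1.4 Stage 6: Multi-Channel Retrieval
**File**: `app/pipeline/stages.py` → `stage6_retrieve()`
**KB channel**:
```python
async def _kb_retrieve(rewritten_queries, original_query):
    # Cross-language expansion; flattened because each query yields a list of variants
    expanded = [v for q in rewritten_queries[:3] for v in _expand_query_cross_lang(q)]
    all_hits = []
    # For each expanded query: vector search + keyword search
    for q in expanded:
        q_vec = await llm_client.embed(q)  # assumed async helper in lib/llm_client.py
        v_hits = vector_store.search_vector(q_vec, top_k=8)
        k_hits = vector_store.search_keyword(q, top_k=8)
        all_hits.extend(dedup(v_hits + k_hits))
    # Rerank (5-dimension weighting + example penalty)
    return await reranker.rerank(original_query, all_hits, top_k=5)
```
**RRF hybrid search** (`vector_store.py`):
```python
from collections import defaultdict

RRF_K = 60
rrf_scores = defaultdict(float)  # chunk id -> fused score (assumes each result has an "id")
# Accumulate across both lists; plain assignment would discard the vector-rank term.
for results in (vector_results, keyword_results):
    for rank, r in enumerate(results):
        rrf_scores[r["id"]] += 1.0 / (RRF_K + rank + 1)
```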
**Reranker 5-dimension scoring** (`rerank.py`):
| Dimension | Weight | Description |
| -------------- | ---- | -------------------- |
| cosine | 35% | Embedding semantic similarity |
| key_term_ratio | 25% | Query token coverage |
| title_boost | 20% | Document-name match |
| norm_score | 15% | Normalized original score |
| overlap | 5% | Jaccard overlap |
**Example penalty**: markers such as "examples:", "few-shot", "scenario", and "test" trigger a 50% score penalty.
**Post-processing pipeline** (`post_processors.py`):
```python
pipeline = PostProcessingPipeline()
pipeline.add(DedupPostProcessor()) # dedup by ID + text
pipeline.add(RRFPostProcessor(k=60)) # multi-source RRF fusion
pipeline.add(ContextWindowPostProcessor(1)) # expand with adjacent chunks
pipeline.add(DiversityPostProcessor(3)) # at most 3 results per document
pipeline.add(TruncateProcessor(top_k=5)) # truncate
```
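How such a chain fits together can be sketched in a few lines. The classes below are simplified stand-ins that reuse the names above for readability. The `run` method, the `max_per_doc` parameter, and the `doc_id` field on results are assumptions made for illustration and may not match `post_processors.py`.

```python
from collections import Counter


class PostProcessor:
    name = "base"

    def process(self, results, query=""):
        return results


class DiversityPostProcessor(PostProcessor):
    name = "diversity"

    def __init__(self, max_per_doc=3):
        self.max_per_doc = max_per_doc

    def process(self, results, query=""):
        seen, kept = Counter(), []
        for r in results:  # results are assumed to be sorted best-first
            if seen[r["doc_id"]] < self.max_per_doc:
                seen[r["doc_id"]] += 1
                kept.append(r)
        return kept


class PostProcessingPipeline:
    def __init__(self):
        self.processors = []

    def add(self, processor):
        self.processors.append(processor)
        return self

    def run(self, results, query=""):
        # Each processor receives the output of the previous one.
        for p in self.processors:
            results = p.process(results, query)
        return results
```

Because every processor has the same `process(results, query)` shape, the order of `add` calls is the only thing that defines the chain.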
### 1.5 Stage 7: Prompt Assembly
**4 mode templates**:
| Mode | Condition | Template structure |
| ------ | ----------- | ------------------------------------------------------------ |
| KB | KB evidence only | [MODE] KB + [HISTORY] + [EVIDENCE] + [QUESTION] + [INSTRUCTIONS] |
| MCP | MCP evidence only | [MODE] MCP + [TOOL_RESULT] + [QUESTION] |
| Hybrid | KB + MCP | [MODE] HYBRID + [KB_EVIDENCE] + [MCP_RESULTS] + [QUESTION] |
| System | Shortcut B | Direct answer (no evidence) |
**Dynamic temperature**:
- Knowledge Q&A: 0.1
- Code generation: 0.2
- Summarization: 0.3
- Creative writing: 0.7
### 1.6 Stage 8: Streaming Output
**SSE event stream**:
```
data: {"type":"session","session_id":"..."}
data: {"type":"stage_start","stage":1,"name":"Memory Load"}
data: {"type":"stage_end","stage":1,"ms":5}
...
data: {"type":"first_token","ms":750}
data: {"type":"token","content":"根据"}
data: {"type":"token","content":"提供的"}
...
data: {"type":"final","trace_id":"...","total_ms":3000,"response":"..."}
```
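A client can rebuild the answer from this stream by concatenating the `token` events. The sketch below assumes each event arrives as a single `data:` line holding one JSON object, as in the sample above; `parse_sse` and `collect_answer` are hypothetical helper names.

```python
import json


def parse_sse(lines):
    """Yield the JSON payload of each `data:` line and skip everything else."""
    for line in lines:
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


def collect_answer(lines):
    """Concatenate token events in arrival order."""
    return "".join(e["content"] for e in parse_sse(lines) if e.get("type") == "token")
```

Event types the client does not recognize are simply ignored, so new event types can be added on the server without breaking older clients.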
## 2. GraphRAG Detailed Design
### 2.1 Entity Extraction
**File**: `app/graphrag/graph_builder.py` → `_extract()`
**Rule patterns** (19):
- English: is_a / includes / uses / supports / integrates_with / depends_on / replaces / extends
- Chinese: 包含 (includes) / 使用 (uses) / 支持 (supports) / 融合 (integrates) / 依赖 (depends on) / 替代 (replaces) / 演进 (evolves)
- Special: Stage N→Shortcut X / Layer definitions / P0/P1/P2 priorities
**Technical terms** (55+):
StreamChatPipeline, GraphRAG, DeepSearch, Fusion, MCP, Neo4j, Milvus, pgvector, Redis, S3, Docker, K8s, GLM-4, Qwen, bge-m3, BGE-Reranker, RAPTOR, HyDE, Tika, LightRAG, networkx...
**Co-occurrence relations**: entities in the same paragraph are connected pairwise (weight=0.5)
### 2.2 Entity Disambiguation
**File**: `app/graphrag/entity_disambiguation.py`
```python
class EntityDisambiguator:
    STRING_SIM_THRESHOLD = 0.7
    VECTOR_SIM_THRESHOLD = 0.75
    NIL_THRESHOLD = 0.5

    async def disambiguate(self, mention, candidates, mention_embedding):
        # Stage 1: String Recall (multiple metrics)
        string_candidates = self._string_recall(mention, candidates)
        # Stage 2: Vector Rerank
        reranked = await self._vector_rerank(mention, string_candidates, mention_embedding)
        # Stage 3: NIL Detection (assumes each reranked candidate carries a combined_score)
        best = reranked[0] if reranked else None
        if best is None or best["combined_score"] < self.NIL_THRESHOLD:
            return NIL
        return best
```
**Similarity metrics**:
- Exact match: 1.0
- Containment: 0.8
- Jaccard: character-set intersection / union
- Edit distance: 1 - Levenshtein/max_len
- Abbreviation match: "SCP" = "StreamChatPipeline" → 0.9
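The metrics listed above could be combined roughly as follows. This is a sketch under assumptions: the source does not say how the individual scores are merged, so taking the maximum is a guess, and the abbreviation check (capital letters of a CamelCase name) is only one plausible rule. All function names are hypothetical.

```python
def levenshtein(a: str, b: str) -> int:
    """Classic dynamic-programming edit distance."""
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, 1):
        cur = [i]
        for j, cb in enumerate(b, 1):
            cur.append(min(prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (ca != cb)))
        prev = cur
    return prev[-1]


def is_abbreviation(short: str, full: str) -> bool:
    # Compare against the capital letters of a CamelCase name, e.g. "SCP".
    caps = "".join(ch for ch in full if ch.isupper())
    return len(short) >= 2 and short.upper() == caps


def string_similarity(mention: str, candidate: str) -> float:
    m, c = mention.strip(), candidate.strip()
    if not m or not c:
        return 0.0
    ml, cl = m.lower(), c.lower()
    if ml == cl:
        return 1.0                                   # exact match
    scores = []
    if is_abbreviation(m, c) or is_abbreviation(c, m):
        scores.append(0.9)                           # abbreviation match
    if ml in cl or cl in ml:
        scores.append(0.8)                           # containment
    sm, sc = set(ml), set(cl)
    scores.append(len(sm & sc) / len(sm | sc))       # character-set Jaccard
    scores.append(1 - levenshtein(ml, cl) / max(len(ml), len(cl)))
    return max(scores)
```

Character-set Jaccard is generous for short strings that reuse the same letters, which is one reason a second, embedding-based stage is useful after string recall.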
### 2.3 Community Detection
```python
communities = list(nx.community.greedy_modularity_communities(graph.to_undirected()))
```
## 3. DeepSearch Detailed Design
### 3.1 Thinking Engine
**File**: `app/deepsearch/thinking_engine.py`
**Iteration flow**:
```python
import asyncio

async def think(self, question, max_iters=3):
    sub_questions = await self._decompose(question)  # LLM decomposition
    all_evidence = []  # assumes each retriever returns a list of evidence items
    for i in range(max_iters):
        queries = sub_questions if i == 0 else await self._gen_followups(question, all_evidence)
        # Three-way parallel retrieval
        kb_evi, graph_evi, web_evi = await asyncio.gather(
            self._kb_retrieve(queries),
            self._graph_retrieve(queries),
            self._web_search(queries),
        )
        all_evidence += kb_evi + graph_evi + web_evi
        validation = await self._validate(question, all_evidence)
        if validation["sufficient"]:
            break
    return await self._synthesize(question, all_evidence)
```
### 3.2 Multi-Agent (Plan-Execute-Report)
**File**: `app/deepsearch/multi_agent.py`
**PlanSpec structure**:
```python
{
"plan_id": "uuid",
"tasks": [
{"task_id": "task_001", "description": "...", "tool": "local_search",
"priority": 1, "depends_on": []}
],
"question_type": "comparative|relational|factual|analytical|procedural",
"status": "ready|needs_clarification|issues"
}
```
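**Executor**: topological sort + parallel execution of tasks with no pending dependencies
**Consistency check**: citation markers / keyword overlap / contradiction detection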
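A dependency-aware executor of this kind can be sketched with the standard library's `graphlib.TopologicalSorter`. This is an illustration rather than the code in `multi_agent.py`: `run_plan` and the `run_task` callback are hypothetical, and tasks are assumed to be dictionaries with the `task_id` and `depends_on` fields shown in the PlanSpec.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_plan(tasks, run_task):
    """Run tasks in dependency order; tasks that are ready together run concurrently."""
    by_id = {t["task_id"]: t for t in tasks}
    sorter = TopologicalSorter({t["task_id"]: t["depends_on"] for t in tasks})
    sorter.prepare()                      # raises CycleError on circular dependencies
    results, waves = {}, []
    while sorter.is_active():
        ready = list(sorter.get_ready())  # every task whose dependencies are finished
        outputs = await asyncio.gather(*(run_task(by_id[tid], results) for tid in ready))
        for tid, out in zip(ready, outputs):
            results[tid] = out
            sorter.done(tid)              # unlocks tasks that depended on this one
        waves.append(sorted(ready))
    return results, waves
```

The sketch proceeds wave by wave, so a fast task waits for the slowest task in its wave. A scheduler that starts each task the moment its own dependencies finish would reduce that idle time at the cost of more bookkeeping.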
## 4. MCP Tools Detailed Design
### 4.1 Tool Registration
**File**: `app/mcp/tools.py`
```python
self._tools = {
"weather": {"handler": self._tool_weather, "params": {"city": "string (required)"}},
"calculator": {"handler": self._tool_calculator, "params": {"expression": "string (required)"}},
# ... 8 tools total
}
```
### 4.2 Parameter Extraction
```python
def _rule_extract(self, tool_name, message):
    # calculator: extract the math expression (strip Chinese characters and punctuation)
    # datetime: match time-zone keywords
    # currency: regex match on amount + currency
    # weather: match 15 Chinese city names
    # sales: match Q1-Q4 / this month / last month
    # ticket: match TKT-XXXX
    ...
```
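### 4.3 Calculator Safety Implementation
```python
import math
# Whitelisted functions and constants; eval() needs a name → object mapping, not a set
allowed_names = {n: getattr(math, n) for n in
                 ("sqrt", "sin", "cos", "tan", "log", "exp", "floor", "ceil", "pi", "e")}
allowed_names.update({"pow": pow, "abs": abs, "round": round})
# Disable __builtins__
eval(sanitized, {"__builtins__": {}}, allowed_names)
```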
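## 5. Platform Services Detailed Design
### 5.1 Model Routing
**File**: `app/platform/model_router.py`
**Three-state circuit breaker**:
```
CLOSED → failure rate >10% → OPEN → 5min → HALF_OPEN → probe succeeds → CLOSED
                                               ↓ probe fails
                                              OPEN
```
**First-packet probe**: send "hi" (5s timeout) and issue the full request only if the probe succeeds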
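The state machine above can be sketched as a small class. The 10% failure threshold and the 5-minute cooldown come from the diagram; everything else is an assumption made for illustration, including the class and method names, the `min_calls` warm-up that avoids tripping on the very first failure, and the injectable `clock` used to make the behavior testable.

```python
import time


class CircuitBreaker:
    def __init__(self, failure_threshold=0.10, cooldown_s=300.0, min_calls=10,
                 clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.min_calls = min_calls
        self.clock = clock
        self.state = "CLOSED"
        self.calls = 0
        self.failures = 0
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Return True if a request may be sent right now."""
        if self.state == "OPEN" and self.clock() - self.opened_at >= self.cooldown_s:
            self.state = "HALF_OPEN"      # cooldown elapsed: allow probe traffic
        return self.state != "OPEN"

    def record(self, ok: bool) -> None:
        """Report the outcome of a request that was allowed."""
        if self.state == "HALF_OPEN":
            if ok:
                self._reset()             # probe succeeded → CLOSED
            else:
                self._trip()              # probe failed → back to OPEN
            return
        self.calls += 1
        if not ok:
            self.failures += 1
        if (self.calls >= self.min_calls
                and self.failures / self.calls > self.failure_threshold):
            self._trip()

    def _trip(self) -> None:
        self.state = "OPEN"
        self.opened_at = self.clock()

    def _reset(self) -> None:
        self.state = "CLOSED"
        self.calls = 0
        self.failures = 0
```

A caller would check `allow()` before each model request and call `record()` with the outcome afterwards. The sketch counts failures over the breaker's whole closed period, whereas a production version would more likely use a rolling window.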
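### 5.2 Session Memory
**File**: `app/platform/memory.py`
```python
SUMMARY_THRESHOLD = 20 # turns
# Above the threshold: first 60% → LLM summary, last 40% → kept verbatim
# Summary format: "The user asked about XX; the conclusion was YY"
# Compression ratio: 10:1
# Context window: 4K tokens
```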
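The 60/40 split described in the comments can be sketched as a pure function. `split_for_summary` and its parameter names are hypothetical, and rounding the cut point down is an assumption, since the source does not say how fractional turn counts are handled.

```python
def split_for_summary(turns, threshold=20, summarize_percent=60):
    """Return (turns to summarize, turns to keep verbatim)."""
    if len(turns) <= threshold:
        return [], list(turns)            # below the threshold nothing is summarized
    cut = len(turns) * summarize_percent // 100   # integer math avoids float rounding
    return list(turns[:cut]), list(turns[cut:])
```

With 25 turns this yields 15 older turns for the LLM summary and the 10 most recent turns kept as-is.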
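### 5.3 Rate Limiting
**File**: `app/platform/rate_limiter.py`
| Tier | QPS | Implementation |
| ---- | ---- | ------------- |
| Global | 1000 | Token bucket |
| User | 10 | Sliding window (1s) |
| Session | 5 | Sliding window (1s) |
Degradation path: queue → 429 → circuit-breaker fallback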
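The per-user and per-session tiers can be sketched with a sliding-window log: keep the timestamps of accepted requests per key and reject once the window is full. `SlidingWindowLimiter` is a hypothetical name, and the injectable `clock` exists only to make the sketch testable. It is not the code in `rate_limiter.py`.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    def __init__(self, limit, window_s=1.0, clock=time.monotonic):
        self.limit = limit
        self.window_s = window_s
        self.clock = clock
        self.hits = defaultdict(deque)    # key -> timestamps of accepted requests

    def allow(self, key) -> bool:
        now = self.clock()
        q = self.hits[key]
        while q and now - q[0] >= self.window_s:
            q.popleft()                   # drop timestamps that left the window
        if len(q) >= self.limit:
            return False                  # caller would queue or answer 429
        q.append(now)
        return True
```

A session tier would then be `SlidingWindowLimiter(limit=5)` keyed by session id, and a user tier `SlidingWindowLimiter(limit=10)` keyed by user id. Memory grows with the number of distinct keys, so a long-running service would also need to evict idle keys.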
### 5.4 Evaluation Pipeline
**File**: `app/platform/evaluation.py`
**Golden Dataset** (20 questions, 16 categories):
```python
{
"id": "eval-001",
"category": "StreamChatPipeline",
"question": "StreamChatPipeline 有几个阶段?",
"expected_keywords": ["8", "八", "stage"],
"expected_sources": ["StreamChatPipeline"],
"difficulty": "easy"
}
```
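**Checks**:
- `keyword_pass`: the expected keywords appear in the answer
- `source_pass`: the expected source documents appear in the evidence sources
- 2s pause between questions (to avoid rate limiting)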
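The two checks can be sketched as small pure functions. Two points are assumptions because the source does not specify them: `keyword_pass` treats the expected keywords as alternatives (any one is enough, which fits the sample where "8", "八", and "stage" are interchangeable), and `source_pass` uses a case-insensitive substring match so that "StreamChatPipeline" matches "StreamChatPipeline.md".

```python
def keyword_pass(answer: str, expected_keywords: list[str]) -> bool:
    # Assumption: one matching keyword is enough.
    text = answer.lower()
    return any(k.lower() in text for k in expected_keywords)


def source_pass(evidence_sources: list[str], expected_sources: list[str]) -> bool:
    # Every expected source must be found in at least one evidence source name.
    return all(
        any(exp.lower() in src.lower() for src in evidence_sources)
        for exp in expected_sources
    )
```

If the real pipeline requires every keyword to be present, replacing `any` with `all` in `keyword_pass` gives the stricter variant.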
### 5.5 Observability
**File**: `app/platform/observability.py`
```python
trace = RAGTrace(trace_id, session_id, question)
trace.add_span("rewrite", "query_rewrite", ms, input, output)
trace.add_span("retrieve", "retrieval", ms, input, output)
trace.add_span("generate", "generation", ms, input, output)
trace.finalize(status="ok", response="...")
```
## 6. Frontend Detailed Design
### 6.1 API Proxy
**File**: `src/app/api/[...path]/route.ts`
```typescript
// Catch-all Route Handler: proxies every /api/* request to 127.0.0.1:5050
// Up to 3 attempts (the backend may be restarting)
for (let attempt = 0; attempt < 3; attempt++) {
  try { return await fetch(backend) }
  catch { await sleep(1000) } // fall through to the next attempt
}
```
**SSE proxy**: `src/app/api/chat/stream/route.ts`
- Streams response.body through unchanged
- Preserves the text/event-stream Content-Type
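### 6.2 Pipeline Visualization
**File**: `src/components/agentic-rag/chat-panel.tsx` → `PipelineVisualization`
Eight stage cards + parallel indicator + shortcut highlighting:
- Running: blue pulse + ping animation
- Done: green check mark + elapsed time
- Skip: gray
- Shortcut: amber + ⚡ marker
### 6.3 Evidence Panel
A collapsible panel that shows, for each piece of evidence:
- Number [1] [2] (matching the citation markers in the answer)
- Type badge (KB=violet, MCP=amber)
- Source + score + text preview (line-clamp-3)
### 6.4 GraphRAG Canvas
**File**: `src/components/agentic-rag/graphrag-panel.tsx` → `GraphCanvas`
- **Force-directed layout**: Coulomb repulsion + Hooke attraction + centering force + damping
- **Interaction**: drag nodes / scroll to zoom / pan / hover to highlight
- **Edge styles**: co-occurrence (thin/translucent) vs explicit (thick/opaque)
- **Node colors**: 11 types (technology/concept/stage/shortcut/storage/framework/model/layer/priority/tool/metric)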
## 7. Data Structures
### 7.1 documents.json
```json
{"id": "uuid", "name": "StreamChatPipeline.md", "source": "seed",
"content_hash": "sha256", "char_count": 3500, "chunk_count": 10,
"status": "ready", "created_at": 1234567890}
```
### 7.2 chunks.json
```json
{"id": "uuid", "doc_id": "uuid", "doc_name": "StreamChatPipeline.md",
"text": "...", "index": 0, "token_count": 128, "embedding": [0.1, 0.2, ...]}
```
### 7.3 graph_nodes.json
```json
{"id": "streamchatpipeline", "name": "StreamChatPipeline", "type": "concept",
"docs": ["uuid1", "uuid2"], "mention_count": 5}
```
### 7.4 graph_edges.json
```json
{"id": "uuid", "source": "streamchatpipeline", "target": "graphrag",
"relation": "integrates_with", "doc_id": "uuid", "weight": 1.0}
```
### 7.5 sessions.json
```json
{"id": "sess-xxx", "messages": [{"role": "user", "content": "...", "ts": 123}],
"summary": "用户咨询了XX", "updated_at": 123}
```