The Art of Layering: From the OSI Seven Layers to the Five Layers of an Agent OS
1. TL;DR
Layered architecture is humanity's most powerful weapon against complexity. From the OSI seven-layer model to TCP/IP's four layers, from MVC to microservices, and now to the five-layer Agent OS architecture, the essence of layering has never changed: carving chaos into meaningful boundaries.
The five layers of an Agent OS:
- Interface layer: the window for talking to the outside world
- Orchestration layer: task scheduling and flow control
- Cognition layer: reasoning, planning, and decision-making
- Memory layer: storing and retrieving knowledge
- Execution layer: acting on the real world
Remember: good layering is not physical isolation but logical boundaries, and the cost of inter-layer communication makes or breaks an architecture.
2. 📋 Structure of This Article
Chapter 3  │ A history of layering: from the OSI seven layers to modern software architecture
───────────┼────────────────────────────────────────
Chapter 4  │ Why layering works: separation of concerns and the power of abstraction
───────────┼────────────────────────────────────────
Chapter 5  │ The Agent OS five-layer architecture: redrawing the boundaries of AI systems
───────────┼────────────────────────────────────────
Chapter 6  │ Inter-layer communication: the art of passing Context
───────────┼────────────────────────────────────────
Chapter 7  │ When to break the layers: trading performance against flexibility
───────────┼────────────────────────────────────────
Chapter 8  │ Counter-intuitive insights: the hidden costs of layered architecture
───────────┼────────────────────────────────────────
Chapter 9  │ In practice: designing a layered Agent system from scratch
───────────┼────────────────────────────────────────
Chapter 10 │ Closing: layering as a way of thinking
3. A History of Layered Architecture
3.1 The OSI Seven-Layer Model: The Bible of Network Communication
In the late 1970s, ISO began work on the Open Systems Interconnection (OSI) reference model, published in 1984, which abstracts network communication into seven layers:
┌──────────────────────────────────┐
│ Layer 7 │ Application   │ ← HTTP, FTP, SMTP
│ Layer 6 │ Presentation  │ ← encryption, compression, encoding
│ Layer 5 │ Session       │ ← connection management, sync points
│ Layer 4 │ Transport     │ ← TCP, UDP
│ Layer 3 │ Network       │ ← IP, routing
│ Layer 2 │ Data Link     │ ← MAC, frames
│ Layer 1 │ Physical      │ ← bit streams, cables
└──────────────────────────────────┘
          ↓ each layer provides services only to the layer above it
Core principles:
- Encapsulation: each layer ignores the implementation details of the others
- Standardization: inter-layer interfaces are precisely defined, allowing independent evolution
- Layer independence: a change in one layer should not ripple into the others
But OSI's problem was over-engineering. Seven layers proved too cumbersome for real applications, and the blurry boundary between the session and presentation layers made implementations difficult.
3.2 The TCP/IP Four-Layer Model: Pragmatism Wins
The TCP/IP protocol stack replaced OSI's seven layers with four:
┌──────────────────────────────────────────────────────┐
│ Application layer       │ HTTP, FTP, DNS, SSH...     │
├──────────────────────────────────────────────────────┤
│ Transport layer         │ TCP, UDP                   │
├──────────────────────────────────────────────────────┤
│ Internet layer          │ IP, ICMP, routing protocols│
├──────────────────────────────────────────────────────┤
│ Network interface layer │ Ethernet, WiFi, hardware   │
└──────────────────────────────────────────────────────┘
Key simplifications:
- The session and presentation layers were folded into the application layer
- The data link and physical layers merged into the network interface layer
- Each layer has clearer responsibilities and a simpler implementation
💡 Insight: the number of layers is not fixed; it depends on the complexity of the problem domain. The right layering is exactly as many layers as the problem needs.
3.3 Layering Patterns in Software Architecture
MVC (Model-View-Controller)
┌─────────┐   user actions   ┌──────────┐
│  View   │ ───────────────→ │Controller│
└─────────┘ ←─────────────── └────┬─────┘
     ↑                            │
     │ change notifications       │ data operations
     └────────────────────────────┘
              ┌─────────┐
              │  Model  │
              └─────────┘
The core idea of MVC: separate data, logic, and presentation. This layering lets the UI change independently without touching business logic.
Three-tier architecture (Presentation/Business/Data)
The classic layering of enterprise applications:
┌─────────────────────────────────────────┐
│ Presentation Layer                      │
│ REST API / Web / Mobile UI              │
├─────────────────────────────────────────┤
│ Business Logic Layer                    │
│ Domain Services / Use Cases             │
├─────────────────────────────────────────┤
│ Data Access Layer                       │
│ Repository / ORM / Database             │
└─────────────────────────────────────────┘
Layering in microservices
Microservices are themselves a form of layering: vertical layering by business domain.
┌──────────────┐ ┌───────────────┐ ┌───────────────────┐
│ User service │ │ Order service │ │ Inventory service │
└──────┬───────┘ └──────┬────────┘ └────────┬──────────┘
       │                │                   │
┌──────┴────────────────┴───────────────────┴──────┐
│        Orchestration layer (API Gateway)         │
└──────────────────────────────────────────────────┘
4. Why Layering Works
4.1 Separation of Concerns
"The secret to building large apps is never build large apps. Break your applications into small pieces. Then, assemble those testable, bite-sized pieces into your big application." — Justin Meyer
The core value of layered architecture is isolating different concerns into different layers.
Consider an Agent system with no layering at all:
# Anti-pattern: spaghetti code (illustrative sketch; imports and helper methods omitted)
class Agent:
    def process(self, user_input):
        # Parse the input
        parsed = self.parse(user_input)
        # Call the LLM directly
        response = requests.post("https://api.openai.com/...", json={...})
        # Parse the response
        action = json.loads(response.text)
        # Touch the database directly
        conn = sqlite3.connect("memory.db")
        conn.execute("INSERT INTO history ...")
        # Invoke tools directly
        if action["type"] == "search":
            result = self.google_search(action["query"])
        elif action["type"] == "file":
            with open(action["path"], "w") as f:
                f.write(action["content"])
        return result
Where does it hurt?
- Hard to test: you must mock HTTP, the database, and the file system
- Hard to change: touching one spot can break everything
- Hard to reuse: the logic is deeply coupled to concrete implementations
4.2 The Power of Abstraction
Layering hides low-level complexity behind abstractions:
# Good layering: each layer cares only about the interface of the layer below
class AgentInterface:
    """Interface layer: only cares about receiving and returning data"""
    def receive(self, message: str) -> Response:
        context = self.parse_input(message)
        result = self.orchestrator.execute(context)
        return self.format_output(result)

class TaskOrchestrator:
    """Orchestration layer: only cares about task scheduling"""
    def execute(self, context: Context) -> Result:
        plan = self.planner.create_plan(context)
        for step in plan.steps:
            yield self.execute_step(step)

class CognitionEngine:
    """Cognition layer: only cares about reasoning and decisions"""
    def decide(self, context: Context, memory: Memory) -> Action:
        prompt = self.build_prompt(context, memory.retrieve(context))
        return self.llm.generate(prompt)

class MemoryStore:
    """Memory layer: only cares about storing and retrieving knowledge"""
    def retrieve(self, query: Context) -> Knowledge:
        return self.vector_db.search(query.embedding)
Each layer depends only on the abstract interface of the layer below it, never on a concrete implementation.
4.3 Replaceability
Another huge advantage of layered architecture is component replaceability.
# Switching LLM providers? Only the cognition layer's wiring changes
class CognitionEngine:
    def __init__(self, llm_provider: LLMProvider):
        self.llm = llm_provider  # dependency injection

# It can be OpenAI
engine = CognitionEngine(OpenAIProvider(api_key="..."))
# Or Anthropic
engine = CognitionEngine(AnthropicProvider(api_key="..."))
# Or even a local model
engine = CognitionEngine(OllamaProvider(model="llama3"))
💡 The core payoff of layering: layers relate through contracts, not implementation dependencies.
5. The Five-Layer Architecture of an Agent OS
Building on this history of layered architecture, we propose a five-layer architecture for an Agent OS:
┌─────────────────────────────────────────────────────────────┐
│                     Agent OS Architecture                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Layer 5 │ Interface Layer                           │    │
│  │         │ WebSocket, HTTP, CLI, Voice               │    │
│  │         │ Duties: protocol adaptation, input        │    │
│  │         │ validation, response formatting           │    │
│  └─────────┼─────────────────────────────────────────────┘  │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Layer 4 │ Orchestration Layer                       │    │
│  │         │ Task Scheduler, Flow Control              │    │
│  │         │ Duties: task decomposition, concurrency   │    │
│  │         │ management, error recovery                │    │
│  └─────────┼─────────────────────────────────────────────┘  │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Layer 3 │ Cognition Layer                           │    │
│  │         │ LLM, Reasoning, Planning                  │    │
│  │         │ Duties: reasoning, decisions, strategy    │    │
│  │         │ generation                                │    │
│  └─────────┼─────────────────────────────────────────────┘  │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Layer 2 │ Memory Layer                              │    │
│  │         │ Vector DB, Graph DB, Cache                │    │
│  │         │ Duties: knowledge storage, context        │    │
│  │         │ retrieval, long-term memory               │    │
│  └─────────┼─────────────────────────────────────────────┘  │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Layer 1 │ Execution Layer                           │    │
│  │         │ Tools, APIs, File System, Browser         │    │
│  │         │ Duties: real actions, side effects,       │    │
│  │         │ external interaction                      │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
└─────────────────────────────────────────────────────────────┘
5.1 Layer by Layer
Layer 5: Interface Layer
Responsibilities:
- Receive user input (multi-modal)
- Protocol adaptation (HTTP/WebSocket/gRPC)
- Input validation and sanitization
- Response formatting
import time
from dataclasses import dataclass
from typing import Optional, Dict, Any
from enum import Enum

class InputType(Enum):
    TEXT = "text"
    VOICE = "voice"
    IMAGE = "image"
    FILE = "file"

@dataclass
class UserInput:
    content: str
    input_type: InputType
    metadata: Dict[str, Any]
    session_id: str
    timestamp: float

class InterfaceLayer:
    def __init__(self, orchestrator: OrchestrationLayer):
        self.orchestrator = orchestrator

    async def handle(self, raw_input: Dict[str, Any]) -> Dict[str, Any]:
        # 1. Parse and validate
        user_input = self.parse(raw_input)
        # 2. Build the Context
        context = Context(
            input=user_input,
            session_id=user_input.session_id,
            metadata=user_input.metadata
        )
        # 3. Hand off to the orchestration layer
        result = await self.orchestrator.execute(context)
        # 4. Format the response
        return self.format_response(result)

    def parse(self, raw: Dict[str, Any]) -> UserInput:
        # Validate required fields (ValidationError and generate_uuid
        # are assumed to be defined elsewhere in the codebase)
        if "content" not in raw:
            raise ValidationError("Missing 'content' field")
        return UserInput(
            content=raw["content"],
            input_type=InputType(raw.get("type", "text")),
            metadata=raw.get("metadata", {}),
            session_id=raw.get("session_id", generate_uuid()),
            timestamp=time.time()
        )
Layer 4: Orchestration Layer
Responsibilities:
- Task decomposition and planning
- Sub-task scheduling
- Concurrency control
- Error recovery and retries
from typing import List, AsyncIterator
import asyncio

@dataclass
class Task:
    id: str
    description: str
    dependencies: List[str]
    max_retries: int = 3
    timeout: float = 30.0

class OrchestrationLayer:
    def __init__(self, cognition: CognitionLayer, memory: MemoryLayer):
        self.cognition = cognition
        self.memory = memory
        self.execution_graph: Dict[str, Task] = {}

    async def execute(self, context: Context) -> Result:
        # 1. Plan the task (delegates to the cognition layer)
        plan = await self.cognition.plan(context)
        # 2. Build the execution graph
        self.build_execution_graph(plan)
        # 3. Execute in topologically sorted batches
        results = []
        for batch in self.topological_batches():
            batch_results = await asyncio.gather(
                *[self.execute_task(t, context) for t in batch],
                return_exceptions=True
            )
            results.extend(batch_results)
        # 4. Aggregate the results
        return self.aggregate_results(results)

    async def execute_task(self, task: Task, context: Context) -> TaskResult:
        for attempt in range(task.max_retries):
            try:
                # Execute with a timeout
                result = await asyncio.wait_for(
                    self.cognition.execute(task, context, self.memory),
                    timeout=task.timeout
                )
                return TaskResult(success=True, data=result)
            except Exception as e:
                if attempt == task.max_retries - 1:
                    return TaskResult(success=False, error=e)
                await asyncio.sleep(2 ** attempt)  # exponential backoff
Layer 3: Cognition Layer
Responsibilities:
- LLM calls and response parsing
- Reasoning chains (Chain of Thought)
- Tool selection and argument generation
- Strategy optimization
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    async def generate(self, prompt: str, **kwargs) -> str:
        pass

class CognitionLayer:
    def __init__(self, llm: LLMProvider, tools: ToolRegistry):
        self.llm = llm
        self.tools = tools
        self.reasoning_chain: List[Thought] = []

    async def plan(self, context: Context) -> Plan:
        """Plan using the ReAct pattern"""
        system_prompt = """You are a task-planning expert. Decompose the user's request into executable sub-tasks.
Available tools:
{tool_descriptions}
Return the execution plan as JSON:
{{
  "tasks": [
    {{"id": "...", "description": "...", "dependencies": []}}
  ]
}}"""
        prompt = system_prompt.format(
            tool_descriptions=self.tools.describe_all()
        ) + f"\n\nUser request: {context.input.content}"
        response = await self.llm.generate(prompt, temperature=0.2)
        return self.parse_plan(response)

    async def execute(self, task: Task, context: Context, memory: MemoryLayer) -> Any:
        """Execute a single task"""
        # 1. Retrieve relevant memories
        relevant_memory = await memory.retrieve_relevant(context)
        # 2. Build the reasoning context
        reasoning_context = self.build_reasoning_context(
            task, context, relevant_memory
        )
        # 3. Generate a decision
        response = await self.llm.generate(reasoning_context)
        # 4. Parse the action
        action = self.parse_action(response)
        # 5. Record the reasoning step
        self.reasoning_chain.append(Thought(
            task_id=task.id,
            reasoning=response,
            action=action
        ))
        return action
Layer 2: Memory Layer
Responsibilities:
- Short-term memory (the current conversation context)
- Long-term memory (knowledge-base retrieval)
- Semantic memory (vector storage)
- Procedural memory (skill/tool memory)
from typing import Protocol
import numpy as np

class VectorStore(Protocol):
    async def search(self, embedding: List[float], top_k: int = 5) -> List[Document]:
        ...
    async def upsert(self, documents: List[Document]) -> None:
        ...

class MemoryLayer:
    def __init__(
        self,
        vector_store: VectorStore,
        graph_store: Optional[GraphStore] = None,
        cache: Optional[Cache] = None
    ):
        self.vector_store = vector_store
        self.graph_store = graph_store
        self.cache = cache
        self.short_term: List[Message] = []  # the current conversation window

    async def retrieve_relevant(self, context: Context) -> Knowledge:
        """Retrieve knowledge relevant to the current context"""
        # 1. Embed the query
        query_embedding = await self.embed(context.input.content)
        # 2. Vector search
        vector_results = await self.vector_store.search(
            query_embedding, top_k=5
        )
        # 3. Graph traversal (if a graph store is configured)
        graph_results = []
        if self.graph_store:
            graph_results = await self.graph_store.traverse(
                start_node=context.input.content,
                depth=2
            )
        # 4. Short-term memory
        recent_messages = self.short_term[-10:]  # the 10 most recent messages
        return Knowledge(
            long_term=vector_results,
            relational=graph_results,
            short_term=recent_messages
        )

    async def remember(self, context: Context, outcome: Any) -> None:
        """Persist an experience into long-term memory"""
        # Encode as a vector
        embedding = await self.embed(f"{context.input} → {outcome}")
        # Store in the vector database
        document = Document(
            id=generate_uuid(),
            content=str(outcome),
            metadata={
                "input": context.input.content,
                "timestamp": time.time(),
                "session_id": context.session_id
            },
            embedding=embedding
        )
        await self.vector_store.upsert([document])
        # Update short-term memory
        self.short_term.append(Message(
            role="assistant",
            content=str(outcome),
            timestamp=time.time()
        ))
Layer 1: Execution Layer
Responsibilities:
- Tool execution
- API calls
- File-system operations
- Browser automation
- Every interaction with the outside world
import asyncio
import subprocess
import aiofiles
import aiohttp
from typing import Callable, Dict

ToolFunction = Callable[..., Any]

class ExecutionLayer:
    def __init__(self):
        self.tools: Dict[str, ToolFunction] = {}
        self._register_default_tools()

    def _register_default_tools(self):
        """Register the built-in tools"""
        self.register_tool("search", self.web_search)
        self.register_tool("read_file", self.read_file)
        self.register_tool("write_file", self.write_file)
        self.register_tool("execute_command", self.execute_command)
        self.register_tool("http_request", self.http_request)

    def register_tool(self, name: str, func: ToolFunction) -> None:
        self.tools[name] = func

    async def execute(self, action: Action) -> ExecutionResult:
        """Execute an action"""
        tool_name = action.tool
        params = action.parameters
        if tool_name not in self.tools:
            return ExecutionResult(
                success=False,
                error=f"Unknown tool: {tool_name}"
            )
        try:
            # Validate before executing
            self.validate_params(tool_name, params)
            # Run the tool
            result = await self.tools[tool_name](**params)
            return ExecutionResult(success=True, data=result)
        except Exception as e:
            return ExecutionResult(success=False, error=str(e))

    async def web_search(self, query: str, top_n: int = 5) -> List[SearchResult]:
        """Web search"""
        # A real implementation would call a search-engine API
        pass

    async def read_file(self, path: str) -> str:
        """Read a file"""
        # Safety check first
        if not self.is_path_allowed(path):
            raise PermissionError(f"Access denied: {path}")
        async with aiofiles.open(path, 'r') as f:
            return await f.read()

    async def execute_command(self, command: str, timeout: int = 30) -> str:
        """Run a system command (sandboxed)"""
        # Execute in a restricted environment
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=1024 * 1024  # 1 MB stream-buffer limit
        )
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=timeout
            )
            return stdout.decode()
        except asyncio.TimeoutError:
            proc.kill()
            raise TimeoutError(f"Command timed out after {timeout}s")
6. Layers and Cross-Layer Communication
6.1 The Context-Passing Pattern
The heart of inter-layer communication is the Context object, which travels between layers like a parcel:
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime

@dataclass
class Context:
    """The context object that threads through every layer"""
    # Input information
    input: UserInput
    session_id: str
    # Execution state
    current_step: int = 0
    execution_trace: List[TraceEvent] = field(default_factory=list)
    # Inter-layer data
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Performance metrics
    timestamps: Dict[str, datetime] = field(default_factory=dict)

    def mark(self, layer: str) -> "Context":
        """Record the time a layer was entered"""
        self.timestamps[f"enter_{layer}"] = datetime.now()
        return self

    def fork(self) -> "Context":
        """Create a child context (for concurrent execution)"""
        return Context(
            input=self.input,
            session_id=self.session_id,
            metadata=self.metadata.copy()
        )
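A quick, self-contained sketch of how this Context is meant to be used. It is a trimmed-down copy of the dataclass above, with `input` as a plain string so the snippet runs on its own:

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict

@dataclass
class Context:
    # Trimmed-down copy of the Context above; `input` is a plain
    # string here so the snippet is runnable in isolation.
    input: str
    session_id: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamps: Dict[str, datetime] = field(default_factory=dict)

    def mark(self, layer: str) -> "Context":
        # Record the time this layer was entered
        self.timestamps[f"enter_{layer}"] = datetime.now()
        return self

    def fork(self) -> "Context":
        # Shallow-copy metadata so a parallel branch can diverge
        return Context(input=self.input, session_id=self.session_id,
                       metadata=self.metadata.copy())

ctx = Context(input="review request", session_id="s-1")
ctx.mark("interface").mark("orchestration")  # mark() chains, one stamp per layer

child = ctx.fork()
child.metadata["task"] = "analyze file A"

assert "enter_interface" in ctx.timestamps
assert "task" not in ctx.metadata  # the fork isolated the child's metadata
```

The timestamps collected by mark() are what make the per-layer latency breakdown in section 9 possible; fork() is what lets the orchestration layer fan out concurrent sub-tasks without them trampling each other's metadata.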
6.2 Three Patterns of Inter-Layer Communication
Pattern 1: Direct call (Synchronous)
─────────────────────────────────
Layer N                 Layer N-1
  │                        │
  │     execute(ctx)       │
  │ ───────────────→       │
  │                        │
  │       result           │
  │ ←───────────────       │
Pattern 2: Event bus (Event-Driven)
─────────────────────────────────
Layer N      Event Bus     Layer N-1
  │             │              │
  │ publish(evt)│              │
  │ ───────────→│              │
  │             │  route(evt)  │
  │             │ ───────────→ │
  │             │              │
Pattern 3: Message queue (Async Queue)
─────────────────────────────────
Layer N       Queue        Layer N-1
  │             │              │
  │ enqueue(msg)│              │
  │ ───────────→│              │
  │             │  dequeue()   │
  │             │ ←─────────── │
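Pattern 3 can be sketched in a few lines with asyncio.Queue. The layer names are illustrative; a real system would pass Context objects rather than strings:

```python
import asyncio

async def layer_n(queue: asyncio.Queue) -> None:
    # Producer side: Layer N enqueues work without knowing who consumes it.
    for i in range(3):
        await queue.put(f"msg-{i}")
    await queue.put(None)  # sentinel: no more messages

async def layer_n_minus_1(queue: asyncio.Queue) -> list:
    # Consumer side: Layer N-1 dequeues at its own pace.
    received = []
    while True:
        msg = await queue.get()
        if msg is None:
            break
        received.append(msg)
    return received

async def main() -> list:
    # The bounded queue also gives you backpressure for free:
    # a fast producer blocks on put() once the queue is full.
    queue: asyncio.Queue = asyncio.Queue(maxsize=10)
    _, received = await asyncio.gather(layer_n(queue), layer_n_minus_1(queue))
    return received

print(asyncio.run(main()))  # → ['msg-0', 'msg-1', 'msg-2']
```

Compared with the direct call in Pattern 1, neither side holds a reference to the other; the queue is the entire contract between them.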
6.3 An Event-Bus Implementation
from typing import Callable, Awaitable
import asyncio
# `Event` and `logger` are assumed to be defined elsewhere

EventHandler = Callable[[Event], Awaitable[None]]

class EventBus:
    """An event bus that decouples the layers"""
    def __init__(self):
        self.subscribers: Dict[str, List[EventHandler]] = {}
        self.event_history: List[Event] = []

    def subscribe(self, event_type: str, handler: EventHandler):
        """Subscribe to a given event type"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def publish(self, event: Event):
        """Publish an event to all subscribers"""
        self.event_history.append(event)
        handlers = self.subscribers.get(event.type, [])
        # Run every handler in parallel
        await asyncio.gather(
            *[self._safe_handle(h, event) for h in handlers],
            return_exceptions=True
        )

    async def _safe_handle(self, handler: EventHandler, event: Event):
        try:
            await handler(event)
        except Exception as e:
            logger.error(f"Event handler failed: {e}")

# Usage example
class AgentSystem:
    def __init__(self):
        self.event_bus = EventBus()
        self.interface = InterfaceLayer(self.event_bus)
        self.orchestrator = OrchestrationLayer(self.event_bus)
        self.cognition = CognitionLayer(self.event_bus)
        self.memory = MemoryLayer(self.event_bus)
        self.execution = ExecutionLayer(self.event_bus)
        self._wire_events()

    def _wire_events(self):
        """Wire up the event flow between the layers"""
        # Interface layer → orchestration layer
        self.event_bus.subscribe("user_input",
            lambda e: self.orchestrator.on_user_input(e))
        # Orchestration layer → cognition layer
        self.event_bus.subscribe("plan_required",
            lambda e: self.cognition.on_plan_request(e))
        # Cognition layer → execution layer
        self.event_bus.subscribe("action_ready",
            lambda e: self.execution.on_action(e))
        # Execution layer → memory layer (record execution results)
        self.event_bus.subscribe("execution_complete",
            lambda e: self.memory.on_execution_result(e))
7. When to Break the Layers
7.1 Performance-Optimization Scenarios
Layering has a price. Every layer adds:
- Serialization/deserialization overhead
- Function-call overhead
- Context-copying overhead
# Normal layering (readable)
async def process(self, context: Context):
    validated = await self.interface.validate(context)
    planned = await self.orchestrator.plan(validated)
    result = await self.cognition.execute(planned)
    return result

# Performance-optimized version (skips the middle layers)
async def process_fast(self, raw_input: str):
    # Call the cognition layer directly, bypassing orchestration.
    # Suitable for simple, deterministic tasks.
    return await self.cognition.quick_answer(raw_input)
7.2 Direct Communication for Special Cases
class FastPath:
    """Fast path: allows direct cross-layer communication"""
    FAST_OPERATIONS = {"ping", "health", "echo"}

    async def handle(self, context: Context) -> Result:
        # Decide whether the fast path applies
        if context.input.content in self.FAST_OPERATIONS:
            # Skip the orchestration and cognition layers entirely
            return await self.execution.execute_cached(context)
        # Otherwise take the normal layered pipeline
        return await self.full_pipeline(context)
7.3 Rules for Breaking the Layers
💡 Criteria for breaking a layer:
- The bottleneck is confirmed: profiling proves the layering is the problem
- The scenario is predictable: the fast path handles only deterministic operations
- It is reversible: keep the layered version as a fallback
- It is documented: clearly mark which paths are "shortcuts"
8. Counter-Intuitive Insights
8.1 More Layers ≠ Better Architecture
The lesson of OSI's seven layers versus TCP/IP's four: the layer count should match the complexity of the problem.
| Layers | Suitable for | Examples |
|---|---|---|
| 2-3 | Simple tools, single responsibility | CLI tools, scripts |
| 4-5 | Complex systems, multi-stage processing | Web apps, Agent systems |
| 6-7 | Very large scale, extreme decoupling | Telecom systems, enterprise middleware |
8.2 The Perfect Layering Is Dynamic
class AdaptiveLayering:
    """Adjust the layering depth to match task complexity"""
    async def execute(self, task: Task) -> Result:
        complexity = self.assess_complexity(task)
        if complexity < 0.3:
            # Simple task: handle in a single layer
            return await self.execution.direct_execute(task)
        elif complexity < 0.7:
            # Medium task: three-layer pipeline
            return await self.three_layer_execute(task)
        else:
            # Complex task: the full five layers
            return await self.full_layer_execute(task)

    def assess_complexity(self, task: Task) -> float:
        """Estimate how complex a task is"""
        factors = [
            len(task.subtasks) / 10,                  # number of sub-tasks
            len(task.required_tools) / 5,             # number of tools involved
            1.0 if task.requires_reasoning else 0.0,  # does it need reasoning?
            1.0 if task.has_side_effects else 0.0,    # does it have side effects?
        ]
        return min(sum(factors) / len(factors), 1.0)
8.3 The Direction of Dependencies Matters More Than the Layers Themselves
Key principles:
- ✅ Upper layers may depend on lower layers
- ✅ Components within a layer may depend on each other (with care)
- ❌ A lower layer must never depend on an upper layer
- ❌ Avoid circular dependencies
# The good dependency direction
Interface → Orchestration → Cognition → Memory → Execution
# The bad dependency direction (a cycle)
Cognition ──→ Memory
    ↑           │
    └───────────┘
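The standard way out of a cycle like the one above is dependency inversion: the lower layer depends on an abstraction it owns, and the upper layer supplies the implementation. A hedged sketch, with illustrative names (Summarizer, compact) that are not part of the article's API:

```python
from typing import List, Protocol

class Summarizer(Protocol):
    # Abstraction owned by the Memory layer: it needs "something that
    # can compress text", not the Cognition layer itself.
    def summarize(self, texts: List[str]) -> str: ...

class MemoryStore:
    def __init__(self, summarizer: Summarizer) -> None:
        self.summarizer = summarizer
        self.entries: List[str] = []

    def add(self, text: str) -> None:
        self.entries.append(text)

    def compact(self) -> str:
        # Memory calls "upward" only through the abstraction it owns,
        # so no import of a concrete Cognition class is needed and
        # the dependency arrow still points downward.
        return self.summarizer.summarize(self.entries)

class NaiveSummarizer:
    # Stand-in implementation; in the real system the Cognition layer
    # would provide one backed by the LLM.
    def summarize(self, texts: List[str]) -> str:
        return " | ".join(texts)

store = MemoryStore(NaiveSummarizer())
store.add("user asked about layering")
store.add("agent replied with five layers")
print(store.compact())  # → user asked about layering | agent replied with five layers
```

The cycle is gone because Memory never names Cognition; whoever assembles the system decides which Summarizer to inject.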
8.4 Context Bloat Is the Silent Killer of Layered Architectures
As the system evolves, the Context object keeps swelling:
# Early on: a lean Context
@dataclass
class Context:
    input: str
    session_id: str

# After some evolution: a bloated Context
@dataclass
class Context:
    input: str
    session_id: str
    user_profile: UserProfile    # added later
    permission: Permission       # added later
    cache: Cache                 # added later
    audit_log: AuditLog          # added later
    feature_flags: FeatureFlags  # added later
    # ... and it keeps growing
The fix: use layered Contexts
@dataclass
class BaseContext:
    """The minimal context shared by every layer"""
    input: str
    session_id: str

@dataclass
class InterfaceContext(BaseContext):
    """Data specific to the interface layer"""
    raw_request: Request
    protocol: str

@dataclass
class CognitionContext(BaseContext):
    """Data specific to the cognition layer"""
    reasoning_chain: List[Thought]
    tool_calls: List[ToolCall]
9. In Practice: Designing a Layered Agent System
9.1 Requirements
We want to build a code-review Agent with the following abilities:
- Accept a code diff or a PR link
- Analyze code quality and spot potential problems
- Retrieve past review records for reference
- Produce a structured review report
9.2 Architecture
┌────────────────────────────────────────────────────────────┐
│ Interface layer: GitHub webhooks, CLI, API                 │
├────────────────────────────────────────────────────────────┤
│ Orchestration layer: parse PR → split files →              │
│ analyze concurrently → aggregate report                    │
├────────────────────────────────────────────────────────────┤
│ Cognition layer: code analysis, issue detection,           │
│ suggestion generation                                      │
├────────────────────────────────────────────────────────────┤
│ Memory layer: past reviews, team conventions,              │
│ best practices                                             │
├────────────────────────────────────────────────────────────┤
│ Execution layer: GitHub API, static analyzers, LLM API     │
└────────────────────────────────────────────────────────────┘
9.3 Full Implementation
#!/usr/bin/env python3
"""
Code Review Agent - a layered-architecture example
"""
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any, Protocol
from enum import Enum
import asyncio
import hashlib
from datetime import datetime

# ==================== Domain models ====================
class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class CodeChange:
    file_path: str
    diff: str
    language: str
    additions: int
    deletions: int

@dataclass
class ReviewComment:
    file_path: str
    line_number: int
    message: str
    severity: Severity
    category: str  # "security", "performance", "style", etc.
    suggestion: Optional[str] = None

@dataclass
class ReviewReport:
    pr_id: str
    summary: str
    comments: List[ReviewComment]
    statistics: Dict[str, int]
    generated_at: datetime = field(default_factory=datetime.now)
# ==================== Layer 5: Interface ====================
@dataclass
class PRWebhookPayload:
    """GitHub PR webhook data structure"""
    action: str
    pr_number: int
    repository: str
    head_sha: str
    diff_url: str
    author: str

class InterfaceLayer:
    """Interface layer: handles multiple input protocols"""
    def __init__(self, orchestrator: "OrchestrationLayer"):
        self.orchestrator = orchestrator

    async def handle_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a GitHub webhook"""
        pr_payload = self._parse_webhook(payload)
        # Build the context
        context = ReviewContext(
            pr_id=f"{pr_payload.repository}#{pr_payload.pr_number}",
            repository=pr_payload.repository,
            head_sha=pr_payload.head_sha,
            diff_url=pr_payload.diff_url,
            author=pr_payload.author,
            timestamp=datetime.now()
        )
        # Hand off to the orchestration layer
        report = await self.orchestrator.review(context)
        return self._format_response(report)

    async def handle_cli(self, repo: str, pr_number: int) -> str:
        """Handle CLI input"""
        context = ReviewContext(
            pr_id=f"{repo}#{pr_number}",
            repository=repo,
            head_sha="",  # will be fetched via the API
            diff_url=f"https://api.github.com/repos/{repo}/pulls/{pr_number}",
            author="cli_user",
            timestamp=datetime.now()
        )
        report = await self.orchestrator.review(context)
        return self._format_markdown(report)

    def _parse_webhook(self, payload: Dict) -> PRWebhookPayload:
        """Parse and validate the webhook payload"""
        return PRWebhookPayload(
            action=payload.get("action", ""),
            pr_number=payload["pull_request"]["number"],
            repository=payload["repository"]["full_name"],
            head_sha=payload["pull_request"]["head"]["sha"],
            diff_url=payload["pull_request"]["diff_url"],
            author=payload["pull_request"]["user"]["login"]
        )

    def _format_response(self, report: ReviewReport) -> Dict[str, Any]:
        return {
            "pr_id": report.pr_id,
            "summary": report.summary,
            "comment_count": len(report.comments),
            "critical_issues": report.statistics.get("critical", 0)
        }

    def _format_markdown(self, report: ReviewReport) -> str:
        lines = [
            f"# Code Review Report: {report.pr_id}",
            "",
            f"**Generated:** {report.generated_at}",
            "",
            "## Summary",
            report.summary,
            "",
            "## Issues Found",
        ]
        # Sort from most to least severe. Severity members are declared in
        # that order; sorting by the string value would be alphabetical.
        for comment in sorted(report.comments,
                              key=lambda c: list(Severity).index(c.severity)):
            lines.extend([
                f"### [{comment.severity.value.upper()}] {comment.category}",
                f"**File:** `{comment.file_path}`:{comment.line_number}",
                f"{comment.message}",
            ])
            if comment.suggestion:
                lines.extend([
                    "**Suggestion:**",
                    f"```\n{comment.suggestion}\n```"
                ])
            lines.append("")
        return "\n".join(lines)
# ==================== Layer 4: Orchestration ====================
@dataclass
class ReviewContext:
    """The context threaded through the review pipeline"""
    pr_id: str
    repository: str
    head_sha: str
    diff_url: str
    author: str
    timestamp: datetime
    changes: List[CodeChange] = field(default_factory=list)
    history_reviews: List[Dict] = field(default_factory=list)

class OrchestrationLayer:
    """Orchestration layer: task decomposition and scheduling"""
    def __init__(
        self,
        cognition: "CognitionLayer",
        memory: "MemoryLayer",
        execution: "ExecutionLayer"
    ):
        self.cognition = cognition
        self.memory = memory
        self.execution = execution

    async def review(self, context: ReviewContext) -> ReviewReport:
        """Orchestrate the complete review workflow"""
        # Phase 1: fetch the code changes (execution layer)
        context.changes = await self.execution.fetch_diff(context.diff_url)
        # Phase 2: retrieve past reviews (memory layer)
        context.history_reviews = await self.memory.get_similar_reviews(
            context.repository, context.author
        )
        # Phase 3: analyze every file concurrently
        file_tasks = [
            self._analyze_file(change, context)
            for change in context.changes
        ]
        file_results = await asyncio.gather(*file_tasks)
        # Phase 4: aggregate all comments
        all_comments = []
        for comments in file_results:
            all_comments.extend(comments)
        # Phase 5: generate the summary (cognition layer)
        summary = await self.cognition.generate_summary(
            context.changes, all_comments
        )
        # Phase 6: record this review (memory layer)
        await self.memory.store_review(context, all_comments)
        # Build the report
        return ReviewReport(
            pr_id=context.pr_id,
            summary=summary,
            comments=all_comments,
            statistics=self._calculate_stats(all_comments)
        )

    async def _analyze_file(
        self,
        change: CodeChange,
        context: ReviewContext
    ) -> List[ReviewComment]:
        """Analyze a single file"""
        # Run several analyses in parallel
        analysis_tasks = [
            self.cognition.analyze_security(change),
            self.cognition.analyze_performance(change),
            self.cognition.analyze_style(change, context.history_reviews),
        ]
        results = await asyncio.gather(*analysis_tasks)
        # Merge all findings
        all_comments = []
        for comments in results:
            all_comments.extend(comments)
        return all_comments

    def _calculate_stats(self, comments: List[ReviewComment]) -> Dict[str, int]:
        stats = {"total": len(comments)}
        for sev in Severity:
            stats[sev.value] = sum(1 for c in comments if c.severity == sev)
        return stats
# ==================== Layer 3: Cognition ====================
class LLMProvider(Protocol):
    """LLM provider abstraction"""
    async def analyze(self, prompt: str, context: Dict) -> str:
        ...

class CognitionLayer:
    """Cognition layer: code analysis and reasoning"""
    def __init__(self, llm: LLMProvider):
        self.llm = llm
        self.security_patterns = self._load_security_patterns()

    async def analyze_security(self, change: CodeChange) -> List[ReviewComment]:
        """Security analysis"""
        prompt = f"""
Analyze the following {change.language} code for security issues:
```{change.language}
{change.diff}
```
Look for: SQL injection, XSS, path traversal, hardcoded secrets, etc.
Return findings as JSON array with file_path, line, severity, message.
"""
        response = await self.llm.analyze(prompt, {"type": "security"})
        return self._parse_llm_response(response, change.file_path)

    async def analyze_performance(self, change: CodeChange) -> List[ReviewComment]:
        """Performance analysis"""
        prompt = f"""
Analyze this {change.language} code for performance issues:
N+1 queries, inefficient loops, memory leaks, etc.
```{change.language}
{change.diff}
```
"""
        response = await self.llm.analyze(prompt, {"type": "performance"})
        return self._parse_llm_response(response, change.file_path)

    async def analyze_style(
        self,
        change: CodeChange,
        history: List[Dict]
    ) -> List[ReviewComment]:
        """Style and best-practice analysis"""
        # Fold in the team's historical style preferences
        style_context = self._extract_team_patterns(history)
        prompt = f"""
Review this code against team style guidelines:
{style_context}
```{change.language}
{change.diff}
```
"""
        response = await self.llm.analyze(prompt, {"type": "style"})
        return self._parse_llm_response(response, change.file_path)

    async def generate_summary(
        self,
        changes: List[CodeChange],
        comments: List[ReviewComment]
    ) -> str:
        """Generate the review summary"""
        critical_count = sum(1 for c in comments if c.severity == Severity.CRITICAL)
        high_count = sum(1 for c in comments if c.severity == Severity.HIGH)
        lines = [
            f"Reviewed {len(changes)} files with {len(comments)} comments.",
        ]
        if critical_count > 0:
            lines.append(f"⚠️ Found {critical_count} critical security issues that need immediate attention.")
        if high_count > 0:
            lines.append(f"⚡ Found {high_count} high-priority issues.")
        lines.append("Overall code quality assessment based on changes.")
        return " ".join(lines)

    def _load_security_patterns(self) -> List[str]:
        # Load known dangerous patterns
        return ["eval(", "exec(", "subprocess.call", "os.system"]

    def _extract_team_patterns(self, history: List[Dict]) -> str:
        # Extract team style preferences from past reviews
        return "Team prefers explicit error handling over exceptions."

    def _parse_llm_response(self, response: str, file_path: str) -> List[ReviewComment]:
        # Parse the LLM response into structured comments.
        # Simplified here; a real implementation would use structured output.
        return []
# ==================== Layer 2: Memory ====================
class VectorStore(Protocol):
    async def search(self, query: str, top_k: int) -> List[Dict]:
        ...
    async def upsert(self, documents: List[Dict]) -> None:
        ...

class MemoryLayer:
    """Memory layer: storing and retrieving review history"""
    def __init__(self, vector_store: VectorStore):
        self.vector_store = vector_store
        self.cache: Dict[str, Any] = {}

    async def get_similar_reviews(
        self,
        repository: str,
        author: str
    ) -> List[Dict]:
        """Retrieve similar past reviews"""
        cache_key = f"{repository}:{author}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        # Vector search for related reviews
        query = f"code review patterns for {repository} by {author}"
        results = await self.vector_store.search(query, top_k=5)
        self.cache[cache_key] = results
        return results

    async def store_review(
        self,
        context: ReviewContext,
        comments: List[ReviewComment]
    ) -> None:
        """Store this review for future retrieval"""
        # Build the document
        document = {
            "id": hashlib.md5(context.pr_id.encode()).hexdigest(),
            "pr_id": context.pr_id,
            "repository": context.repository,
            "author": context.author,
            "comment_count": len(comments),
            "categories": list(set(c.category for c in comments)),
            "timestamp": context.timestamp.isoformat(),
            "content": self._summarize_for_embedding(comments)
        }
        await self.vector_store.upsert([document])

    def _summarize_for_embedding(self, comments: List[ReviewComment]) -> str:
        """Produce a summary suitable for embedding"""
        categories = {}
        for c in comments:
            categories.setdefault(c.category, []).append(c.message)
        parts = []
        for cat, msgs in categories.items():
            parts.append(f"{cat}: {len(msgs)} issues")
        return "; ".join(parts)
# ==================== Layer 1: Execution ====================
class GitHubAPI(Protocol):
    async def get_diff(self, url: str) -> str:
        ...
    async def post_comment(self, pr_id: str, comment: str) -> None:
        ...

class ExecutionLayer:
    """Execution layer: external interaction and tool execution"""
    def __init__(self, github: GitHubAPI, llm_client: Any):
        self.github = github
        self.llm_client = llm_client

    async def fetch_diff(self, diff_url: str) -> List[CodeChange]:
        """Fetch the PR's diff content"""
        diff_content = await self.github.get_diff(diff_url)
        # Parse the diff into CodeChange objects
        changes = []
        for file_diff in self._split_by_file(diff_content):
            change = self._parse_file_diff(file_diff)
            changes.append(change)
        return changes

    async def post_comments(self, pr_id: str, comments: List[ReviewComment]) -> None:
        """Publish the comments to GitHub"""
        for comment in comments:
            formatted = self._format_github_comment(comment)
            await self.github.post_comment(pr_id, formatted)

    def _split_by_file(self, diff_content: str) -> List[str]:
        """Split the diff by file"""
        # Simplified implementation
        return diff_content.split("diff --git")[1:]

    def _parse_file_diff(self, file_diff: str) -> CodeChange:
        """Parse a single file's diff"""
        lines = file_diff.split("\n")
        # Extract the file path
        file_path = lines[0].split(" b/")[-1] if " b/" in lines[0] else "unknown"
        # Count the changes, skipping the "+++"/"---" header lines
        additions = sum(1 for l in lines
                        if l.startswith("+") and not l.startswith("+++"))
        deletions = sum(1 for l in lines
                        if l.startswith("-") and not l.startswith("---"))
        # Detect the language
        language = self._detect_language(file_path)
        return CodeChange(
            file_path=file_path,
            diff=file_diff,
            language=language,
            additions=additions,
            deletions=deletions
        )

    def _detect_language(self, file_path: str) -> str:
        """Detect the language from the file extension"""
        ext = file_path.split(".")[-1] if "." in file_path else ""
        mapping = {
            "py": "python",
            "js": "javascript",
            "ts": "typescript",
            "go": "go",
            "rs": "rust",
            "java": "java"
        }
        return mapping.get(ext, "text")

    def _format_github_comment(self, comment: ReviewComment) -> str:
        """Format a comment for GitHub"""
        emoji_map = {
            Severity.CRITICAL: "🚨",
            Severity.HIGH: "⚠️",
            Severity.MEDIUM: "💡",
            Severity.LOW: "ℹ️",
            Severity.INFO: "📝"
        }
        emoji = emoji_map.get(comment.severity, "💬")
        return f"{emoji} **[{comment.category.upper()}]** {comment.message}"
# ==================== System assembly ====================
class CodeReviewAgent:
    """The complete code-review Agent"""
    def __init__(
        self,
        github_api: GitHubAPI,
        llm_provider: LLMProvider,
        vector_store: VectorStore
    ):
        # Build from the bottom layer up
        self.execution = ExecutionLayer(github_api, llm_provider)
        self.memory = MemoryLayer(vector_store)
        self.cognition = CognitionLayer(llm_provider)
        self.orchestration = OrchestrationLayer(
            self.cognition, self.memory, self.execution
        )
        self.interface = InterfaceLayer(self.orchestration)

    async def review_pr(self, repo: str, pr_number: int) -> str:
        """CLI entry point"""
        return await self.interface.handle_cli(repo, pr_number)

    async def handle_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Webhook entry point"""
        return await self.interface.handle_webhook(payload)

# ==================== Usage example ====================
async def main():
    """Demonstrate the layered code-review Agent"""
    # Initialize each layer's dependencies (mock implementations)
    class MockGitHubAPI:
        async def get_diff(self, url: str) -> str:
            return """diff --git a/src/main.py b/src/main.py
new file mode 100644
index 0000000..1234567
--- /dev/null
+++ b/src/main.py
@@ -0,0 +1,10 @@
+import os
+
+def process_user_input(user_data):
+    query = f"SELECT * FROM users WHERE id = {user_data['id']}"
+    os.system(f"mysql -e '{query}'")
+    return eval(user_data['expression'])
+"""

    class MockLLMProvider:
        async def analyze(self, prompt: str, context: Dict) -> str:
            # Simulated LLM response
            return "[]"

    class MockVectorStore:
        async def search(self, query: str, top_k: int) -> List[Dict]:
            return []
        async def upsert(self, documents: List[Dict]) -> None:
            pass

    # Assemble the Agent
    agent = CodeReviewAgent(
        github_api=MockGitHubAPI(),
        llm_provider=MockLLMProvider(),
        vector_store=MockVectorStore()
    )
    # Run a review
    report = await agent.review_pr("myorg/myrepo", 123)
    print(report)

if __name__ == "__main__":
    asyncio.run(main())
9.4 Architectural Highlights
- Clear layer boundaries: each layer sticks to its own duties and communicates through well-defined data structures
- Testability: every layer's dependencies are injected through interfaces, so they are easy to mock
- Extensibility: adding a new analysis type only extends the cognition layer, leaving the other layers untouched
- Observability: the Context threads through the whole pipeline, making tracing and debugging straightforward
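The testability claim can be demonstrated directly: because every layer receives its dependencies through the constructor, a unit test substitutes a fake without any monkey-patching. A minimal self-contained sketch (FakeLLM and TinyCognition are simplified stand-ins for the classes above, not part of the article's code):

```python
import asyncio
from typing import List, Protocol

class LLMProvider(Protocol):
    # Same shape as the Protocol used in CognitionLayer above
    async def analyze(self, prompt: str, context: dict) -> str: ...

class FakeLLM:
    # Test double: records every prompt and returns a canned answer,
    # so the test needs no network access and is fully deterministic.
    def __init__(self) -> None:
        self.prompts: List[str] = []

    async def analyze(self, prompt: str, context: dict) -> str:
        self.prompts.append(prompt)
        return "[]"

class TinyCognition:
    # Simplified stand-in for CognitionLayer: it depends only on the
    # injected provider, never on a concrete HTTP client.
    def __init__(self, llm: LLMProvider) -> None:
        self.llm = llm

    async def review(self, diff: str) -> str:
        return await self.llm.analyze(f"review:\n{diff}", {"type": "style"})

fake = FakeLLM()
result = asyncio.run(TinyCognition(fake).review("+print('hi')"))
assert result == "[]"
assert fake.prompts == ["review:\n+print('hi')"]
```

The same trick is what the main() function above uses at system scale: MockGitHubAPI, MockLLMProvider, and MockVectorStore slot in at the constructor seams, and the whole five-layer pipeline runs offline.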
10. Closing: Layering Is a Way of Thinking
Layered architecture is not merely a technical pattern; it is a way of thinking.
"The art of layering lies in knowing where to draw the lines, not in how many lines you draw." — adapted from Richard Feynman
Core principles, revisited
| Principle | Meaning |
|---|---|
| Separation of concerns | Different problems are solved by different layers |
| Abstraction | An upper layer sees only the interface of the layer below, never its implementation |
| Replaceability | Components within a layer can be swapped independently |
| Least knowledge | Each layer knows only what it needs to know |
When to layer, and when not to
Layer                        Don't layer
─────────────────────────────────────────────
Complex systems              Simple scripts
Multi-team collaboration     Personal tools
Long-term maintenance        One-off tasks
Testability required         Quick prototypes
Multiple clients             A single entry point
Final thoughts
In the AI era, layered architecture matters even more:
- Decoupling model from system: the cognition layer's LLM can evolve independently
- Multi-modal input: the interface layer uniformly handles text, voice, and images
- Controllable memory: the memory layer gives the Agent genuine "wisdom"
- Safety and governance: the execution layer is the sole source of side effects, which makes auditing tractable
The five-layer Agent OS architecture is not an endpoint but a starting point. As AI systems evolve, layering patterns will keep changing, but the essence of layering never will:
Carve chaos into meaningful boundaries.
Further Reading
- Computer Networks - Andrew S. Tanenbaum (the authoritative reference on the OSI model)
- Clean Architecture - Robert C. Martin (principles of software layering)
- Designing Data-Intensive Applications - Martin Kleppmann (system architecture design)
- ReAct Pattern - an Agent architecture combining reasoning and acting
- The Illustrated Transformer - a layered view of the attention mechanism
"Simplicity is the ultimate sophistication." — Leonardo da Vinci