1. TL;DR

分层架构是人类应对复杂性的终极武器。从 OSI 七层模型到 TCP/IP 四层,从 MVC 到微服务,再到 Agent OS 的五层架构——分层的本质从未改变:将混沌切割为有意义的边界

Agent OS 的五层架构:

  • 接口层 (Interface):与外部世界对话的窗口
  • 编排层 (Orchestration):任务调度与流控制
  • 认知层 (Cognition):推理、规划与决策
  • 记忆层 (Memory):知识的存储与检索
  • 执行层 (Execution):真实世界的行动

记住:好的分层不是物理隔离,而是逻辑边界;层间通信的成本决定了架构的成败


2. 📋 本文结构

第3章 │ 分层的历史:从 OSI 七层到现代软件架构
───────┼────────────────────────────────────────
第4章 │ 为什么分层有效:关注点分离与抽象的力量
───────┼────────────────────────────────────────
第5章 │ Agent OS 五层架构:重新定义 AI 系统边界
───────┼────────────────────────────────────────
第6章 │ 层间通信:Context 传递的艺术
───────┼────────────────────────────────────────
第7章 │ 何时打破分层:性能与灵活性的权衡
───────┼────────────────────────────────────────
第8章 │ 反直觉洞察:分层架构的隐藏代价
───────┼────────────────────────────────────────
第9章 │ 实战:从零设计分层的 Agent 系统
───────┼────────────────────────────────────────
第10章 │ 结语:分层是一种思维方式

3. 分层架构的历史

3.1 OSI 七层模型:网络通信的圣经

1970年代,ISO 推出了开放系统互连(OSI)参考模型,将网络通信抽象为七个层次:

┌─────────────────────────────────────────────────┐
│  第7层  │  应用层 (Application)                  │ ← HTTP, FTP, SMTP
│  第6层  │  表示层 (Presentation)                 │ ← 加密、压缩、编码
│  第5层  │  会话层 (Session)                      │ ← 连接管理、同步点
│  第4层  │  传输层 (Transport)                    │ ← TCP, UDP
│  第3层  │  网络层 (Network)                      │ ← IP, 路由
│  第2层  │  数据链路层 (Data Link)                │ ← MAC, 帧
│  第1层  │  物理层 (Physical)                     │ ← 比特流、电缆
└─────────────────────────────────────────────────┘
         ↓ 每一层只为上一层提供服务

核心原则

  1. 封装:每一层不关心其他层的实现细节
  2. 标准化:层间接口定义明确,允许独立演化
  3. 分层独立:某层变更不应影响其他层

但 OSI 的问题在于过度设计。七层对于实际应用过于繁琐,会话层和表示层的边界模糊,导致实现困难。

3.2 TCP/IP 四层模型:实用主义的胜利

TCP/IP 协议栈用四层模型取代了 OSI 的七层:

┌─────────────────────────────────────────────────┐
│  应用层      │  HTTP, FTP, DNS, SSH...           │
├─────────────────────────────────────────────────┤
│  传输层      │  TCP, UDP                         │
├─────────────────────────────────────────────────┤
│  网络层      │  IP, ICMP, 路由协议                │
├─────────────────────────────────────────────────┤
│  网络接口层   │  Ethernet, WiFi, 物理设备          │
└─────────────────────────────────────────────────┘

关键简化

  • 会话层和表示层被并入应用层
  • 数据链路层和物理层合并为网络接口层
  • 每层职责更清晰,实现更简单

💡 洞察:分层的数量不是固定的,而是取决于问题域的复杂度。合适的分层是刚好能解决问题的那一层数

3.3 软件架构中的分层模式

MVC(Model-View-Controller)

    ┌─────────┐     用户操作      ┌─────────┐
    │  View   │ ───────────────→ │Controller│
    │ (视图)   │                  │ (控制器) │
    └─────────┘ ←─────────────── └────┬────┘
         ↑                            │
         │      数据变化通知           │ 数据操作
         └────────────────────────────┘
                   ┌─────────┐
                   │  Model  │
                   │ (模型)   │
                   └─────────┘

MVC 的核心思想:将数据、逻辑、呈现分离。这种分层使得 UI 可以独立变化而不影响业务逻辑。

三层架构(Presentation/Business/Data)

企业应用中最经典的分层:

┌─────────────────────────────────────────┐
│     表示层 (Presentation Layer)          │
│    REST API / Web / Mobile UI           │
├─────────────────────────────────────────┤
│     业务层 (Business Logic Layer)        │
│    Domain Services / Use Cases          │
├─────────────────────────────────────────┤
│     数据层 (Data Access Layer)           │
│    Repository / ORM / Database          │
└─────────────────────────────────────────┘

微服务的分层

微服务本身也是一种分层——按业务领域垂直分层

        ┌──────────┐ ┌──────────┐ ┌──────────┐
        │ 用户服务  │ │ 订单服务  │ │ 库存服务  │
        └────┬─────┘ └────┬─────┘ └────┬─────┘
             │            │            │
        ┌────┴────────────┴────────────┴────┐
        │         服务编排层 (API Gateway)     │
        └────────────────────────────────────┘

4. 为什么分层有效

4.1 关注点分离(Separation of Concerns)

“The secret to building large apps is never build large apps. Break your applications into small pieces. Then, assemble those testable, bite-sized pieces into your big application.” — Justin Meyer

分层架构的核心价值在于将不同的关注点隔离到不同的层

考虑一个不分层的 Agent 系统:

# 反模式:面条代码
class Agent:
    def process(self, user_input):
        # 解析输入
        parsed = self.parse(user_input)
        
        # 直接调用 LLM
        response = requests.post("https://api.openai.com/...", json={...})
        
        # 解析响应
        action = json.loads(response.text)
        
        # 直接操作数据库
        conn = sqlite3.connect("memory.db")
        conn.execute("INSERT INTO history ...")
        
        # 直接调用工具
        if action["type"] == "search":
            result = self.google_search(action["query"])
        elif action["type"] == "file":
            with open(action["path"], "w") as f:
                f.write(action["content"])
        
        return result

问题在哪里?

  • 难以测试:需要模拟 HTTP、数据库、文件系统
  • 难以修改:改动一处可能影响全局
  • 难以复用:逻辑与具体实现深度耦合

4.2 抽象的力量

分层通过抽象隐藏了底层复杂性:

# 好的分层:每层只关心下一层的接口

class AgentInterface:
    """接口层:只关心如何接收和返回数据"""
    def receive(self, message: str) -> Response:
        context = self.parse_input(message)
        result = self.orchestrator.execute(context)
        return self.format_output(result)

class TaskOrchestrator:
    """编排层:只关心任务调度"""
    def execute(self, context: Context) -> Result:
        plan = self.planner.create_plan(context)
        for step in plan.steps:
            yield self.execute_step(step)

class CognitionEngine:
    """认知层:只关心推理和决策"""
    def decide(self, context: Context, memory: Memory) -> Action:
        prompt = self.build_prompt(context, memory.retrieve(context))
        return self.llm.generate(prompt)

class MemoryStore:
    """记忆层:只关心知识的存储和检索"""
    def retrieve(self, query: Context) -> Knowledge:
        return self.vector_db.search(query.embedding)

每一层都只依赖下一层的抽象接口,而不是具体实现。

4.3 可替换性

分层架构的另一个巨大优势是组件可替换性

# 更换 LLM 提供商?只需修改认知层的实现

class CognitionEngine:
    def __init__(self, llm_provider: LLMProvider):
        self.llm = llm_provider  # 依赖注入

# 可以是 OpenAI
engine = CognitionEngine(OpenAIProvider(api_key="..."))

# 也可以是 Anthropic
engine = CognitionEngine(AnthropicProvider(api_key="..."))

# 甚至是本地模型
engine = CognitionEngine(OllamaProvider(model="llama3"))

💡 分层架构的核心收益:层与层之间是契约关系,而非实现依赖。


5. Agent OS 的五层架构模型

基于对分层架构历史的理解,我们提出了 Agent OS 的五层架构:

┌─────────────────────────────────────────────────────────────┐
│                    Agent OS 架构                             │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  第5层  │  接口层 (Interface Layer)                  │   │
│  │         │  WebSocket, HTTP, CLI, Voice              │   │
│  │         │  职责:协议适配、输入验证、响应格式化        │   │
│  └─────────┼─────────────────────────────────────────────┘   │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  第4层  │  编排层 (Orchestration Layer)              │   │
│  │         │  Task Scheduler, Flow Control             │   │
│  │         │  职责:任务分解、并发管理、错误恢复          │   │
│  └─────────┼─────────────────────────────────────────────┘   │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  第3层  │  认知层 (Cognition Layer)                  │   │
│  │         │  LLM, Reasoning, Planning                 │   │
│  │         │  职责:推理、决策、策略生成                  │   │
│  └─────────┼─────────────────────────────────────────────┘   │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  第2层  │  记忆层 (Memory Layer)                     │   │
│  │         │  Vector DB, Graph DB, Cache               │   │
│  │         │  职责:知识存储、上下文检索、长期记忆          │   │
│  └─────────┼─────────────────────────────────────────────┘   │
│            ↓                                                │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  第1层  │  执行层 (Execution Layer)                  │   │
│  │         │  Tools, APIs, File System, Browser        │   │
│  │         │  职责:实际动作、副作用、外部交互              │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

5.1 层详解

第5层:接口层 (Interface Layer)

职责

  • 接收用户输入(多模态)
  • 协议适配(HTTP/WebSocket/gRPC)
  • 输入验证和 sanitization
  • 响应格式化
from dataclasses import dataclass
from typing import Optional, Dict, Any
from enum import Enum

class InputType(Enum):
    TEXT = "text"
    VOICE = "voice"
    IMAGE = "image"
    FILE = "file"

@dataclass
class UserInput:
    content: str
    input_type: InputType
    metadata: Dict[str, Any]
    session_id: str
    timestamp: float

class InterfaceLayer:
    def __init__(self, orchestrator: OrchestrationLayer):
        self.orchestrator = orchestrator
    
    async def handle(self, raw_input: Dict[str, Any]) -> Dict[str, Any]:
        # 1. 解析和验证
        user_input = self.parse(raw_input)
        
        # 2. 创建 Context
        context = Context(
            input=user_input,
            session_id=user_input.session_id,
            metadata=user_input.metadata
        )
        
        # 3. 调用编排层
        result = await self.orchestrator.execute(context)
        
        # 4. 格式化响应
        return self.format_response(result)
    
    def parse(self, raw: Dict[str, Any]) -> UserInput:
        # 验证必需字段
        if "content" not in raw:
            raise ValidationError("Missing 'content' field")
        
        return UserInput(
            content=raw["content"],
            input_type=InputType(raw.get("type", "text")),
            metadata=raw.get("metadata", {}),
            session_id=raw.get("session_id", generate_uuid()),
            timestamp=time.time()
        )

第4层:编排层 (Orchestration Layer)

职责

  • 任务分解与规划
  • 子任务调度
  • 并发控制
  • 错误恢复与重试
from typing import List, AsyncIterator
import asyncio

@dataclass
class Task:
    id: str
    description: str
    dependencies: List[str]
    max_retries: int = 3
    timeout: float = 30.0

class OrchestrationLayer:
    def __init__(self, cognition: CognitionLayer, memory: MemoryLayer):
        self.cognition = cognition
        self.memory = memory
        self.execution_graph: Dict[str, Task] = {}
    
    async def execute(self, context: Context) -> Result:
        # 1. 任务规划(调用认知层)
        plan = await self.cognition.plan(context)
        
        # 2. 构建执行图
        self.build_execution_graph(plan)
        
        # 3. 拓扑排序执行
        results = []
        for batch in self.topological_batches():
            batch_results = await asyncio.gather(
                *[self.execute_task(t, context) for t in batch],
                return_exceptions=True
            )
            results.extend(batch_results)
        
        # 4. 聚合结果
        return self.aggregate_results(results)
    
    async def execute_task(self, task: Task, context: Context) -> TaskResult:
        for attempt in range(task.max_retries):
            try:
                # 带超时的执行
                result = await asyncio.wait_for(
                    self.cognition.execute(task, context, self.memory),
                    timeout=task.timeout
                )
                return TaskResult(success=True, data=result)
            except Exception as e:
                if attempt == task.max_retries - 1:
                    return TaskResult(success=False, error=e)
                await asyncio.sleep(2 ** attempt)  # 指数退避

第3层:认知层 (Cognition Layer)

职责

  • LLM 调用与响应解析
  • 推理链(Chain of Thought)
  • 工具选择与参数生成
  • 策略优化
from abc import ABC, abstractmethod

class LLMProvider(ABC):
    @abstractmethod
    async def generate(self, prompt: str, **kwargs) -> str:
        pass

class CognitionLayer:
    def __init__(self, llm: LLMProvider, tools: ToolRegistry):
        self.llm = llm
        self.tools = tools
        self.reasoning_chain: List[Thought] = []
    
    async def plan(self, context: Context) -> Plan:
        """基于 ReAct 模式进行规划"""
        
        system_prompt = """你是一个任务规划专家。请将用户的请求分解为可执行的子任务。
        
可用工具:
{tool_descriptions}

请以 JSON 格式返回执行计划:
tasks
  ]
}}"""
        
        prompt = system_prompt.format(
            tool_descriptions=self.tools.describe_all()
        ) + f"\n\n用户请求:{context.input.content}"
        
        response = await self.llm.generate(prompt, temperature=0.2)
        return self.parse_plan(response)
    
    async def execute(self, task: Task, context: Context, memory: MemoryLayer) -> Any:
        """执行单个任务"""
        
        # 1. 检索相关记忆
        relevant_memory = await memory.retrieve_relevant(context)
        
        # 2. 构建推理上下文
        reasoning_context = self.build_reasoning_context(
            task, context, relevant_memory
        )
        
        # 3. 生成决策
        response = await self.llm.generate(reasoning_context)
        
        # 4. 解析动作
        action = self.parse_action(response)
        
        # 5. 记录推理过程
        self.reasoning_chain.append(Thought(
            task_id=task.id,
            reasoning=response,
            action=action
        ))
        
        return action

第2层:记忆层 (Memory Layer)

职责

  • 短期记忆(当前对话上下文)
  • 长期记忆(知识库检索)
  • 语义记忆(向量存储)
  • 程序性记忆(技能/工具记忆)
from typing import Protocol
import numpy as np

class VectorStore(Protocol):
    async def search(self, embedding: List[float], top_k: int = 5) -> List[Document]:
        ...
    
    async def upsert(self, documents: List[Document]) -> None:
        ...

class MemoryLayer:
    def __init__(
        self,
        vector_store: VectorStore,
        graph_store: Optional[GraphStore] = None,
        cache: Optional[Cache] = None
    ):
        self.vector_store = vector_store
        self.graph_store = graph_store
        self.cache = cache
        self.short_term: List[Message] = []  # 当前对话窗口
    
    async def retrieve_relevant(self, context: Context) -> Knowledge:
        """检索与当前上下文相关的知识"""
        
        # 1. 生成查询嵌入
        query_embedding = await self.embed(context.input.content)
        
        # 2. 向量检索
        vector_results = await self.vector_store.search(
            query_embedding, top_k=5
        )
        
        # 3. 图谱检索(如果有)
        graph_results = []
        if self.graph_store:
            graph_results = await self.graph_store.traverse(
                start_node=context.input.content,
                depth=2
            )
        
        # 4. 短期记忆
        recent_messages = self.short_term[-10:]  # 最近10条
        
        return Knowledge(
            long_term=vector_results,
            relational=graph_results,
            short_term=recent_messages
        )
    
    async def remember(self, context: Context, outcome: Any) -> None:
        """将经验存入长期记忆"""
        
        # 编码为向量
        embedding = await self.embed(f"{context.input}{outcome}")
        
        # 存储到向量数据库
        document = Document(
            id=generate_uuid(),
            content=str(outcome),
            metadata={
                "input": context.input.content,
                "timestamp": time.time(),
                "session_id": context.session_id
            },
            embedding=embedding
        )
        
        await self.vector_store.upsert([document])
        
        # 更新短期记忆
        self.short_term.append(Message(
            role="assistant",
            content=str(outcome),
            timestamp=time.time()
        ))

第1层:执行层 (Execution Layer)

职责

  • 工具执行
  • API 调用
  • 文件系统操作
  • 浏览器自动化
  • 所有与外部世界的交互
import subprocess
import aiohttp
from typing import Callable, Dict

ToolFunction = Callable[..., Any]

class ExecutionLayer:
    def __init__(self):
        self.tools: Dict[str, ToolFunction] = {}
        self._register_default_tools()
    
    def _register_default_tools(self):
        """注册默认工具"""
        self.register_tool("search", self.web_search)
        self.register_tool("read_file", self.read_file)
        self.register_tool("write_file", self.write_file)
        self.register_tool("execute_command", self.execute_command)
        self.register_tool("http_request", self.http_request)
    
    def register_tool(self, name: str, func: ToolFunction) -> None:
        self.tools[name] = func
    
    async def execute(self, action: Action) -> ExecutionResult:
        """执行动作"""
        
        tool_name = action.tool
        params = action.parameters
        
        if tool_name not in self.tools:
            return ExecutionResult(
                success=False,
                error=f"Unknown tool: {tool_name}"
            )
        
        try:
            # 执行前验证
            self.validate_params(tool_name, params)
            
            # 执行工具
            result = await self.tools[tool_name](**params)
            
            return ExecutionResult(success=True, data=result)
            
        except Exception as e:
            return ExecutionResult(success=False, error=str(e))
    
    async def web_search(self, query: str, top_n: int = 5) -> List[SearchResult]:
        """网页搜索"""
        # 实际实现会调用搜索引擎 API
        pass
    
    async def read_file(self, path: str) -> str:
        """读取文件"""
        # 安全检查
        if not self.is_path_allowed(path):
            raise PermissionError(f"Access denied: {path}")
        
        async with aiofiles.open(path, 'r') as f:
            return await f.read()
    
    async def execute_command(self, command: str, timeout: int = 30) -> str:
        """执行系统命令(沙箱化)"""
        # 使用受限环境执行
        proc = await asyncio.create_subprocess_shell(
            command,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            limit=1024 * 1024  # 1MB 输出限制
        )
        
        try:
            stdout, stderr = await asyncio.wait_for(
                proc.communicate(),
                timeout=timeout
            )
            return stdout.decode()
        except asyncio.TimeoutError:
            proc.kill()
            raise TimeoutError(f"Command timed out after {timeout}s")

6. 分层与跨层通信

6.1 Context 传递模式

层间通信的核心是 Context 对象——它像快递包裹一样在各层之间传递:

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
from datetime import datetime

@dataclass
class Context:
    """贯穿所有层的上下文对象"""
    
    # 输入信息
    input: UserInput
    session_id: str
    
    # 执行状态
    current_step: int = 0
    execution_trace: List[TraceEvent] = field(default_factory=list)
    
    # 层间数据
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    # 性能指标
    timestamps: Dict[str, datetime] = field(default_factory=dict)
    
    def mark(self, layer: str) -> "Context":
        """记录进入某层的时间"""
        self.timestamps[f"enter_{layer}"] = datetime.now()
        return self
    
    def fork(self) -> "Context":
        """创建子上下文(用于并发执行)"""
        return Context(
            input=self.input,
            session_id=self.session_id,
            metadata=self.metadata.copy()
        )

6.2 层间通信的三种模式

模式1:直接调用(Synchronous)
─────────────────────────────────
Layer N          Layer N-1
   │                  │
   │  execute(ctx)    │
   │ ───────────────→ │
   │                  │
   │  result          │
   │ ←─────────────── │


模式2:事件总线(Event-Driven)
─────────────────────────────────
Layer N          Event Bus          Layer N-1
   │                  │                  │
   │  publish(evt)    │                  │
   │ ───────────────→ │                  │
   │                  │  route(evt)      │
   │                  │ ───────────────→ │
   │                  │                  │


模式3:消息队列(Async Queue)
─────────────────────────────────
Layer N          Queue              Layer N-1
   │                  │                  │
   │  enqueue(msg)    │                  │
   │ ───────────────→ │                  │
   │                  │  dequeue()       │
   │                  │ ←─────────────── │

6.3 事件总线实现

from typing import Callable, Awaitable
import asyncio

EventHandler = Callable[[Event], Awaitable[None]]

class EventBus:
    """层间解耦的事件总线"""
    
    def __init__(self):
        self.subscribers: Dict[str, List[EventHandler]] = {}
        self.event_history: List[Event] = []
    
    def subscribe(self, event_type: str, handler: EventHandler):
        """订阅特定类型的事件"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)
    
    async def publish(self, event: Event):
        """发布事件到所有订阅者"""
        self.event_history.append(event)
        
        handlers = self.subscribers.get(event.type, [])
        
        # 并行执行所有处理器
        await asyncio.gather(
            *[self._safe_handle(h, event) for h in handlers],
            return_exceptions=True
        )
    
    async def _safe_handle(self, handler: EventHandler, event: Event):
        try:
            await handler(event)
        except Exception as e:
            logger.error(f"Event handler failed: {e}")


# 使用示例
class AgentSystem:
    def __init__(self):
        self.event_bus = EventBus()
        self.interface = InterfaceLayer(self.event_bus)
        self.orchestrator = OrchestrationLayer(self.event_bus)
        self.cognition = CognitionLayer(self.event_bus)
        self.memory = MemoryLayer(self.event_bus)
        self.execution = ExecutionLayer(self.event_bus)
        
        self._wire_events()
    
    def _wire_events(self):
        """连接各层的事件流"""
        
        # 接口层 → 编排层
        self.event_bus.subscribe("user_input", 
            lambda e: self.orchestrator.on_user_input(e))
        
        # 编排层 → 认知层
        self.event_bus.subscribe("plan_required",
            lambda e: self.cognition.on_plan_request(e))
        
        # 认知层 → 执行层
        self.event_bus.subscribe("action_ready",
            lambda e: self.execution.on_action(e))
        
        # 执行层 → 记忆层(记录执行结果)
        self.event_bus.subscribe("execution_complete",
            lambda e: self.memory.on_execution_result(e))

7. 何时打破分层

7.1 性能优化场景

分层是有代价的。每一层都增加了:

  • 序列化/反序列化开销
  • 函数调用开销
  • Context 拷贝开销
# 正常分层(可读性好)
async def process(self, context: Context):
    validated = await self.interface.validate(context)
    planned = await self.orchestrator.plan(validated)
    result = await self.cognition.execute(planned)
    return result

# 性能优化版本(跳过中间层)
async def process_fast(self, raw_input: str):
    # 直接调用认知层,绕过编排层
    # 适用于简单、确定性的任务
    return await self.cognition.quick_answer(raw_input)

7.2 特殊场景的直接通信

class FastPath:
    """快速路径:允许跨层直接通信"""
    
    FAST_OPERATIONS = {"ping", "health", "echo"}
    
    async def handle(self, context: Context) -> Result:
        # 识别是否可以使用快速路径
        if context.input.content in self.FAST_OPERATIONS:
            # 跳过编排层和认知层,直接返回
            return await self.execution.execute_cached(context)
        
        # 否则走正常分层流程
        return await self.full_pipeline(context)

7.3 分层的打破原则

💡 打破分层的准则

  1. 性能瓶颈已确认:通过 profiling 证明分层是瓶颈
  2. 场景可预测:快速路径只用于确定性操作
  3. 可回滚:保留分层版本作为 fallback
  4. 文档化:明确标注哪些是”捷径”

8. 反直觉洞察

8.1 分层越多 ≠ 架构越好

OSI 七层 vs TCP/IP 四层的教训:分层数量应与问题复杂度匹配

层数 适用场景 示例
2-3层 简单工具、单一职责 命令行工具、脚本
4-5层 复杂系统、多阶段处理 Web 应用、Agent 系统
6-7层 超大规模、极端解耦 电信系统、企业级中间件

8.2 完美的分层是动态分层的

class AdaptiveLayering:
    """根据任务复杂度动态调整分层深度"""
    
    async def execute(self, task: Task) -> Result:
        complexity = self.assess_complexity(task)
        
        if complexity < 0.3:
            # 简单任务:单层处理
            return await self.execution.direct_execute(task)
        elif complexity < 0.7:
            # 中等任务:三层架构
            return await self.three_layer_execute(task)
        else:
            # 复杂任务:完整五层
            return await self.full_layer_execute(task)
    
    def assess_complexity(self, task: Task) -> float:
        """评估任务复杂度"""
        factors = [
            len(task.subtasks) / 10,           # 子任务数量
            len(task.required_tools) / 5,       # 涉及工具数
            1.0 if task.requires_reasoning else 0.0,  # 是否需要推理
            1.0 if task.has_side_effects else 0.0,    # 是否有副作用
        ]
        return min(sum(factors) / len(factors), 1.0)

8.3 层间依赖的方向比层本身更重要

关键原则

  • ✅ 上层可以依赖下层
  • ✅ 同层组件可以相互依赖(谨慎)
  • ❌ 下层绝不应该依赖上层
  • ❌ 避免循环依赖
# 好的依赖方向
Interface  Orchestration  Cognition  Memory  Execution

# 坏的依赖方向(循环依赖)
Cognition ──→ Memory
              
    └──────────┘

8.4 Context 膨胀是分层架构的隐形杀手

随着系统演化,Context 对象会不断膨胀:

# 初期:简洁的 Context
@dataclass
class Context:
    input: str
    session_id: str

# 演化后:臃肿的 Context
@dataclass
class Context:
    input: str
    session_id: str
    user_profile: UserProfile      # 后来添加
    permission: Permission         # 后来添加
    cache: Cache                   # 后来添加
    audit_log: AuditLog            # 后来添加
    feature_flags: FeatureFlags    # 后来添加
    # ... 持续膨胀

解决方案:使用分层 Context

@dataclass
class BaseContext:
    """所有层共享的最小上下文"""
    input: str
    session_id: str

@dataclass
class InterfaceContext(BaseContext):
    """接口层特有数据"""
    raw_request: Request
    protocol: str

@dataclass
class CognitionContext(BaseContext):
    """认知层特有数据"""
    reasoning_chain: List[Thought]
    tool_calls: List[ToolCall]

9. 实战:设计分层的 Agent 系统

9.1 需求分析

我们要构建一个代码审查 Agent,具备以下能力:

  1. 接收代码 diff 或 PR 链接
  2. 分析代码质量和潜在问题
  3. 检索历史审查记录作为参考
  4. 输出结构化的审查报告

9.2 架构设计

┌────────────────────────────────────────────────────────────┐
│  接口层:支持 GitHub Webhook、CLI、API                      │
├────────────────────────────────────────────────────────────┤
│  编排层:解析 PR → 分解文件 → 并发分析 → 聚合报告            │
├────────────────────────────────────────────────────────────┤
│  认知层:代码分析、问题识别、建议生成                        │
├────────────────────────────────────────────────────────────┤
│  记忆层:历史审查记录、团队规范、最佳实践                    │
├────────────────────────────────────────────────────────────┤
│  执行层:GitHub API、静态分析工具、LLM API                  │
└────────────────────────────────────────────────────────────┘

9.3 完整代码实现

#!/usr/bin/env python3
"""
Code Review Agent - 分层架构示例
"""

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Any, Protocol
from enum import Enum
import asyncio
import hashlib
from datetime import datetime

# ==================== 领域模型 ====================

class Severity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class CodeChange:
    file_path: str
    diff: str
    language: str
    additions: int
    deletions: int

@dataclass
class ReviewComment:
    file_path: str
    line_number: int
    message: str
    severity: Severity
    category: str  # "security", "performance", "style", etc.
    suggestion: Optional[str] = None

@dataclass
class ReviewReport:
    pr_id: str
    summary: str
    comments: List[ReviewComment]
    statistics: Dict[str, int]
    generated_at: datetime = field(default_factory=datetime.now)


# ==================== 第5层:接口层 ====================

@dataclass
class PRWebhookPayload:
    """GitHub PR Webhook 数据结构"""
    action: str
    pr_number: int
    repository: str
    head_sha: str
    diff_url: str
    author: str


class InterfaceLayer:
    """接口层:处理多种输入协议"""
    
    def __init__(self, orchestrator: "OrchestrationLayer"):
        self.orchestrator = orchestrator
    
    async def handle_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """处理 GitHub Webhook"""
        pr_payload = self._parse_webhook(payload)
        
        # 构建上下文
        context = ReviewContext(
            pr_id=f"{pr_payload.repository}#{pr_payload.pr_number}",
            repository=pr_payload.repository,
            head_sha=pr_payload.head_sha,
            diff_url=pr_payload.diff_url,
            author=pr_payload.author,
            timestamp=datetime.now()
        )
        
        # 调用编排层
        report = await self.orchestrator.review(context)
        
        return self._format_response(report)
    
    async def handle_cli(self, repo: str, pr_number: int) -> str:
        """处理 CLI 输入"""
        context = ReviewContext(
            pr_id=f"{repo}#{pr_number}",
            repository=repo,
            head_sha="",  # 将通过 API 获取
            diff_url=f"https://api.github.com/repos/{repo}/pulls/{pr_number}",
            author="cli_user",
            timestamp=datetime.now()
        )
        
        report = await self.orchestrator.review(context)
        return self._format_markdown(report)
    
    def _parse_webhook(self, payload: Dict) -> PRWebhookPayload:
        """解析并验证 Webhook 数据"""
        return PRWebhookPayload(
            action=payload.get("action", ""),
            pr_number=payload["pull_request"]["number"],
            repository=payload["repository"]["full_name"],
            head_sha=payload["pull_request"]["head"]["sha"],
            diff_url=payload["pull_request"]["diff_url"],
            author=payload["pull_request"]["user"]["login"]
        )
    
    def _format_response(self, report: ReviewReport) -> Dict[str, Any]:
        return {
            "pr_id": report.pr_id,
            "summary": report.summary,
            "comment_count": len(report.comments),
            "critical_issues": report.statistics.get("critical", 0)
        }
    
    def _format_markdown(self, report: ReviewReport) -> str:
        lines = [
            f"# Code Review Report: {report.pr_id}",
            "",
            f"**Generated:** {report.generated_at}",
            "",
            "## Summary",
            report.summary,
            "",
            "## Issues Found",
        ]
        
        for comment in sorted(report.comments, key=lambda c: c.severity.value):
            lines.extend([
                f"### [{comment.severity.value.upper()}] {comment.category}",
                f"**File:** `{comment.file_path}`:{comment.line_number}",
                f"{comment.message}",
            ])
            if comment.suggestion:
                lines.extend([
                    "**Suggestion:**",
                    f"```\n{comment.suggestion}\n```"
                ])
            lines.append("")
        
        return "\n".join(lines)


# ==================== 第4层:编排层 ====================

@dataclass
class ReviewContext:
    """贯穿审查流程的上下文"""
    pr_id: str
    repository: str
    head_sha: str
    diff_url: str
    author: str
    timestamp: datetime
    changes: List[CodeChange] = field(default_factory=list)
    history_reviews: List[Dict] = field(default_factory=list)


class OrchestrationLayer:
    """编排层:任务分解与调度"""
    
    def __init__(
        self,
        cognition: "CognitionLayer",
        memory: "MemoryLayer",
        execution: "ExecutionLayer"
    ):
        self.cognition = cognition
        self.memory = memory
        self.execution = execution
    
    async def review(self, context: ReviewContext) -> ReviewReport:
        """编排完整的代码审查流程"""
        
        # Phase 1: 获取代码变更(执行层)
        context.changes = await self.execution.fetch_diff(context.diff_url)
        
        # Phase 2: 检索历史审查(记忆层)
        context.history_reviews = await self.memory.get_similar_reviews(
            context.repository, context.author
        )
        
        # Phase 3: 并发分析每个文件
        file_tasks = [
            self._analyze_file(change, context)
            for change in context.changes
        ]
        file_results = await asyncio.gather(*file_tasks)
        
        # Phase 4: 聚合所有评论
        all_comments = []
        for comments in file_results:
            all_comments.extend(comments)
        
        # Phase 5: 生成总结(认知层)
        summary = await self.cognition.generate_summary(
            context.changes, all_comments
        )
        
        # Phase 6: 记录本次审查(记忆层)
        await self.memory.store_review(context, all_comments)
        
        # 构建报告
        return ReviewReport(
            pr_id=context.pr_id,
            summary=summary,
            comments=all_comments,
            statistics=self._calculate_stats(all_comments)
        )
    
    async def _analyze_file(
        self,
        change: CodeChange,
        context: ReviewContext
    ) -> List[ReviewComment]:
        """分析单个文件"""
        
        # 并行执行多种分析
        analysis_tasks = [
            self.cognition.analyze_security(change),
            self.cognition.analyze_performance(change),
            self.cognition.analyze_style(change, context.history_reviews),
        ]
        
        results = await asyncio.gather(*analysis_tasks)
        
        # 合并所有发现
        all_comments = []
        for comments in results:
            all_comments.extend(comments)
        
        return all_comments
    
    def _calculate_stats(self, comments: List[ReviewComment]) -> Dict[str, int]:
        stats = {"total": len(comments)}
        for sev in Severity:
            stats[sev.value] = sum(1 for c in comments if c.severity == sev)
        return stats


# ==================== 第3层:认知层 ====================

class LLMProvider(Protocol):
    """LLM 提供商抽象"""
    async def analyze(self, prompt: str, context: Dict) -> str:
        ...


class CognitionLayer:
    """认知层:代码分析与推理"""
    
    def __init__(self, llm: LLMProvider):
        self.llm = llm
        self.security_patterns = self._load_security_patterns()
    
    async def analyze_security(self, change: CodeChange) -> List[ReviewComment]:
        """安全分析"""
        prompt = f"""
        Analyze the following {change.language} code for security issues:
        
        ```{change.language}
        {change.diff}
        ```
        
        Look for: SQL injection, XSS, path traversal, hardcoded secrets, etc.
        Return findings as JSON array with file_path, line, severity, message.
        """
        
        response = await self.llm.analyze(prompt, {"type": "security"})
        return self._parse_llm_response(response, change.file_path)
    
    async def analyze_performance(self, change: CodeChange) -> List[ReviewComment]:
        """性能分析"""
        prompt = f"""
        Analyze this {change.language} code for performance issues:
        N+1 queries, inefficient loops, memory leaks, etc.
        
        ```{change.language}
        {change.diff}
        ```
        """
        
        response = await self.llm.analyze(prompt, {"type": "performance"})
        return self._parse_llm_response(response, change.file_path)
    
    async def analyze_style(
        self,
        change: CodeChange,
        history: List[Dict]
    ) -> List[ReviewComment]:
        """风格与最佳实践分析"""
        
        # 结合团队历史风格偏好
        style_context = self._extract_team_patterns(history)
        
        prompt = f"""
        Review this code against team style guidelines:
        {style_context}
        
        ```{change.language}
        {change.diff}
        ```
        """
        
        response = await self.llm.analyze(prompt, {"type": "style"})
        return self._parse_llm_response(response, change.file_path)
    
    async def generate_summary(
        self,
        changes: List[CodeChange],
        comments: List[ReviewComment]
    ) -> str:
        """生成审查总结"""
        
        critical_count = sum(1 for c in comments if c.severity == Severity.CRITICAL)
        high_count = sum(1 for c in comments if c.severity == Severity.HIGH)
        
        lines = [
            f"Reviewed {len(changes)} files with {len(comments)} comments.",
        ]
        
        if critical_count > 0:
            lines.append(f"⚠️ Found {critical_count} critical security issues that need immediate attention.")
        if high_count > 0:
            lines.append(f"⚡ Found {high_count} high-priority issues.")
        
        lines.append("Overall code quality assessment based on changes.")
        
        return " ".join(lines)
    
    def _load_security_patterns(self) -> List[str]:
        # 加载已知的安全模式
        return ["eval(", "exec(", "subprocess.call", "os.system"]
    
    def _extract_team_patterns(self, history: List[Dict]) -> str:
        # 从历史审查中提取团队风格偏好
        return "Team prefers explicit error handling over exceptions."
    
    def _parse_llm_response(self, response: str, file_path: str) -> List[ReviewComment]:
        # 解析 LLM 响应为结构化评论
        # 简化实现,实际应使用结构化输出
        return []


# ==================== 第2层:记忆层 ====================

class VectorStore(Protocol):
    async def search(self, query: str, top_k: int) -> List[Dict]:
        ...
    async def upsert(self, documents: List[Dict]) -> None:
        ...


class MemoryLayer:
    """记忆层:审查历史存储与检索"""
    
    def __init__(self, vector_store: VectorStore):
        self.vector_store = vector_store
        self.cache: Dict[str, Any] = {}
    
    async def get_similar_reviews(
        self,
        repository: str,
        author: str
    ) -> List[Dict]:
        """检索相似的审查记录"""
        
        cache_key = f"{repository}:{author}"
        if cache_key in self.cache:
            return self.cache[cache_key]
        
        # 向量检索相关审查
        query = f"code review patterns for {repository} by {author}"
        results = await self.vector_store.search(query, top_k=5)
        
        self.cache[cache_key] = results
        return results
    
    async def store_review(
        self,
        context: ReviewContext,
        comments: List[ReviewComment]
    ) -> None:
        """存储审查记录供未来检索"""
        
        # 构建文档
        document = {
            "id": hashlib.md5(context.pr_id.encode()).hexdigest(),
            "pr_id": context.pr_id,
            "repository": context.repository,
            "author": context.author,
            "comment_count": len(comments),
            "categories": list(set(c.category for c in comments)),
            "timestamp": context.timestamp.isoformat(),
            "content": self._summarize_for_embedding(comments)
        }
        
        await self.vector_store.upsert([document])
    
    def _summarize_for_embedding(self, comments: List[ReviewComment]) -> str:
        """生成用于向量化的摘要"""
        categories = {}
        for c in comments:
            categories.setdefault(c.category, []).append(c.message)
        
        parts = []
        for cat, msgs in categories.items():
            parts.append(f"{cat}: {len(msgs)} issues")
        
        return "; ".join(parts)


# ==================== 第1层:执行层 ====================

class GitHubAPI(Protocol):
    async def get_diff(self, url: str) -> str:
        ...
    async def post_comment(self, pr_id: str, comment: str) -> None:
        ...


class ExecutionLayer:
    """执行层:外部交互与工具执行"""
    
    def __init__(self, github: GitHubAPI, llm_client: Any):
        self.github = github
        self.llm_client = llm_client
    
    async def fetch_diff(self, diff_url: str) -> List[CodeChange]:
        """获取 PR 的 diff 内容"""
        
        diff_content = await self.github.get_diff(diff_url)
        
        # 解析 diff 为 CodeChange 对象
        changes = []
        for file_diff in self._split_by_file(diff_content):
            change = self._parse_file_diff(file_diff)
            changes.append(change)
        
        return changes
    
    async def post_comments(self, pr_id: str, comments: List[ReviewComment]) -> None:
        """将评论发布到 GitHub"""
        for comment in comments:
            formatted = self._format_github_comment(comment)
            await self.github.post_comment(pr_id, formatted)
    
    def _split_by_file(self, diff_content: str) -> List[str]:
        """按文件分割 diff"""
        # 简化实现
        return diff_content.split("diff --git")[1:]
    
    def _parse_file_diff(self, file_diff: str) -> CodeChange:
        """解析单个文件的 diff"""
        lines = file_diff.split("\n")
        
        # 提取文件路径
        file_path = lines[0].split(" b/")[-1] if " b/" in lines[0] else "unknown"
        
        # 统计变更
        additions = sum(1 for l in lines if l.startswith("+"))
        deletions = sum(1 for l in lines if l.startswith("-"))
        
        # 检测语言
        language = self._detect_language(file_path)
        
        return CodeChange(
            file_path=file_path,
            diff=file_diff,
            language=language,
            additions=additions,
            deletions=deletions
        )
    
    def _detect_language(self, file_path: str) -> str:
        """根据文件扩展名检测语言"""
        ext = file_path.split(".")[-1] if "." in file_path else ""
        mapping = {
            "py": "python",
            "js": "javascript",
            "ts": "typescript",
            "go": "go",
            "rs": "rust",
            "java": "java"
        }
        return mapping.get(ext, "text")
    
    def _format_github_comment(self, comment: ReviewComment) -> str:
        """格式化为 GitHub 评论格式"""
        emoji_map = {
            Severity.CRITICAL: "🚨",
            Severity.HIGH: "⚠️",
            Severity.MEDIUM: "💡",
            Severity.LOW: "ℹ️",
            Severity.INFO: "📝"
        }
        
        emoji = emoji_map.get(comment.severity, "💬")
        return f"{emoji} **[{comment.category.upper()}]** {comment.message}"


# ==================== 系统组装 ====================

class CodeReviewAgent:
    """完整的代码审查 Agent"""
    
    def __init__(
        self,
        github_api: GitHubAPI,
        llm_provider: LLMProvider,
        vector_store: VectorStore
    ):
        # 从底层开始构建
        self.execution = ExecutionLayer(github_api, llm_provider)
        self.memory = MemoryLayer(vector_store)
        self.cognition = CognitionLayer(llm_provider)
        self.orchestration = OrchestrationLayer(
            self.cognition, self.memory, self.execution
        )
        self.interface = InterfaceLayer(self.orchestration)
    
    async def review_pr(self, repo: str, pr_number: int) -> str:
        """CLI 入口"""
        return await self.interface.handle_cli(repo, pr_number)
    
    async def handle_webhook(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Webhook 入口"""
        return await self.interface.handle_webhook(payload)


# ==================== 使用示例 ====================

async def main():
    """演示如何使用分层架构的代码审查 Agent"""
    
    # 初始化各层依赖(使用模拟实现)
    class MockGitHubAPI:
        async def get_diff(self, url: str) -> str:
            return """diff --git a/src/main.py b/src/main.py
new file mode 100644
index 0000000..1234567
--- /dev/null
+++ b/src/main.py
@@ -0,0 +1,10 @@
+import os
+
+def process_user_input(user_data):
+    query = f"SELECT * FROM users WHERE id = {user_data['id']}"
+    os.system(f"mysql -e '{query}'")
+    return eval(user_data['expression'])
+"""
    
    class MockLLMProvider:
        async def analyze(self, prompt: str, context: Dict) -> str:
            # 模拟 LLM 响应
            return "[]"
    
    class MockVectorStore:
        async def search(self, query: str, top_k: int) -> List[Dict]:
            return []
        async def upsert(self, documents: List[Dict]) -> None:
            pass
    
    # 构建 Agent
    agent = CodeReviewAgent(
        github_api=MockGitHubAPI(),
        llm_provider=MockLLMProvider(),
        vector_store=MockVectorStore()
    )
    
    # 执行审查
    report = await agent.review_pr("myorg/myrepo", 123)
    print(report)


if __name__ == "__main__":
    asyncio.run(main())

9.4 架构亮点

  1. 清晰的层边界:每层只关心自己的职责,通过明确的数据结构通信
  2. 可测试性:每层的依赖都通过接口注入,易于 mock
  3. 可扩展性:新增分析类型只需扩展认知层,不影响其他层
  4. 可观测性:Context 贯穿全链路,便于追踪和调试

10. 结语:分层是一种思维方式

分层架构不仅仅是一种技术模式,更是一种思维方式

“分层的艺术在于知道在哪里画线,而不在于画多少条线。” —— 改编自理查德·费曼

核心原则回顾

原则 含义
关注点分离 不同的问题由不同的层解决
抽象 上层只看到下层的接口,不看实现
可替换性 层内组件可以独立替换
最小知识 每层只知道它需要知道的

何时分层,何时不分层

分层                    不分层
─────────────────────────────────────────────
复杂系统               简单脚本
多团队协作             个人工具
长期维护               一次性任务
需要可测试性            快速原型
多个客户端             单一入口

最后的思考

在 AI 时代,分层架构变得更加重要:

  1. 模型与系统解耦:认知层的 LLM 可以独立演进
  2. 多模态输入:接口层统一处理文本、语音、图像
  3. 记忆的可控性:记忆层让 Agent 拥有真正的”智慧”
  4. 安全与治理:执行层是唯一的副作用来源,便于审计

Agent OS 的五层架构不是终点,而是一个起点。随着 AI 系统的发展,分层模式会不断演化——但分层的本质永远不会改变:

将混沌切割为有意义的边界。


参考阅读

  1. Computer Networks - Andrew S. Tanenbaum (OSI 模型权威参考)
  2. Clean Architecture - Robert C. Martin (软件分层原则)
  3. Designing Data-Intensive Applications - Martin Kleppmann (系统架构设计)
  4. ReAct Pattern - 推理与行动结合的 Agent 架构
  5. The Illustrated Transformer - 注意力机制的层次理解

“简单是复杂的终极形式。” —— 达芬奇