LangGraph 进阶
使用 LangGraph 构建分层自治的金融分析系统 (附核心代码实现)
1. 系统概述与设计哲学
笔记:此部分为高阶架构思想,用于理解系统设计的“为什么”,非常适合在面试中作为开场和展示架构认知深度的部分。
1.1. 系统目标
构建一个持久化、自适应的 AI 自治系统,模拟量化交易团队的工作流。该系统能够持续监控市场数据,自主形成投资假设,通过动态代码生成进行回测验证,并最终产出可执行的交易提案。
1.2. 核心架构原则
分层抽象与委托 (Hierarchical Abstraction & Delegation):系统采用“图之图”(Graph-of-Graphs)的层次结构。高阶的“主图”负责战略,独立的“子图”负责执行。
异步事件驱动通信 (Asynchronous, Event-Driven Communication):各图智能体通过共享的持久化状态存储(如数据库)进行解耦的异步通信,而非直接调用。
动态能力生成 (Dynamic Capability Generation):系统的核心是能够根据问题动态生成并执行代码,而非依赖固定的工具集。
状态持久化与长时运行 (Stateful, Long-Running Operation):系统被设计为可长时间运行的服务,其状态可被持久化,允许任务中断与恢复。
2. 系统组件详解 (附状态定义)
系统由三个核心的专家图(Specialist Graphs)构成。
2.1. MarketMonitor_Graph
(数据引擎)
职责: 持续从外部世界摄取、处理并结构化市场数据。
触发机制: 时间驱动。
内部状态 (
MonitorState
):Pythonfrom pydantic import BaseModel, Field from typing import List, Dict, Any class MonitorState(BaseModel): # 输入,定义我们要监控哪些股票 symbols_to_watch: List[str] # 用于存储本轮运行中新生成的所有信号 latest_signals: List[Dict[str, Any]] = Field(default_factory=list)
2.2. StrategyForge_Graph
(策略大脑)
职责: 从信号中发现机会,形成策略,并通过“假设-编码-回测-决策”的循环进行验证。
触发机制: 事件驱动。
内部状态 (
StrategyState
):Pythonfrom typing import Optional class StrategyState(BaseModel): triggering_signals: List[Dict[str, Any]] = Field(default_factory=list) current_portfolio: Dict[str, Any] = Field(default_factory=dict) hypothesis: Optional[str] = None sub_hypotheses: List[Dict[str, Any]] = Field(default_factory=list) backtest_code: Optional[str] = None backtest_results: Optional[Dict[str, Any]] = None trade_proposal: Optional[Dict[str, Any]] = None
2.3. PortfolioManager_Graph
(交易执行者)
职责: 执行交易、风险管理和投资组合状态维护。
触发机制: 事件驱动。
3. 核心技术细节剖析 (附代码实现)
笔记:此部分是面试中最关键的“亮点”环节,展示了具体的技术选型和解决问题的能力。
3.1. 共享状态总线 (Shared State Bus)
我们选择 sqlite3
作为图之间异步通信的媒介,它模拟了一个解耦的、持久化的消息总线。
Python
import sqlite3
import json
DB_FILE = "shared_database.db"
def initialize_database():
"""初始化 SQLite 数据库和表结构。"""
conn = sqlite3.connect(DB_FILE)
cursor = conn.cursor()
# 市场信号表,data 字段存储 JSON 字符串
cursor.execute("""
CREATE TABLE IF NOT EXISTS market_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
source TEXT NOT NULL,
type TEXT NOT NULL,
symbol TEXT NOT NULL,
data TEXT NOT NULL
);
""")
# 投资组合状态表
cursor.execute("""
CREATE TABLE IF NOT EXISTS portfolio_state ( /* ... schema ... */ );
""")
conn.commit()
conn.close()
3.2. 自定义工具 (@tool
)
通过 @tool
装饰器,我们将一个简单的数据库读取函数,封装成了一个可被 LangChain 生态系统识别和调用的工具。
Python
from langchain_core.tools import tool
@tool
def read_signals_from_db(max_signals: int = 10) -> List[Dict[str, Any]]:
"""从共享数据库的 market_signals 表中读取最近的 N 条信号。"""
conn = sqlite3.connect(DB_FILE)
# ... (数据库连接和查询逻辑) ...
signals = []
for row in rows:
signal_dict = dict(zip(column_names, row))
# 将 JSON 字符串解析回 Python 字典
signal_dict['data'] = json.loads(signal_dict['data'])
signals.append(signal_dict)
return signals
3.3. 动态代码生成与安全执行
这是“策略大脑”最核心的能力,分为两步:生成和执行。
Step 1: 高质量的代码生成 (BacktestCodeGenNode)
我们通过一个高度结构化的 Prompt,引导 LLM 生成带有机器可读标记的、格式统一的 Python 代码。
Python
# 节点方法中的关键部分
def generate_backtest_code(self, state: StrategyState) -> Dict[str, Any]:
# ...
prompt = ChatPromptTemplate.from_template(
"""你是一位顶级的 Python 量化开发专家...
... (详细的分步指令和角色扮演) ...
7. **打印结果 (JSON格式)**: ...必须用 .to_json() 方法将其转换为 JSON 字符串再打印...请用特殊标记符将它包裹起来,格式如下:
```
print("---BACKTEST_STATS_START---")
print(stats.to_json())
print("---BACKTEST_STATS_END---")
```
...
"""
)
chain = prompt | self.llm | StrOutputParser()
code_str = chain.invoke(...)
return {"backtest_code": code_str}
Step 2: 健壮的代码执行 (BacktestExecuteNode)
在执行前,我们先对 LLM 的输出进行“净化”,然后才送入沙盒环境执行,并解析结果。
Python
import re
from langchain_community.tools.python import PythonREPLTool
# 节点方法中的关键部分
def execute_backtest(self, state: StrategyState) -> Dict[str, Any]:
raw_code_str = state.backtest_code
# 1. 输出净化:使用正则表达式提取纯代码
pure_code = ""
match = re.search(r"```python\n(.*?)```", raw_code_str, re.DOTALL)
if match:
pure_code = match.group(1)
# 2. 沙盒执行:调用 PythonREPLTool
# self.python_repl_tool 是通过依赖注入传入的
repl_output = self.python_repl_tool.invoke(pure_code)
# 3. 结果解析:利用预设的标记符提取并解析JSON
try:
start_marker = "---BACKTEST_STATS_START---"
# ... (字符串查找和 json.loads 解析逻辑) ...
stats_dict = json.loads(stats_json_str)
return {"backtest_results": stats_dict}
except Exception as e:
# 优雅地处理解析失败的情况
return {"backtest_results": {"error": str(e), "raw_output": repl_output}}
3.4. 图的组合 (Hierarchical Composition)
虽然我们没有完全实现“CEO主图”,但其核心技术原理在于,任何编译好的 LangGraph app
都是一个 Runnable
。这意味着主图可以像调用一个普通工具一样调用子图。
Python
# 伪代码:展示如何在主图中调用子图
from langgraph.graph import StateGraph
# 假设 forge_app 是我们编译好的 StrategyForge_Graph
# forge_app = forge_graph.compile()
# 在主图的某个节点中
def ceo_delegation_node(state):
# 从主图状态中获取给研发部的任务
dev_task = state['dev_task']
# 直接调用子图的 .invoke() 方法,传入子图需要的输入
# 这是一个阻塞调用,主图会等待子图完成
dev_result = forge_app.invoke({
"triggering_signals": dev_task['signals']
})
# 返回子图的最终成果,更新主图的状态
return {"final_dev_report": dev_result.get('trade_proposal')}
# ... 在主图中添加这个节点 ...
# master_graph.add_node("delegate_to_dev", ceo_delegation_node)
4. 面试要点与架构权衡
Q: 该系统与 AutoGen 等多智能体框架有何不同?
A: 核心区别在于控制模式。AutoGen 以“对话”为中心,流程具有涌现性,适合探索性任务。本系统基于 LangGraph 的“状态图”,对工作流拥有确定性的、可编程的控制,在需要严格执行顺序、条件分支、错误处理和分层委托的复杂业务场景下,可靠性和可预测性更高。
Q: 为什么选择共享数据库,而不是让图之间直接进行API调用?
A: 为了解耦、异步和持久化。1. 解耦: 避免了服务间的紧耦合,一个组件的改动不影响其他组件。2. 异步: 允许各组件以自己的节奏工作,提高了系统吞吐量。3. 持久化与可审计性: 所有交互都被记录在案,便于调试、回溯和分析,系统可从中断中恢复。
Q: 该系统的主要技术风险是什么?
A: 1. LLM 的可靠性: 存在“幻觉”或生成低质量内容的风险,需要设计更多的验证和修正循环。2. 代码执行的安全:
PythonREPLTool
是一个核心安全风险点,生产环境必须使用严格的沙盒化。3. 状态管理的复杂性: 随着业务逻辑增加,共享数据库的模式和图的状态对象会变得复杂,需要良好的数据治理。
5. 核心知识点巩固 (Key Takeaways for Interviews)
笔记:此部分是整个项目的核心技术点提炼,每一条都可以在面试中作为一个独立的、有深度的技术探讨话题。
LangGraph 的本质是状态机 (LangGraph as a State Machine)
LangGraph 的核心思想不是链式调用,而是将复杂的 AI 工作流建模为一个有限状态机 (Finite State Machine)。整个图的运行,本质上是在一个中心化的状态对象 (State Object) 上,根据预设的节点 (Nodes) 和边 (Edges),进行状态转移的过程。
Pydantic 状态管理的重要性 (Pydantic for Robust State)
在复杂系统中,使用 Pydantic 的
BaseModel
来定义状态,优于简单的TypedDict
。它提供了运行时的类型检查和数据校验,能及早发现因数据格式错误导致的 bug,是保障系统健壮性的第一道防线。
条件边是“智能”的核心 (
Conditional Edges
for Logic)图的智能和逻辑(循环、分支)是通过
add_conditional_edges
实现的。其模式是:源节点 -> 决策函数 -> {决策结果: 目标节点}
。这个决策函数是一个普通的 Python 函数,它根据当前状态返回一个字符串,图根据这个字符串将流程路由到不同的分支,这是实现复杂工作流的关键。
动态能力:代码生成与沙盒执行 (Dynamic Capability: Code Gen + REPL)
高级 Agent 的一个核心特征是动态生成并执行代码。这实现了从“使用固定工具”到“为特定问题创造临时工具”的飞跃。
PythonREPLTool
是实现这一点的强大工具,但在生产环境中,其安全性至关重要,必须在严格隔离的沙盒环境(如 Docker 容器)中运行。
输出净化与边界标记 (Output Sanitization & Markers)
LLM 的代码生成输出常常是“不纯净的”,会夹杂自然语言解释和 Markdown 标记。在将代码送入执行器之前,通过正则表达式等手段提取纯代码,是一个必不可少的、稳健的工程实践。同时,在 Prompt 中要求 LLM 使用明确的开始/结束标记符(如
---START---
)来包裹关键输出,可以极大地简化和稳定后续的解析工作。
自定义工具的灵活性 (
@tool
Decorator)LangChain 的
@tool
装饰器提供了一种极其简单和强大的方式,可以将任何自定义的 Python 函数(无论其内部逻辑多复杂,如读写数据库)无缝地封装成一个可被 Agent 调用的工具,这为连接现有业务逻辑和外部世界提供了无限的可能。
图的可组合性与分层架构 (Graph Composability & Hierarchy)
LangGraph 的“杀手级特性”:任何一个通过
.compile()
编译的图实例,其本身就是一个标准的Runnable
对象。这意味着一个图可以被当作一个节点,在另一个更高阶的图中被调用。这是实现“图之图”分层架构的技术基石,也是实现模块化、封装和高可复用性等高级软件工程原则的途径。
通过共享状态实现异步解耦 (Decoupling via Shared State)
在多图智能体系统中,让图之间通过一个共享的、持久化的数据库(或消息队列) 来通信,是一种比直接 API 调用更先进的架构模式。它实现了图之间的解耦和异步工作,使得整个系统更有弹性、更易于扩展,并且所有交互都有据可查。
依赖注入的工程实践 (Dependency Injection Best Practice)
在设计节点逻辑类(如我们的
StrategyForge
)时,应将其所依赖的外部服务(如llm
实例、python_repl_tool
实例)通过构造函数__init__
注入进去,而不是在方法内部直接引用全局变量。这是一种标准的软件工程实践,它使得代码更清晰、更易于测试、耦合度更低。
Last updated