LangGraph 多 Agent 行情接入:节点之间传递实时数据的 3 种姿势
作者: TickDB Research · 发布: 2026/5/24 · 阅读: 7
标签: C 类, 掘金, Agent
目录
- 一、当你的金融 Agent 节点“各说各话”
- 二、姿势 1:State 共享——最简单,但 reducer 行为容易忽略
- 三、姿势 2:Tool 封装——最灵活,但 API 调用量是隐藏成本
- 四、姿势 3:Subgraph 组合——最工程,但状态映射是调试重点
- 五、三种姿势权衡矩阵与数据一致性策略
- 六、完整示例:盘前-盘中-盘后多 Agent 协同
- 七、FAQ
- 附录:TickDB REST 端点与 MCP 工具对照表
一、当你的金融 Agent 节点“各说各话”
上周我在调试一个 LangGraph Agent。它的结构很简单——三个节点:
data_fetcher → analyzer → reporter
data_fetcher 负责调行情 API 取贵州茅台(600519.SH)的实时数据,analyzer 计算 5 日均线,reporter 生成一段 Markdown 推送。
问题出在第二天早上。我打开 Telegram,看到 analyzer 计算的趋势标签和 reporter 里的最新价差了 0.8%。一查日志——data_fetcher 在 9:30:01 取了数据,analyzer 在 9:30:03 用自己的 Tool 又取了一次,两次取到的 last_price 不一样。两个节点在几乎同一时刻,对“茅台现价”这件事的判断基础不一致。
这不是 LangGraph 的 bug,而是行情数据在 Agent 节点间传递时,时效性、一致性、来源统一性三个问题同时爆发。
这三个问题的根源其实指向同一件事:你的所有节点是不是用同一个数据源?如果每个节点各自用不同方式拉数据,你首先需要的是一个统一的数据源——至少保证所有节点拿到的字段结构一致。TickDB 的 REST API 在这个场景下是一个自然的选择:GET /v1/market/ticker?symbols=600519.SH 返回的 last_price 对所有节点都是同一个字段名。但光有统一数据源还不够——数据怎么在节点间传递,才是这篇文章要解决的核心问题。
接下来拆解三种传递姿势,每种都会踩一个真实的问题。
二、姿势 1:State 共享——最简单,但 reducer 行为容易忽略
是什么
LangGraph 的 State 是一个跨节点共享的数据容器(通常是 TypedDict)。每个节点返回一个字典,LangGraph 将返回的字典合并回 State,后续节点从 State 读取数据。
为什么容易出问题
很多开发者以为 State 是“全局变量”——节点 A 写入了 {"ticker_data": {...}},节点 B 立即就能读到。但实际行为取决于两个因素:
- Reducer 的类型:LangGraph 对 State 字段默认是覆盖更新(新值替换旧值)。如果你显式配置了
Annotated[list, operator.add],才会将两次写入拼接。如果你对列表字段期望的是覆盖,却误配了 add reducer,数据会越积越多而非更新。 - 并行分支写入同一 key:LangGraph 的并行分支(如
graph.add_edge("A", "B"); graph.add_edge("A", "C"))中,如果节点 B 和节点 C 同时返回对同一 key 的更新,reducer 的合并语义决定了最终 State 中该 key 的值——可能是覆盖(后者胜出)、拼接(add reducer)或抛出异常(无 reducer 的并发冲突)。
真实踩坑场景
# 节点 A:data_fetcher(返回包含 ticker_data 的字典)
def data_fetcher(state):
resp = requests.get("https://api.tickdb.ai/v1/market/ticker?symbols=600519.SH")
data = resp.json()
return {"ticker_data": data} # 更新 State
# 节点 B:analyzer(从 State 读取 ticker_data)
def analyzer(state):
price = state["ticker_data"]["data"][0]["last_price"]
# 对于顺序边 A → B,这里通常能读到 A 刚写入的数据。
# 但在并行分支 A → (B, C) 中,如果 B 和 C 都写同一 key,
# reducer 的行为决定了最终结果——覆盖、拼接还是抛异常。
⚠️ 关键点:
- 对于顺序边(
A → B),B 通常能看到 A 返回并合并后的 State。- 真正需要关注的是并行分支、循环边、多节点写入同一 key 的场景——reducer 的合并语义在这些场景下至关重要。
- 默认覆盖更新适合单节点写入的场景;多节点写同一 key 时需要自定义 reducer 或使用
Annotated类型。
正确用法
适用场景:低频更新的数据——开盘前取一次日 K 线,全图共享。不适合高频 tick 数据或并行分支写入同一 key 的场景。
正确做法:
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
from decimal import Decimal
class MarketState(TypedDict):
ticker_data: dict
ticker_timestamp: int # 加时间戳,下游检查时效
kline_data: dict
analysis_result: str
def data_fetcher(state: MarketState) -> dict:
resp = requests.get(
"https://api.tickdb.ai/v1/market/ticker",
params={"symbols": "600519.SH"},
headers={"X-API-Key": API_KEY}
)
data = resp.json()
if data["code"] == 0:
return {
"ticker_data": data["data"][0],
"ticker_timestamp": data["data"][0]["timestamp"]
}
return {}
def analyzer(state: MarketState) -> dict:
import time
now = int(time.time() * 1000)
# 关键:检查时效性
if now - state.get("ticker_timestamp", 0) > 120_000:
return {"analysis_result": "数据过期,跳过分析"}
price = Decimal(str(state["ticker_data"]["last_price"]))
# ... 计算均线逻辑
return {"analysis_result": f"现价 {price},均线趋势标签示例"}
要点:
- 在 State 中存储
timestamp,下游节点使用前检查时效 - 高频更新的数据(如 ticker)不适合 State 共享,应使用姿势 2 的 Tool 模式
- 低频数据(如日 K 线、基本面)适合 State 共享
- 并行分支写入同一 key 时需要自定义 reducer,否则使用默认覆盖并确保只有一个节点写该 key
三、姿势 2:Tool 封装——最灵活,但 API 调用量是隐藏成本
是什么
将 TickDB API 调用封装为 LangChain 的 @tool,每个节点在需要时独立调用 Tool,获取最新数据。
为什么灵活但要小心
Tool 模式提高了按需获取新数据的能力,让每个节点都能拿到较新的数据。但代价是:如果你的 Agent 有 5 个节点,每个节点调一次 get_ticker,那就是 5 次 API 请求。在生产环境中,这触发了两个风险:①API 限流(TickDB 的 3001 错误);②同一毫秒内两次请求返回的价格可能不一致(价格在变化)。此外,Tool 模式仍受网络延迟和数据源更新频率影响,获取到的数据并非“零延迟”。
真实踩坑场景
# 节点 A 和节点 B 几乎同时调用 Tool
# 节点 A 在 9:30:01.234 拿到 last_price = 1823.50
# 节点 B 在 9:30:01.456 拿到 last_price = 1823.62
# 节点 A 的计算基于 1823.50,节点 B 的计算基于 1823.62
# 两个节点的计算基础不一致,输出可能产生矛盾
⚠️ 关键点:
- 同一轮执行中多次 Tool 调用可能返回不同的价格(市场在变化)
- 如果要求同一轮内所有节点基于同一价格做判断,需要在 Tool 内部或 State 层面做快照
正确做法
适用场景:对实时性要求高的节点(如盘中监控),每个节点独立获取最新 tick 数据。
正确做法:
from functools import lru_cache
import time
from decimal import Decimal
from langchain.tools import tool
class CachedTickerTool:
"""带缓存的行情 Tool,避免短时间内重复请求"""
def __init__(self):
self._cache = {}
self._cache_ttl = 5 # 缓存有效期(秒)
def get_ticker(self, symbols: str) -> dict:
# 规范化 symbols 顺序,避免 "A,B" 和 "B,A" 命中不同缓存
normalized = ",".join(sorted(symbols.split(",")))
cache_key = f"ticker:{normalized}"
now = time.time()
if cache_key in self._cache:
cached_time, cached_data = self._cache[cache_key]
if now - cached_time < self._cache_ttl:
return cached_data
# 缓存过期或不存在,调 API(循环重试,非递归)
for attempt in range(3):
try:
resp = requests.get(
"https://api.tickdb.ai/v1/market/ticker",
params={"symbols": normalized},
headers={"X-API-Key": API_KEY},
timeout=10
)
# HTTP 429 限流
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After", str(2 ** attempt))
try:
wait = int(retry_after)
except ValueError:
wait = 2 ** attempt
if attempt < 2:
time.sleep(wait)
continue
return {"error": "rate_limited", "message": "HTTP 429"}
data = resp.json()
if data["code"] == 0:
self._cache[cache_key] = (now, data)
return data
elif data["code"] == 3001:
retry_after = resp.headers.get("Retry-After", str(2 ** attempt))
try:
wait = int(retry_after)
except ValueError:
wait = 2 ** attempt
if attempt < 2:
time.sleep(wait)
continue
return {"error": "rate_limited", "code": 3001}
elif data["code"] == 1001:
raise PermissionError("鉴权失败(1001): API Key 无效或已过期")
elif data["code"] == 1002:
raise PermissionError("鉴权失败(1002): 未提供 API Key")
elif data["code"] == 1004:
raise PermissionError("鉴权失败(1004): API Key 权限不足")
else:
return {"error": "api_error", "code": data["code"]}
except (requests.Timeout, requests.ConnectionError) as e:
if attempt < 2:
time.sleep(2 ** attempt)
continue
return {"error": "network_error", "message": str(e)}
except json.JSONDecodeError as e:
return {"error": "json_parse_error", "message": str(e)}
return {"error": "max_retries_exceeded"}
# 全局单例
ticker_tool_instance = CachedTickerTool()
@tool
def get_ticker_tool(symbols: str) -> dict:
"""获取实时行情快照。使用前检查缓存,避免短时间内重复请求。"""
return ticker_tool_instance.get_ticker(symbols)
# 节点中使用
def data_fetcher(state):
result = get_ticker_tool.invoke({"symbols": "600519.SH,700.HK"})
return {"ticker_data": result}
def analyzer(state):
# 如果与 data_fetcher 在同一缓存窗口内,不会重复调 API
result = get_ticker_tool.invoke({"symbols": "600519.SH"})
price = Decimal(str(result["data"][0]["last_price"]))
# ...
要点:
- 在 Tool 内部实现缓存(同一缓存窗口内的重复请求不调 API)
- 缓存 TTL 根据策略频率设置(5-30 秒)
- 使用循环重试而非递归,避免栈溢出
- 处理 429/3001 限流、1001/1002/1004 鉴权、超时、JSON 解析失败
- 缓存 key 包含规范化后的 symbols,避免顺序差异导致缓存未命中
四、姿势 3:Subgraph 组合——最工程,但状态映射是调试重点
是什么
将一个“数据获取+预处理”的完整流程封装为子图(Subgraph),主图通过节点或 wrapper 函数与子图通信。
为什么工程化
子图的可复用性是三种姿势中最强的。如果你有多个 Agent(如 A 股 Agent、港股 Agent、美股 Agent),每个都需要行情数据获取+校验+标准化,用 Subgraph 可以写一次、到处复用。但代价是——子图和父图之间的数据传递需要显式处理,当 State 嵌套较深时,容易出现数据“消失”的问题(无报错,但下游节点读到 None)。
真实踩坑场景
# 子图内部有 fetch、validate、enrich 三个节点
# 主图的 state["raw_input"] 要传给子图
# 子图的 state["cleaned_data"] 要传回主图
# 如果数据映射不正确,数据在子图边界“消失”——无报错,但下游节点读到 None
⚠️ 关键点:
- 将 compiled subgraph 作为普通节点添加到主图时,需要手动处理状态的输入输出映射
- LangGraph 的 Subgraph API 在不同版本中变化较大,本文以“wrapper node”方式实现子图集成——这是最稳定、最通用的做法
- 调试时先检查子图的输入和输出,确认数据在边界没有被丢弃
正确做法
适用场景:复杂数据流水线(获取→校验→清洗→标准化),多个 Agent 复用同一套数据预处理逻辑。
正确做法(wrapper node 方式,兼容当前 LangGraph 版本):
from langgraph.graph import StateGraph
from typing import TypedDict
from decimal import Decimal
# ========== 子图:行情数据预处理 ==========
class MarketDataState(TypedDict):
raw_input: str # 输入:品种代码
market_data: dict # 输出:处理后的行情数据
validation_errors: list # 校验错误
def fetch_node(state: MarketDataState):
symbols = state["raw_input"]
resp = requests.get(
f"https://api.tickdb.ai/v1/market/ticker",
params={"symbols": symbols},
headers={"X-API-Key": API_KEY}
)
data = resp.json()
if data["code"] == 0:
return {"market_data": data["data"]}
return {"validation_errors": [f"fetch failed: {data.get('message')}"]}
def validate_node(state: MarketDataState):
errors = state.get("validation_errors", [])
market_data = state.get("market_data", [])
if not market_data:
errors.append("market_data is empty")
for item in market_data:
price = Decimal(str(item.get("last_price", 0)))
if price <= 0:
errors.append(f"invalid price for {item.get('symbol')}")
return {"validation_errors": errors}
# 构建子图
market_subgraph = StateGraph(MarketDataState)
market_subgraph.add_node("fetch", fetch_node)
market_subgraph.add_node("validate", validate_node)
market_subgraph.add_edge("fetch", "validate")
market_subgraph.set_entry_point("fetch")
market_subgraph.set_finish_point("validate")
compiled_subgraph = market_subgraph.compile()
# ========== 主图 ==========
class MainState(TypedDict):
symbols: str
processed_market_data: dict
errors: list
# Wrapper 节点:在主图 State 和子图 State 之间做映射
def market_data_wrapper(state: MainState) -> dict:
# 1. 从主图 State 提取子图需要的输入
subgraph_input = {"raw_input": state["symbols"]}
# 2. 调用子图
subgraph_result = compiled_subgraph.invoke(subgraph_input)
# 3. 将子图输出映射回主图 State
return {
"processed_market_data": subgraph_result.get("market_data", {}),
"errors": subgraph_result.get("validation_errors", [])
}
main_graph = StateGraph(MainState)
main_graph.add_node("market_data_pipeline", market_data_wrapper)
main_graph.set_entry_point("market_data_pipeline")
main_graph.set_finish_point("market_data_pipeline")
compiled_main = main_graph.compile()
要点:
- 使用 wrapper node 方式集成子图——这是最稳定的跨版本做法
- wrapper 内部显式做
主图 State → 子图 State → 调用 → 子图结果 → 主图 State的完整映射 - 子图的错误通过
errors字段传递回主图,不要被子图吞掉 - 调试时先打印子图的输入和输出,确认数据在边界没有丢失
五、三种姿势权衡矩阵与数据一致性策略
权衡矩阵
| 维度 | 姿势 1:State 共享 | 姿势 2:Tool 封装 | 姿势 3:Subgraph 组合 |
|---|---|---|---|
| 数据新鲜度 | 低(依赖上次 State 更新) | 高(每次调用拉最新) | 取决于子图内部实现 |
| API 调用量 | 低(每个品种/轮) | 高(每个节点/轮) | 取决于子图内部实现 |
| 实现复杂度 | 低 | 中(需处理缓存+限流) | 高(需处理状态映射) |
| 可维护性 | 中(State 膨胀后难管理) | 高(Tool 独立可测) | 最高(子图可复用) |
| 适用场景 | 低频数据(日 K、基本面) | 高频 tick 数据、按需查询 | 复杂数据流水线、多 Agent 复用 |
| 核心风险 | 并行分支写入同一 key 时 reducer 行为不确定 | API 限流 + 节点间数据不一致 | 状态映射错误导致数据丢失 |
数据一致性策略
| 策略 | 实现方式 | 适用场景 |
|---|---|---|
| 同一轮固定快照 | 一个节点调 API 后将数据存入 State,所有节点读同一份 State | 低频数据、需要所有节点基于同一价格做判断 |
| 按需获取 + TTL 缓存 | Tool 内部缓存(TTL 5-30s),同一缓存窗口内返回相同数据 | 高频数据、对一致性要求不极端 |
| 多 Agent 标准化 | Subgraph/wrapper 将行情数据获取和预处理集中管理,所有 Agent 调同一子图 | 多个 Agent 需要相同数据格式 |
选型指南
你的场景是什么?
低频数据(日 K 线、基本面)→ 姿势 1:State 共享
高频数据(ticker、盘口)且节点数 ≤ 3 → 姿势 2:Tool 封装(加 TTL 缓存)
高频数据且节点数 > 3 或需要多 Agent 复用 → 姿势 3:Subgraph 组合
六、完整示例:盘前-盘中-盘后多 Agent 协同
以下是一个结合三种姿势的完整 LangGraph Agent,模拟“盘前分析→盘中监控→盘后报告”的完整流程。
依赖安装(基于 langgraph 0.2.x 测试):
pip install langgraph==0.2.68 langchain-core requests
完整代码:
"""
LangGraph 多 Agent 行情接入示例
场景:盘前分析(K线)→ 盘中监控(ticker)→ 盘后报告(估值)
覆盖姿势 1(State 共享日 K)+ 姿势 2(Tool 封装 ticker)+ 姿势 3(Subgraph 预处理)
"""
import os, time, logging
from typing import TypedDict
from decimal import Decimal
import requests
from langgraph.graph import StateGraph
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("MarketAgent")
API_KEY = os.getenv("TICKDB_API_KEY", "")
BASE_URL = "https://api.tickdb.ai/v1"
if not API_KEY:
raise EnvironmentError("请设置环境变量 TICKDB_API_KEY")
# ============================================================
# 1. 带缓存和限流处理的 HTTP 客户端
# ============================================================
class APIClient:
def __init__(self):
self._cache = {}
self._cache_ttl = 5
def _call(self, endpoint: str, params: dict):
url = f"{BASE_URL}{endpoint}"
headers = {"X-API-Key": API_KEY}
for attempt in range(3):
try:
resp = requests.get(url, params=params, headers=headers, timeout=10)
if resp.status_code == 429:
retry_after = resp.headers.get("Retry-After", str(2 ** attempt))
try:
wait = int(retry_after)
except ValueError:
wait = 2 ** attempt
if attempt < 2:
logger.warning(f"HTTP 429 等待{wait}s")
time.sleep(wait); continue
return None
data = resp.json()
if data["code"] == 0:
return data
if data["code"] == 3001:
wait = int(resp.headers.get("Retry-After", 2 ** attempt))
logger.warning(f"限流(3001) 等待{wait}s")
time.sleep(wait); continue
if data["code"] == 1001:
raise PermissionError("API Key 无效(1001)")
if data["code"] == 1002:
raise PermissionError("未提供 API Key(1002)")
if data["code"] == 1004:
raise PermissionError("API Key 权限不足(1004)")
return None
except (requests.Timeout, requests.ConnectionError) as e:
logger.warning(f"网络错误 {e}")
if attempt < 2:
time.sleep(2 ** attempt); continue
return None
return None
def get_kline(self, symbol: str, interval: str = "1d", limit: int = 20):
cache_key = f"kline:{symbol}:{interval}:{limit}"
now = time.time()
if cache_key in self._cache:
ts, val = self._cache[cache_key]
if now - ts < self._cache_ttl:
return val
result = self._call("/market/kline", {"symbol": symbol, "interval": interval, "limit": limit})
self._cache[cache_key] = (now, result)
return result
def get_ticker(self, symbols: str):
return self._call("/market/ticker", {"symbols": symbols})
def get_calc_index(self, symbols: str):
# 注意:REST 估值端点是 /market/calc-index,不是 /market/market-metrics
return self._call("/market/calc-index", {"symbols": symbols})
client = APIClient()
# ============================================================
# 2. 姿势 3:Subgraph 数据预处理(wrapper node 集成)
# ============================================================
class PreprocessState(TypedDict):
raw_kline: dict
cleaned_kline: dict
def fetch_kline(state: PreprocessState):
data = client.get_kline("600519.SH", "1d", 20)
return {"raw_kline": data}
def clean_kline(state: PreprocessState):
raw = state.get("raw_kline", {})
if not raw or raw.get("code") != 0:
return {"cleaned_kline": {}}
klines = raw["data"]["klines"]
cleaned = []
for k in klines:
cleaned.append({
"time": k["time"],
"close": str(Decimal(k["close"])),
"volume": str(Decimal(k["volume"]))
})
return {"cleaned_kline": {"klines": cleaned, "count": len(cleaned)}}
preprocess_graph = StateGraph(PreprocessState)
preprocess_graph.add_node("fetch", fetch_kline)
preprocess_graph.add_node("clean", clean_kline)
preprocess_graph.add_edge("fetch", "clean")
preprocess_graph.set_entry_point("fetch")
preprocess_graph.set_finish_point("clean")
compiled_preprocess = preprocess_graph.compile()
# ============================================================
# 3. 主图 State 定义
# ============================================================
class MainState(TypedDict):
kline_data: dict # 姿势 1:State 共享(盘前 K 线)
ticker_result: dict # 姿势 2:Tool 输出(盘中 ticker)
calc_index_result: dict # 姿势 2:Tool 输出(盘后估值)
report: str # 最终报告
# ============================================================
# 4. 节点定义
# ============================================================
def pre_market_node(state: MainState):
"""盘前分析:拉日 K 线,用 Subgraph 预处理后写入 State(姿势 3)"""
result = compiled_preprocess.invoke({"raw_kline": {}, "cleaned_kline": {}})
kline_data = result.get("cleaned_kline", {})
if kline_data and kline_data.get("klines"):
closes = [Decimal(k["close"]) for k in kline_data["klines"]]
if len(closes) >= 10:
ma5 = sum(closes[-5:]) / 5
ma10 = sum(closes[-10:]) / 10
trend_label = "up" if ma5 > ma10 else "down"
state["kline_data"] = {"ma5": str(ma5), "ma10": str(ma10), "trend": trend_label}
return state
def intraday_node(state: MainState):
"""盘中监控:调 Tool 获取实时 ticker(姿势 2)"""
ticker = client.get_ticker("600519.SH,700.HK,AAPL.US")
if ticker and ticker.get("code") == 0:
result = []
for item in ticker["data"]:
result.append({
"symbol": item["symbol"],
"last_price": item["last_price"],
"chg": item.get("price_change_percent_24h", "N/A")
})
state["ticker_result"] = {"monitored": result}
return state
def post_market_node(state: MainState):
"""盘后报告:调 Tool 获取估值指标(姿势 2)"""
calc_index = client.get_calc_index("600519.SH,700.HK")
if calc_index and calc_index.get("code") == 0:
state["calc_index_result"] = {"data": calc_index["data"]}
return state
def reporter_node(state: MainState):
"""生成报告:读取 State 中三个时段的数据"""
lines = ["# 盘前-盘中-盘后 报告", ""]
kline = state.get("kline_data", {})
lines.append(f"## 盘前\nMA5: {kline.get('ma5', 'N/A')}, MA10: {kline.get('ma10', 'N/A')}, 趋势标签: {kline.get('trend', 'N/A')}")
ticker = state.get("ticker_result", {}).get("monitored", [])
lines.append("\n## 盘中监控")
for t in ticker:
lines.append(f"- {t['symbol']}: {t['last_price']} ({t['chg']}%)")
calc_index = state.get("calc_index_result", {}).get("data", [])
lines.append("\n## 盘后估值")
for m in calc_index:
lines.append(f"- {m['symbol']}: PE={m.get('pe_ttm_ratio', 'N/A')}, PB={m.get('pb_ratio', 'N/A')}")
state["report"] = "\n".join(lines)
return state
# ============================================================
# 5. 构建主图
# ============================================================
main_graph = StateGraph(MainState)
main_graph.add_node("pre_market", pre_market_node)
main_graph.add_node("intraday", intraday_node)
main_graph.add_node("post_market", post_market_node)
main_graph.add_node("reporter", reporter_node)
main_graph.set_entry_point("pre_market")
main_graph.add_edge("pre_market", "intraday")
main_graph.add_edge("intraday", "post_market")
main_graph.add_edge("post_market", "reporter")
main_graph.set_finish_point("reporter")
compiled_main = main_graph.compile()
# ============================================================
# 6. main
# ============================================================
if __name__ == "__main__":
result = compiled_main.invoke({
"kline_data": {},
"ticker_result": {},
"calc_index_result": {},
"report": ""
})
print(result.get("report", "报告生成失败"))
代码解读:行情数据在三种姿势下的时效性管理——盘前用姿势 3(Subgraph 预处理日 K),盘中和盘后用姿势 2(Tool 获取最新 ticker 和估值),盘前预处理后的日 K 通过姿势 1(State)在全图共享。APIClient 的缓存和限流处理(循环重试,最大 3 次)是 Tool 模式在工程中的必备组件。Subgraph 通过 compiled_preprocess.invoke() 显式集成到 pre_market_node 中,而非未接入的占位代码。
运行输出示例(脱敏):
# 盘前-盘中-盘后 报告
## 盘前
MA5: 1821.50, MA10: 1815.30, 趋势标签: up
## 盘中监控
- 600519.SH: 1823.50 (0.15%)
- 700.HK: 455.20 (-0.82%)
- AAPL.US: 198.45 (0.33%)
## 盘后估值
- 600519.SH: PE=28.5, PB=6.2
- 700.HK: PE=22.1, PB=4.8
七、FAQ
Q1:什么时候该用 State 共享,什么时候该用 Tool?
一个简单的判断标准:如果数据在 Agent 执行期间不会变化(如日 K 线、基本面),用 State 共享。如果数据会变化(如 ticker、盘口),用 Tool 封装。State 共享时在数据中附带 timestamp,Tool 内部加缓存,减少 API 调用。并行分支写入同一 key 时需自定义 reducer。
Q2:三种姿势能在同一个 Agent 中混用吗?
能。第六章的完整示例就是混用——盘前用姿势 3(Subgraph 预处理日 K)→ 姿势 1(State 共享结果),盘中用姿势 2(Tool 拉 ticker)。关键是明确每种数据的更新频率和容忍度。
Q3:LangGraph 的 State 和 Tool 与 TickDB API 的交互需要注意什么?
- State 中存储的行情数据要附带
timestamp,下游节点检查时效 - Tool 内部必须处理
3001(限流循环重试)、1001/1002/1004(鉴权阻断)、HTTP 429、超时、JSON 解析失败 - 不要把所有 TickDB 数据都塞进 State——高频数据应通过 Tool 按需获取
- 子图内部错误(如 API 超时)要通过 State 字段传递回主图
- 注意 REST 端点与 MCP 工具名的区别(见附录对照表)
Q4:TickDB 的 REST 端点和 MCP 工具名怎么对应?
见附录对照表。关键区别:REST 估值端点是 /v1/market/calc-index,MCP 估值工具叫 get_market_metrics;REST ticker 端点是 /v1/market/ticker,MCP ticker 工具叫 get_ticker。
附录:TickDB REST 端点与 MCP 工具对照表
| 功能 | REST 端点 | MCP 工具名 |
|---|---|---|
| 实时行情快照 | GET /v1/market/ticker | get_ticker |
| 历史 K 线 | GET /v1/market/kline | get_kline |
| 最近 K 线 | GET /v1/market/kline/latest | get_kline_latest |
| 盘口深度 | GET /v1/market/depth | get_order_book |
| 成交明细 | GET /v1/market/trades | get_recent_trades |
| 估值指标 | GET /v1/market/calc-index | get_market_metrics |
| 资金流向 | GET /v1/market/capital-flow | get_capital_flow |
| 品种列表 | GET /v1/symbols/available | get_available_symbols |
| 分时数据 | GET /v1/market/intraday | get_intraday |
| 股票信息 | GET /v1/market/stock-info | get_stock_info |
| 交易日历 | GET /v1/market/trade-days | get_trade_days |
| 交易时段 | GET /v1/market/trading-sessions | get_trading_sessions |
| K 线周期 | GET /v1/market/kline-intervals | get_kline_intervals |
注意:REST 端点和 MCP 工具名不一致的情况(如估值:REST 用
calc-index,MCP 用get_market_metrics),在代码中混用时需要特别注意。本文第六章使用 REST API,因此用/market/calc-index,而非get_market_metrics。
📡 本文行情数据由 TickDB.ai 提供
- API:
https://api.tickdb.ai - 文档:
https://docs.tickdb.ai
本文仅讨论 Agent 框架的工程实现,不构成任何投资建议。文中所有策略示例、趋势标签、均线数值仅用于演示数据接入和代码验证流程,不代表任何交易信号。
通过 TickDB API 获取实时行情数据
一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。
免费领取 API Key查看 API 文档