综合

LangGraph 多 Agent 行情接入:节点之间传递实时数据的 3 种姿势

作者: TickDB Research · 发布: 2026/5/24 · 阅读: 6

标签: C 类, 掘金, Agent

目录


一、当你的金融 Agent 节点“各说各话”

上周我在调试一个 LangGraph Agent。它的结构很简单——三个节点:

data_fetcher → analyzer → reporter

data_fetcher 负责调行情 API 取贵州茅台(600519.SH)的实时数据,analyzer 计算 5 日均线,reporter 生成一段 Markdown 推送。

问题出在第二天早上。我打开 Telegram,看到 analyzer 计算的趋势标签和 reporter 里的最新价差了 0.8%。一查日志——data_fetcher 在 9:30:01 取了数据,analyzer 在 9:30:03 用自己的 Tool 又取了一次,两次取到的 last_price 不一样。两个节点在几乎同一时刻,对“茅台现价”这件事的判断基础不一致。

这不是 LangGraph 的 bug,而是行情数据在 Agent 节点间传递时,时效性、一致性、来源统一性三个问题同时爆发

这三个问题的根源其实指向同一件事:你的所有节点是不是用同一个数据源?如果每个节点各自用不同方式拉数据,你首先需要的是一个统一的数据源——至少保证所有节点拿到的字段结构一致。TickDB 的 REST API 在这个场景下是一个自然的选择:GET /v1/market/ticker?symbols=600519.SH 返回的 last_price 对所有节点都是同一个字段名。但光有统一数据源还不够——数据怎么在节点间传递,才是这篇文章要解决的核心问题。

接下来拆解三种传递姿势,每种都会踩一个真实的问题。


二、姿势 1:State 共享——最简单,但 reducer 行为容易忽略

是什么

LangGraph 的 State 是一个跨节点共享的数据容器(通常是 TypedDict)。每个节点返回一个字典,LangGraph 将返回的字典合并回 State,后续节点从 State 读取数据。

为什么容易出问题

很多开发者以为 State 是“全局变量”——节点 A 写入了 {"ticker_data": {...}},节点 B 立即就能读到。但实际行为取决于两个因素:

  1. Reducer 的类型:LangGraph 对 State 字段默认是覆盖更新(新值替换旧值)。如果你显式配置了 Annotated[list, operator.add],才会将两次写入拼接。如果你对列表字段期望的是覆盖,却误配了 add reducer,数据会越积越多而非更新。
  2. 并行分支写入同一 key:LangGraph 的并行分支(如 graph.add_edge("A", "B"); graph.add_edge("A", "C"))中,如果节点 B 和节点 C 同时返回对同一 key 的更新,reducer 的合并语义决定了最终 State 中该 key 的值——可能是覆盖(后者胜出)、拼接(add reducer)或抛出异常(无 reducer 的并发冲突)。

真实踩坑场景

# 节点 A:data_fetcher(返回包含 ticker_data 的字典)
def data_fetcher(state):
    resp = requests.get("https://api.tickdb.ai/v1/market/ticker?symbols=600519.SH")
    data = resp.json()
    return {"ticker_data": data}  # 更新 State

# 节点 B:analyzer(从 State 读取 ticker_data)
def analyzer(state):
    price = state["ticker_data"]["data"][0]["last_price"]
    # 对于顺序边 A → B,这里通常能读到 A 刚写入的数据。
    # 但在并行分支 A → (B, C) 中,如果 B 和 C 都写同一 key,
    # reducer 的行为决定了最终结果——覆盖、拼接还是抛异常。

⚠️ 关键点

- 对于顺序边A → B),B 通常能看到 A 返回并合并后的 State。

- 真正需要关注的是并行分支循环边多节点写入同一 key 的场景——reducer 的合并语义在这些场景下至关重要。

- 默认覆盖更新适合单节点写入的场景;多节点写同一 key 时需要自定义 reducer 或使用 Annotated 类型。

正确用法

适用场景:低频更新的数据——开盘前取一次日 K 线,全图共享。不适合高频 tick 数据或并行分支写入同一 key 的场景。

正确做法

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph
from decimal import Decimal

class MarketState(TypedDict):
    ticker_data: dict
    ticker_timestamp: int      # 加时间戳,下游检查时效
    kline_data: dict
    analysis_result: str

def data_fetcher(state: MarketState) -> dict:
    resp = requests.get(
        "https://api.tickdb.ai/v1/market/ticker",
        params={"symbols": "600519.SH"},
        headers={"X-API-Key": API_KEY}
    )
    data = resp.json()
    if data["code"] == 0:
        return {
            "ticker_data": data["data"][0],
            "ticker_timestamp": data["data"][0]["timestamp"]
        }
    return {}

def analyzer(state: MarketState) -> dict:
    import time
    now = int(time.time() * 1000)
    # 关键:检查时效性
    if now - state.get("ticker_timestamp", 0) > 120_000:
        return {"analysis_result": "数据过期,跳过分析"}
    price = Decimal(str(state["ticker_data"]["last_price"]))
    # ... 计算均线逻辑
    return {"analysis_result": f"现价 {price},均线趋势标签示例"}

要点

  • 在 State 中存储 timestamp,下游节点使用前检查时效
  • 高频更新的数据(如 ticker)不适合 State 共享,应使用姿势 2 的 Tool 模式
  • 低频数据(如日 K 线、基本面)适合 State 共享
  • 并行分支写入同一 key 时需要自定义 reducer,否则使用默认覆盖并确保只有一个节点写该 key

三、姿势 2:Tool 封装——最灵活,但 API 调用量是隐藏成本

是什么

将 TickDB API 调用封装为 LangChain 的 @tool,每个节点在需要时独立调用 Tool,获取最新数据。

为什么灵活但要小心

Tool 模式提高了按需获取新数据的能力,让每个节点都能拿到较新的数据。但代价是:如果你的 Agent 有 5 个节点,每个节点调一次 get_ticker,那就是 5 次 API 请求。在生产环境中,这触发了两个风险:①API 限流(TickDB 的 3001 错误);②同一毫秒内两次请求返回的价格可能不一致(价格在变化)。此外,Tool 模式仍受网络延迟和数据源更新频率影响,获取到的数据并非“零延迟”。

真实踩坑场景

# 节点 A 和节点 B 几乎同时调用 Tool
# 节点 A 在 9:30:01.234 拿到 last_price = 1823.50
# 节点 B 在 9:30:01.456 拿到 last_price = 1823.62
# 节点 A 的计算基于 1823.50,节点 B 的计算基于 1823.62
# 两个节点的计算基础不一致,输出可能产生矛盾

⚠️ 关键点

- 同一轮执行中多次 Tool 调用可能返回不同的价格(市场在变化)

- 如果要求同一轮内所有节点基于同一价格做判断,需要在 Tool 内部或 State 层面做快照

正确做法

适用场景:对实时性要求高的节点(如盘中监控),每个节点独立获取最新 tick 数据。

正确做法

from functools import lru_cache
import time
from decimal import Decimal
from langchain.tools import tool

class CachedTickerTool:
    """带缓存的行情 Tool,避免短时间内重复请求"""
    
    def __init__(self):
        self._cache = {}
        self._cache_ttl = 5  # 缓存有效期(秒)
    
    def get_ticker(self, symbols: str) -> dict:
        # 规范化 symbols 顺序,避免 "A,B" 和 "B,A" 命中不同缓存
        normalized = ",".join(sorted(symbols.split(",")))
        cache_key = f"ticker:{normalized}"
        now = time.time()
        
        if cache_key in self._cache:
            cached_time, cached_data = self._cache[cache_key]
            if now - cached_time < self._cache_ttl:
                return cached_data
        
        # 缓存过期或不存在,调 API(循环重试,非递归)
        for attempt in range(3):
            try:
                resp = requests.get(
                    "https://api.tickdb.ai/v1/market/ticker",
                    params={"symbols": normalized},
                    headers={"X-API-Key": API_KEY},
                    timeout=10
                )
                
                # HTTP 429 限流
                if resp.status_code == 429:
                    retry_after = resp.headers.get("Retry-After", str(2 ** attempt))
                    try:
                        wait = int(retry_after)
                    except ValueError:
                        wait = 2 ** attempt
                    if attempt < 2:
                        time.sleep(wait)
                        continue
                    return {"error": "rate_limited", "message": "HTTP 429"}
                
                data = resp.json()
                
                if data["code"] == 0:
                    self._cache[cache_key] = (now, data)
                    return data
                elif data["code"] == 3001:
                    retry_after = resp.headers.get("Retry-After", str(2 ** attempt))
                    try:
                        wait = int(retry_after)
                    except ValueError:
                        wait = 2 ** attempt
                    if attempt < 2:
                        time.sleep(wait)
                        continue
                    return {"error": "rate_limited", "code": 3001}
                elif data["code"] == 1001:
                    raise PermissionError("鉴权失败(1001): API Key 无效或已过期")
                elif data["code"] == 1002:
                    raise PermissionError("鉴权失败(1002): 未提供 API Key")
                elif data["code"] == 1004:
                    raise PermissionError("鉴权失败(1004): API Key 权限不足")
                else:
                    return {"error": "api_error", "code": data["code"]}
                    
            except (requests.Timeout, requests.ConnectionError) as e:
                if attempt < 2:
                    time.sleep(2 ** attempt)
                    continue
                return {"error": "network_error", "message": str(e)}
            except json.JSONDecodeError as e:
                return {"error": "json_parse_error", "message": str(e)}
        
        return {"error": "max_retries_exceeded"}

# 全局单例
ticker_tool_instance = CachedTickerTool()

@tool
def get_ticker_tool(symbols: str) -> dict:
    """获取实时行情快照。使用前检查缓存,避免短时间内重复请求。"""
    return ticker_tool_instance.get_ticker(symbols)

# 节点中使用
def data_fetcher(state):
    result = get_ticker_tool.invoke({"symbols": "600519.SH,700.HK"})
    return {"ticker_data": result}

def analyzer(state):
    # 如果与 data_fetcher 在同一缓存窗口内,不会重复调 API
    result = get_ticker_tool.invoke({"symbols": "600519.SH"})
    price = Decimal(str(result["data"][0]["last_price"]))
    # ...

要点

  • 在 Tool 内部实现缓存(同一缓存窗口内的重复请求不调 API)
  • 缓存 TTL 根据策略频率设置(5-30 秒)
  • 使用循环重试而非递归,避免栈溢出
  • 处理 429/3001 限流、1001/1002/1004 鉴权、超时、JSON 解析失败
  • 缓存 key 包含规范化后的 symbols,避免顺序差异导致缓存未命中

四、姿势 3:Subgraph 组合——最工程,但状态映射是调试重点

是什么

将一个“数据获取+预处理”的完整流程封装为子图(Subgraph),主图通过节点或 wrapper 函数与子图通信。

为什么工程化

子图的可复用性是三种姿势中最强的。如果你有多个 Agent(如 A 股 Agent、港股 Agent、美股 Agent),每个都需要行情数据获取+校验+标准化,用 Subgraph 可以写一次、到处复用。但代价是——子图和父图之间的数据传递需要显式处理,当 State 嵌套较深时,容易出现数据“消失”的问题(无报错,但下游节点读到 None)。

真实踩坑场景

# 子图内部有 fetch、validate、enrich 三个节点
# 主图的 state["raw_input"] 要传给子图
# 子图的 state["cleaned_data"] 要传回主图
# 如果数据映射不正确,数据在子图边界“消失”——无报错,但下游节点读到 None

⚠️ 关键点

- 将 compiled subgraph 作为普通节点添加到主图时,需要手动处理状态的输入输出映射

- LangGraph 的 Subgraph API 在不同版本中变化较大,本文以“wrapper node”方式实现子图集成——这是最稳定、最通用的做法

- 调试时先检查子图的输入和输出,确认数据在边界没有被丢弃

正确做法

适用场景:复杂数据流水线(获取→校验→清洗→标准化),多个 Agent 复用同一套数据预处理逻辑。

正确做法(wrapper node 方式,兼容当前 LangGraph 版本):

from langgraph.graph import StateGraph
from typing import TypedDict
from decimal import Decimal

# ========== 子图:行情数据预处理 ==========
class MarketDataState(TypedDict):
    raw_input: str          # 输入:品种代码
    market_data: dict       # 输出:处理后的行情数据
    validation_errors: list # 校验错误

def fetch_node(state: MarketDataState):
    symbols = state["raw_input"]
    resp = requests.get(
        f"https://api.tickdb.ai/v1/market/ticker",
        params={"symbols": symbols},
        headers={"X-API-Key": API_KEY}
    )
    data = resp.json()
    if data["code"] == 0:
        return {"market_data": data["data"]}
    return {"validation_errors": [f"fetch failed: {data.get('message')}"]}

def validate_node(state: MarketDataState):
    errors = state.get("validation_errors", [])
    market_data = state.get("market_data", [])
    if not market_data:
        errors.append("market_data is empty")
    for item in market_data:
        price = Decimal(str(item.get("last_price", 0)))
        if price <= 0:
            errors.append(f"invalid price for {item.get('symbol')}")
    return {"validation_errors": errors}

# 构建子图
market_subgraph = StateGraph(MarketDataState)
market_subgraph.add_node("fetch", fetch_node)
market_subgraph.add_node("validate", validate_node)
market_subgraph.add_edge("fetch", "validate")
market_subgraph.set_entry_point("fetch")
market_subgraph.set_finish_point("validate")
compiled_subgraph = market_subgraph.compile()

# ========== 主图 ==========
class MainState(TypedDict):
    symbols: str
    processed_market_data: dict
    errors: list

# Wrapper 节点:在主图 State 和子图 State 之间做映射
def market_data_wrapper(state: MainState) -> dict:
    # 1. 从主图 State 提取子图需要的输入
    subgraph_input = {"raw_input": state["symbols"]}
    
    # 2. 调用子图
    subgraph_result = compiled_subgraph.invoke(subgraph_input)
    
    # 3. 将子图输出映射回主图 State
    return {
        "processed_market_data": subgraph_result.get("market_data", {}),
        "errors": subgraph_result.get("validation_errors", [])
    }

main_graph = StateGraph(MainState)
main_graph.add_node("market_data_pipeline", market_data_wrapper)
main_graph.set_entry_point("market_data_pipeline")
main_graph.set_finish_point("market_data_pipeline")
compiled_main = main_graph.compile()

要点

  • 使用 wrapper node 方式集成子图——这是最稳定的跨版本做法
  • wrapper 内部显式做 主图 State → 子图 State → 调用 → 子图结果 → 主图 State 的完整映射
  • 子图的错误通过 errors 字段传递回主图,不要被子图吞掉
  • 调试时先打印子图的输入和输出,确认数据在边界没有丢失

五、三种姿势权衡矩阵与数据一致性策略

权衡矩阵

维度姿势 1:State 共享姿势 2:Tool 封装姿势 3:Subgraph 组合
数据新鲜度低(依赖上次 State 更新)高(每次调用拉最新)取决于子图内部实现
API 调用量低(每个品种/轮)高(每个节点/轮)取决于子图内部实现
实现复杂度中(需处理缓存+限流)高(需处理状态映射)
可维护性中(State 膨胀后难管理)高(Tool 独立可测)最高(子图可复用)
适用场景低频数据(日 K、基本面)高频 tick 数据、按需查询复杂数据流水线、多 Agent 复用
核心风险并行分支写入同一 key 时 reducer 行为不确定API 限流 + 节点间数据不一致状态映射错误导致数据丢失

数据一致性策略

策略实现方式适用场景
同一轮固定快照一个节点调 API 后将数据存入 State,所有节点读同一份 State低频数据、需要所有节点基于同一价格做判断
按需获取 + TTL 缓存Tool 内部缓存(TTL 5-30s),同一缓存窗口内返回相同数据高频数据、对一致性要求不极端
多 Agent 标准化Subgraph/wrapper 将行情数据获取和预处理集中管理,所有 Agent 调同一子图多个 Agent 需要相同数据格式

选型指南

你的场景是什么?

低频数据(日 K 线、基本面)→ 姿势 1:State 共享
高频数据(ticker、盘口)且节点数 ≤ 3 → 姿势 2:Tool 封装(加 TTL 缓存)
高频数据且节点数 > 3 或需要多 Agent 复用 → 姿势 3:Subgraph 组合

六、完整示例:盘前-盘中-盘后多 Agent 协同

以下是一个结合三种姿势的完整 LangGraph Agent,模拟“盘前分析→盘中监控→盘后报告”的完整流程。

依赖安装(基于 langgraph 0.2.x 测试):

pip install langgraph==0.2.68 langchain-core requests

完整代码

"""
LangGraph 多 Agent 行情接入示例
场景:盘前分析(K线)→ 盘中监控(ticker)→ 盘后报告(估值)
覆盖姿势 1(State 共享日 K)+ 姿势 2(Tool 封装 ticker)+ 姿势 3(Subgraph 预处理)
"""

import os, time, logging
from typing import TypedDict
from decimal import Decimal
import requests
from langgraph.graph import StateGraph

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger("MarketAgent")

API_KEY = os.getenv("TICKDB_API_KEY", "")
BASE_URL = "https://api.tickdb.ai/v1"

if not API_KEY:
    raise EnvironmentError("请设置环境变量 TICKDB_API_KEY")

# ============================================================
# 1. 带缓存和限流处理的 HTTP 客户端
# ============================================================
class APIClient:
    def __init__(self):
        self._cache = {}
        self._cache_ttl = 5
    
    def _call(self, endpoint: str, params: dict):
        url = f"{BASE_URL}{endpoint}"
        headers = {"X-API-Key": API_KEY}
        for attempt in range(3):
            try:
                resp = requests.get(url, params=params, headers=headers, timeout=10)
                if resp.status_code == 429:
                    retry_after = resp.headers.get("Retry-After", str(2 ** attempt))
                    try:
                        wait = int(retry_after)
                    except ValueError:
                        wait = 2 ** attempt
                    if attempt < 2:
                        logger.warning(f"HTTP 429 等待{wait}s")
                        time.sleep(wait); continue
                    return None
                data = resp.json()
                if data["code"] == 0:
                    return data
                if data["code"] == 3001:
                    wait = int(resp.headers.get("Retry-After", 2 ** attempt))
                    logger.warning(f"限流(3001) 等待{wait}s")
                    time.sleep(wait); continue
                if data["code"] == 1001:
                    raise PermissionError("API Key 无效(1001)")
                if data["code"] == 1002:
                    raise PermissionError("未提供 API Key(1002)")
                if data["code"] == 1004:
                    raise PermissionError("API Key 权限不足(1004)")
                return None
            except (requests.Timeout, requests.ConnectionError) as e:
                logger.warning(f"网络错误 {e}")
                if attempt < 2:
                    time.sleep(2 ** attempt); continue
                return None
        return None
    
    def get_kline(self, symbol: str, interval: str = "1d", limit: int = 20):
        cache_key = f"kline:{symbol}:{interval}:{limit}"
        now = time.time()
        if cache_key in self._cache:
            ts, val = self._cache[cache_key]
            if now - ts < self._cache_ttl:
                return val
        result = self._call("/market/kline", {"symbol": symbol, "interval": interval, "limit": limit})
        self._cache[cache_key] = (now, result)
        return result
    
    def get_ticker(self, symbols: str):
        return self._call("/market/ticker", {"symbols": symbols})
    
    def get_calc_index(self, symbols: str):
        # 注意:REST 估值端点是 /market/calc-index,不是 /market/market-metrics
        return self._call("/market/calc-index", {"symbols": symbols})

client = APIClient()

# ============================================================
# 2. 姿势 3:Subgraph 数据预处理(wrapper node 集成)
# ============================================================
class PreprocessState(TypedDict):
    raw_kline: dict
    cleaned_kline: dict

def fetch_kline(state: PreprocessState):
    data = client.get_kline("600519.SH", "1d", 20)
    return {"raw_kline": data}

def clean_kline(state: PreprocessState):
    raw = state.get("raw_kline", {})
    if not raw or raw.get("code") != 0:
        return {"cleaned_kline": {}}
    klines = raw["data"]["klines"]
    cleaned = []
    for k in klines:
        cleaned.append({
            "time": k["time"],
            "close": str(Decimal(k["close"])),
            "volume": str(Decimal(k["volume"]))
        })
    return {"cleaned_kline": {"klines": cleaned, "count": len(cleaned)}}

preprocess_graph = StateGraph(PreprocessState)
preprocess_graph.add_node("fetch", fetch_kline)
preprocess_graph.add_node("clean", clean_kline)
preprocess_graph.add_edge("fetch", "clean")
preprocess_graph.set_entry_point("fetch")
preprocess_graph.set_finish_point("clean")
compiled_preprocess = preprocess_graph.compile()

# ============================================================
# 3. 主图 State 定义
# ============================================================
class MainState(TypedDict):
    kline_data: dict        # 姿势 1:State 共享(盘前 K 线)
    ticker_result: dict     # 姿势 2:Tool 输出(盘中 ticker)
    calc_index_result: dict # 姿势 2:Tool 输出(盘后估值)
    report: str             # 最终报告

# ============================================================
# 4. 节点定义
# ============================================================
def pre_market_node(state: MainState):
    """盘前分析:拉日 K 线,用 Subgraph 预处理后写入 State(姿势 3)"""
    result = compiled_preprocess.invoke({"raw_kline": {}, "cleaned_kline": {}})
    kline_data = result.get("cleaned_kline", {})
    if kline_data and kline_data.get("klines"):
        closes = [Decimal(k["close"]) for k in kline_data["klines"]]
        if len(closes) >= 10:
            ma5 = sum(closes[-5:]) / 5
            ma10 = sum(closes[-10:]) / 10
            trend_label = "up" if ma5 > ma10 else "down"
            state["kline_data"] = {"ma5": str(ma5), "ma10": str(ma10), "trend": trend_label}
    return state

def intraday_node(state: MainState):
    """盘中监控:调 Tool 获取实时 ticker(姿势 2)"""
    ticker = client.get_ticker("600519.SH,700.HK,AAPL.US")
    if ticker and ticker.get("code") == 0:
        result = []
        for item in ticker["data"]:
            result.append({
                "symbol": item["symbol"],
                "last_price": item["last_price"],
                "chg": item.get("price_change_percent_24h", "N/A")
            })
        state["ticker_result"] = {"monitored": result}
    return state

def post_market_node(state: MainState):
    """盘后报告:调 Tool 获取估值指标(姿势 2)"""
    calc_index = client.get_calc_index("600519.SH,700.HK")
    if calc_index and calc_index.get("code") == 0:
        state["calc_index_result"] = {"data": calc_index["data"]}
    return state

def reporter_node(state: MainState):
    """生成报告:读取 State 中三个时段的数据"""
    lines = ["# 盘前-盘中-盘后 报告", ""]
    
    kline = state.get("kline_data", {})
    lines.append(f"## 盘前\nMA5: {kline.get('ma5', 'N/A')}, MA10: {kline.get('ma10', 'N/A')}, 趋势标签: {kline.get('trend', 'N/A')}")
    
    ticker = state.get("ticker_result", {}).get("monitored", [])
    lines.append("\n## 盘中监控")
    for t in ticker:
        lines.append(f"- {t['symbol']}: {t['last_price']} ({t['chg']}%)")
    
    calc_index = state.get("calc_index_result", {}).get("data", [])
    lines.append("\n## 盘后估值")
    for m in calc_index:
        lines.append(f"- {m['symbol']}: PE={m.get('pe_ttm_ratio', 'N/A')}, PB={m.get('pb_ratio', 'N/A')}")
    
    state["report"] = "\n".join(lines)
    return state

# ============================================================
# 5. 构建主图
# ============================================================
main_graph = StateGraph(MainState)
main_graph.add_node("pre_market", pre_market_node)
main_graph.add_node("intraday", intraday_node)
main_graph.add_node("post_market", post_market_node)
main_graph.add_node("reporter", reporter_node)

main_graph.set_entry_point("pre_market")
main_graph.add_edge("pre_market", "intraday")
main_graph.add_edge("intraday", "post_market")
main_graph.add_edge("post_market", "reporter")
main_graph.set_finish_point("reporter")

compiled_main = main_graph.compile()

# ============================================================
# 6. main
# ============================================================
if __name__ == "__main__":
    result = compiled_main.invoke({
        "kline_data": {},
        "ticker_result": {},
        "calc_index_result": {},
        "report": ""
    })
    print(result.get("report", "报告生成失败"))

代码解读:行情数据在三种姿势下的时效性管理——盘前用姿势 3(Subgraph 预处理日 K),盘中和盘后用姿势 2(Tool 获取最新 ticker 和估值),盘前预处理后的日 K 通过姿势 1(State)在全图共享。APIClient 的缓存和限流处理(循环重试,最大 3 次)是 Tool 模式在工程中的必备组件。Subgraph 通过 compiled_preprocess.invoke() 显式集成到 pre_market_node 中,而非未接入的占位代码。

运行输出示例(脱敏):

# 盘前-盘中-盘后 报告

## 盘前
MA5: 1821.50, MA10: 1815.30, 趋势标签: up

## 盘中监控
- 600519.SH: 1823.50 (0.15%)
- 700.HK: 455.20 (-0.82%)
- AAPL.US: 198.45 (0.33%)

## 盘后估值
- 600519.SH: PE=28.5, PB=6.2
- 700.HK: PE=22.1, PB=4.8

七、FAQ

Q1:什么时候该用 State 共享,什么时候该用 Tool?

一个简单的判断标准:如果数据在 Agent 执行期间不会变化(如日 K 线、基本面),用 State 共享。如果数据会变化(如 ticker、盘口),用 Tool 封装。State 共享时在数据中附带 timestamp,Tool 内部加缓存,减少 API 调用。并行分支写入同一 key 时需自定义 reducer。

Q2:三种姿势能在同一个 Agent 中混用吗?

能。第六章的完整示例就是混用——盘前用姿势 3(Subgraph 预处理日 K)→ 姿势 1(State 共享结果),盘中用姿势 2(Tool 拉 ticker)。关键是明确每种数据的更新频率和容忍度。

Q3:LangGraph 的 State 和 Tool 与 TickDB API 的交互需要注意什么?

  • State 中存储的行情数据要附带 timestamp,下游节点检查时效
  • Tool 内部必须处理 3001(限流循环重试)、1001/1002/1004(鉴权阻断)、HTTP 429、超时、JSON 解析失败
  • 不要把所有 TickDB 数据都塞进 State——高频数据应通过 Tool 按需获取
  • 子图内部错误(如 API 超时)要通过 State 字段传递回主图
  • 注意 REST 端点与 MCP 工具名的区别(见附录对照表)

Q4:TickDB 的 REST 端点和 MCP 工具名怎么对应?

见附录对照表。关键区别:REST 估值端点是 /v1/market/calc-index,MCP 估值工具叫 get_market_metrics;REST ticker 端点是 /v1/market/ticker,MCP ticker 工具叫 get_ticker


附录:TickDB REST 端点与 MCP 工具对照表

功能REST 端点MCP 工具名
实时行情快照GET /v1/market/tickerget_ticker
历史 K 线GET /v1/market/klineget_kline
最近 K 线GET /v1/market/kline/latestget_kline_latest
盘口深度GET /v1/market/depthget_order_book
成交明细GET /v1/market/tradesget_recent_trades
估值指标GET /v1/market/calc-indexget_market_metrics
资金流向GET /v1/market/capital-flowget_capital_flow
品种列表GET /v1/symbols/availableget_available_symbols
分时数据GET /v1/market/intradayget_intraday
股票信息GET /v1/market/stock-infoget_stock_info
交易日历GET /v1/market/trade-daysget_trade_days
交易时段GET /v1/market/trading-sessionsget_trading_sessions
K 线周期GET /v1/market/kline-intervalsget_kline_intervals

注意:REST 端点和 MCP 工具名不一致的情况(如估值:REST 用 calc-index,MCP 用 get_market_metrics),在代码中混用时需要特别注意。本文第六章使用 REST API,因此用 /market/calc-index,而非 get_market_metrics


📡 本文行情数据由 TickDB.ai 提供

  • API:https://api.tickdb.ai
  • 文档:https://docs.tickdb.ai

本文仅讨论 Agent 框架的工程实现,不构成任何投资建议。文中所有策略示例、趋势标签、均线数值仅用于演示数据接入和代码验证流程,不代表任何交易信号。

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章