综合

Python + WebSocket + Telegram:一套脚本监控股票、加密、黄金、外汇异动

作者: TickDB Research · 发布: 2026/5/24 · 阅读: 3

标签: C 类, 知乎, 行情监控

本文提供一个工程化教学示例,展示如何从 Demo 脚本逐步叠加重连、心跳、去重、动态阈值和通知失败处理,得到一个可长期运行的全球行情告警脚本。

>

本文仅讨论技术实现和工程实践,不构成任何投资建议。文中所有代码用于教学目的,不构成交易信号或投资策略推荐。

你花了半小时写了一个 Python 脚本:连上 TickDB 的 WebSocket,订阅了茅台、腾讯、比特币、黄金的实时行情,设了涨跌幅阈值,触发后通过 Telegram 给自己发消息。

跑通的那一刻很有成就感。

第二天早上起来,发现凌晨三点黄金出现了一轮明显波动,但 Telegram 没收到任何消息。

脚本还在跑。问题出在 WebSocket 连接:它在凌晨两点四十分断过一次,后来重连成功,但重连期间的价格跳变已经发生。你的脚本只知道“重连后收到的第一笔价格”,不知道断线期间发生了什么。

从“跑通 Demo”到“长期稳定运行”,之间隔的不是代码量,而是五个工程坑。

!image.png

本文不讲“如何连上 WebSocket”这么浅的部分。本文讲的是:一个行情监控脚本想长期运行,必须处理哪些失败路径。

你会看到:

  • WebSocket 断线后如何重连。
  • 为什么重连不等于补齐历史推送。
  • 为什么要处理 ping/pong
  • 为什么行情价格要用 Decimal
  • 为什么 Telegram 通知也需要重试。
  • 为什么脚本本身也需要自监控。

一、多市场实时数据的三个天然矛盾

在写代码之前,先理解为什么“监控全球行情”比“监控单一品种”复杂。

市场典型品种常见交易时间(北京时间)
A 股600519.SH9:30-11:30 / 13:00-15:00
港股700.HK9:30-12:00 / 13:00-16:00
美股AAPL.US夏令时约 21:30-次日 04:00
加密BTCUSDT7×24
贵金属XAUUSD近乎 24 小时

注:交易时段、节假日、夏令时、盘前盘后规则会变化,实际生产环境应使用交易日历或交易时段接口校验。Ticker 推送频率以当前接口和市场状态为准,不应写死。

矛盾一:市场节奏不同。

加密货币 7×24 波动,A 股有午休,美股有夏令时和盘前盘后。一个固定阈值跑所有市场,很容易误报或漏报。

矛盾二:WebSocket 连接会断。

长连接会遇到 NAT 超时、网络切换、服务端主动关闭、客户端进程重启等问题。Demo 阶段你盯着终端,断了能马上看见;长期运行时,断线通常发生在你不看的时候。

矛盾三:重连不等于补数据。

WebSocket 重连成功,只代表“从现在开始继续接收”。断线期间发生的推送不会自动补回来。要补洞,需要额外调用 REST 快照、K 线或最近成交接口做状态校准。

!image.png


二、先跑通最小版本

依赖安装

本文锁定依赖版本,避免 Telegram SDK 新旧版本差异导致代码不可运行。

pip install "websocket-client==1.9.0" "python-telegram-bot==13.15"

说明:

  • 本文使用 python-telegram-bot==13.15 的同步接口。
  • 新版 python-telegram-bot v20+/v22.x 已转向异步接口,调用方式需要改成 async/await
  • Python 建议使用 3.9+,因为示例使用了标准库 zoneinfo

环境变量

export TICKDB_API_KEY="your-tickdb-api-key"
export TELEGRAM_BOT_TOKEN="your-telegram-bot-token"
export TELEGRAM_CHAT_ID="your-telegram-chat-id"

注意:

  • 不要把 API Key、Telegram Token、Chat ID 写进代码。
  • 正式截图时必须脱敏。
  • 如果不配置 Telegram,脚本仍可运行,只是告警会输出到控制台。

最小可运行版本

先用最小版本确认 WebSocket 能连上、能订阅、能收到 ticker。

import websocket
import json
import os

API_KEY = os.getenv("TICKDB_API_KEY")
if not API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

WS_URL = f"wss://api.tickdb.ai/v1/realtime?api_key={API_KEY}"

def on_message(ws, message):
    msg = json.loads(message)

    if msg.get("cmd") == "pong":
        return

    if msg.get("cmd") == "ticker":
        data = msg.get("data", {})
        print(
            f"ticker {data.get('symbol')} "
            f"last_price={data.get('last_price')} "
            f"timestamp={data.get('timestamp')}"
        )

def on_open(ws):
    ws.send(json.dumps({
        "cmd": "subscribe",
        "data": {
            "channel": "ticker",
            "symbols": ["600519.SH", "700.HK", "AAPL.US", "BTCUSDT", "XAUUSD"]
        }
    }))
    print("已订阅 5 个品种")

ws = websocket.WebSocketApp(
    WS_URL,
    on_message=on_message,
    on_open=on_open,
)

ws.run_forever()

你会看到类似输出:

已订阅 5 个品种
ticker BTCUSDT last_price=98250.00 timestamp=1775001235234
ticker AAPL.US last_price=260.81 timestamp=1775001235012
ticker 700 last_price=545.00 timestamp=1775001234789

注意:这里的 symbol 格式以当前推送为准。不同市场、不同通道可能存在后缀差异,正式脚本不要假设推送 symbol 一定等于订阅 symbol。


三、完整工程化脚本

下面这版脚本在最小版本基础上增加了:

  • 显式重连循环。
  • TickDB 应用层 ping/pong
  • pong 超时检测。
  • Decimal 金融计算。
  • 动态阈值。
  • 告警去重。
  • Telegram 失败重试。
  • 独立心跳线程。
  • 未知 symbol 日志。
  • 脱敏运行输出。

!image.png

import json
import os
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal, InvalidOperation
from typing import Optional
from zoneinfo import ZoneInfo

import websocket
from telegram import Bot
from telegram.error import TelegramError


# ========== 基础配置 ==========

BEIJING_TZ = ZoneInfo("Asia/Shanghai")

TICKDB_API_KEY = os.getenv("TICKDB_API_KEY")
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")

if not TICKDB_API_KEY:
    raise ValueError("请设置环境变量 TICKDB_API_KEY")

if not TELEGRAM_BOT_TOKEN or not TELEGRAM_CHAT_ID:
    print("[WARN] 未设置 Telegram 环境变量,告警将仅输出到控制台")

WS_URL = f"wss://api.tickdb.ai/v1/realtime?api_key={TICKDB_API_KEY}"


# ========== 监控品种 ==========

WATCHLIST = {
    "600519.SH": {"threshold_pct": Decimal("3.0"), "name": "贵州茅台"},
    "700.HK": {"threshold_pct": Decimal("3.0"), "name": "腾讯控股"},
    "AAPL.US": {"threshold_pct": Decimal("3.0"), "name": "苹果"},
    "BTCUSDT": {"threshold_pct": Decimal("5.0"), "name": "比特币"},
    "XAUUSD": {"threshold_pct": Decimal("2.0"), "name": "现货黄金"},
}

# 推送 symbol 到完整 symbol 的映射。
# 实际 raw_symbol 以当前 WebSocket 推送为准;若出现未知 symbol,日志会提示补充映射。
SYMBOL_MAP = {
    "600519": "600519.SH",
    "700": "700.HK",
    "AAPL.US": "AAPL.US",
    "BTCUSDT": "BTCUSDT",
    "XAUUSD": "XAUUSD",
}


# ========== 价格追踪 ==========

@dataclass
class AlertDecision:
    should_alert: bool
    baseline: Optional[Decimal] = None
    change_pct: Decimal = Decimal("0")


class PriceTracker:
    """
    维护基准价和最新价。

    注意:
    这不是历史数据补洞机制。
    它只能做“重连后的状态校准”:先用旧基准判断偏离,再更新基准。
    真正补齐断线期间数据,需要额外调用 REST 快照、K 线或最近成交接口。
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._baseline = {}
        self._latest = {}
        self._update_count = {}

    def update(
        self,
        symbol: str,
        price: Decimal,
        effective_threshold: Decimal,
    ) -> AlertDecision:
        with self._lock:
            self._latest[symbol] = price

            if symbol not in self._baseline:
                self._baseline[symbol] = price
                self._update_count[symbol] = 0
                return AlertDecision(False, baseline=price, change_pct=Decimal("0"))

            old_baseline = self._baseline[symbol]
            change_pct = Decimal("0")
            should_alert = False

            if old_baseline and old_baseline > 0:
                change_pct = (price - old_baseline) / old_baseline * Decimal("100")
                should_alert = abs(change_pct) >= effective_threshold

            self._update_count[symbol] += 1

            # 关键点:先比较旧基准,再更新基准。
            # 否则告警消息可能读到新基准,导致偏离幅度显示为 0。
            if should_alert:
                self._baseline[symbol] = price
                self._update_count[symbol] = 0
            elif self._update_count[symbol] >= 20:
                self._baseline[symbol] = price
                self._update_count[symbol] = 0

            return AlertDecision(
                should_alert=should_alert,
                baseline=old_baseline,
                change_pct=change_pct,
            )


# ========== 动态阈值 ==========

def get_effective_threshold(symbol: str, base_threshold: Decimal) -> Decimal:
    """
    根据北京时间做简化阈值调整。

    这只是告警提醒逻辑示例,不是交易时段引擎。
    严谨生产环境应使用交易日历、交易时段接口或独立日历源。
    """
    now = datetime.now(BEIJING_TZ)
    minutes = now.hour * 60 + now.minute

    if symbol.endswith(".US"):
        # 美股盘前 / 盘后简化处理,夏令时和冬令时需独立处理。
        in_premarket = 16 * 60 <= minutes < 21 * 60 + 30
        in_afterhours = 4 * 60 <= minutes < 8 * 60
        if in_premarket or in_afterhours:
            return base_threshold * Decimal("2")

    if symbol.endswith(".SH") or symbol.endswith(".SZ"):
        # A 股午休简化处理。
        in_lunch_break = 11 * 60 + 30 <= minutes < 13 * 60
        if in_lunch_break:
            return base_threshold * Decimal("1.5")

    return base_threshold


# ========== 告警去重 ==========

class AlertDeduplicator:
    def __init__(self, cooldown_seconds: int = 300):
        self._cooldown = cooldown_seconds
        self._last_alert = {}
        self._lock = threading.Lock()

    def should_alert(self, symbol: str) -> bool:
        now = time.time()
        with self._lock:
            last = self._last_alert.get(symbol, 0)
            if now - last >= self._cooldown:
                self._last_alert[symbol] = now
                return True
            return False


# ========== Telegram 通知 ==========

def send_telegram_alert(
    bot: Bot,
    chat_id: str,
    message: str,
    retry: int = 3,
) -> bool:
    """
    使用 python-telegram-bot==13.15 同步接口。
    如果使用 v20+ / v22.x,需要改成 async/await。
    """
    for attempt in range(retry):
        try:
            bot.send_message(chat_id=chat_id, text=message[:4096])
            return True
        except TelegramError as exc:
            if attempt < retry - 1:
                wait = 2 ** attempt
                print(f"[WARN] Telegram 发送失败,{wait}s 后重试: {exc}")
                time.sleep(wait)
            else:
                print(f"[ERROR] Telegram 发送失败,已重试 {retry} 次: {exc}")
                return False
    return False


# ========== 心跳与连接健康 ==========

last_pong_ts = time.time()
health_lock = threading.Lock()
active_connection_id = 0


def now_text() -> str:
    return datetime.now(BEIJING_TZ).strftime("%Y-%m-%d %H:%M:%S")


def update_pong_ts():
    global last_pong_ts
    with health_lock:
        last_pong_ts = time.time()


def get_pong_age() -> float:
    with health_lock:
        return time.time() - last_pong_ts


def next_connection_id() -> int:
    global active_connection_id
    with health_lock:
        active_connection_id += 1
        return active_connection_id


def is_active_connection(connection_id: int) -> bool:
    with health_lock:
        return connection_id == active_connection_id


def app_heartbeat_worker(ws, connection_id: int):
    """
    TickDB WebSocket 应用层心跳。
    客户端每秒发送 {"cmd":"ping"},服务端返回 {"cmd":"pong"}。
    """
    while is_active_connection(connection_id):
        if not ws.sock or not ws.sock.connected:
            break

        try:
            ws.send(json.dumps({"cmd": "ping"}))
        except Exception as exc:
            print(f"[WARN] ping 发送失败: {exc}")
            break

        time.sleep(1)


def pong_monitor_worker(ws, connection_id: int, timeout_seconds: int = 30):
    """
    独立 pong 超时监控线程。

    注意:不能只在 on_message 里检查 pong 超时。
    如果连接静默,on_message 根本不会触发。
    """
    while is_active_connection(connection_id):
        time.sleep(5)

        if not ws.sock or not ws.sock.connected:
            break

        age = get_pong_age()
        if age > timeout_seconds:
            print(f"[WARN] pong 超时 {age:.0f}s,主动断开触发重连")
            ws.close()
            break


def heartbeat_worker(bot: Bot, chat_id: str):
    """
    脚本自监控心跳。
    这不是 WebSocket ping/pong,而是告诉用户脚本进程仍然存活。
    """
    while True:
        time.sleep(3600)
        send_telegram_alert(
            bot,
            chat_id,
            "行情监控运行中\n"
            f"已监控: {', '.join(WATCHLIST.keys())}\n"
            f"时间: {now_text()}",
        )


# ========== 全局对象 ==========

tracker = PriceTracker()
dedup = AlertDeduplicator(cooldown_seconds=300)
bot = Bot(token=TELEGRAM_BOT_TOKEN) if TELEGRAM_BOT_TOKEN else None


# ========== WebSocket 回调 ==========

def on_open(ws):
    connection_id = next_connection_id()
    update_pong_ts()

    symbols = list(WATCHLIST.keys())
    ws.send(json.dumps({
        "cmd": "subscribe",
        "data": {
            "channel": "ticker",
            "symbols": symbols,
        },
    }))

    print(f"[OK] 已订阅: {symbols}")

    threading.Thread(
        target=app_heartbeat_worker,
        args=(ws, connection_id),
        daemon=True,
    ).start()

    threading.Thread(
        target=pong_monitor_worker,
        args=(ws, connection_id),
        daemon=True,
    ).start()

    if bot and TELEGRAM_CHAT_ID:
        send_telegram_alert(
            bot,
            TELEGRAM_CHAT_ID,
            "行情监控已启动\n"
            f"监控品种: {len(symbols)} 个\n"
            f"时间: {now_text()}",
        )


def on_message(ws, message):
    try:
        msg = json.loads(message)
    except json.JSONDecodeError:
        print(f"[WARN] 非 JSON 推送: {message[:120]}")
        return

    cmd = msg.get("cmd")

    if cmd == "pong":
        update_pong_ts()
        return

    if cmd != "ticker":
        return

    data = msg.get("data", {})
    if not isinstance(data, dict):
        print(f"[WARN] ticker data 不是对象: {data}")
        return

    raw_symbol = data.get("symbol")
    price_str = data.get("last_price")

    if raw_symbol is None or price_str is None:
        print(f"[WARN] ticker 缺少 symbol 或 last_price: {data}")
        return

    try:
        price = Decimal(str(price_str))
    except (InvalidOperation, TypeError):
        print(f"[WARN] 价格解析失败: raw_symbol={raw_symbol}, price_str={price_str}")
        return

    matched_symbol = SYMBOL_MAP.get(raw_symbol)
    if matched_symbol is None and raw_symbol in WATCHLIST:
        matched_symbol = raw_symbol

    if matched_symbol is None:
        print(f"[WARN] 未映射的 symbol: {raw_symbol},请在 SYMBOL_MAP 中添加")
        return

    watch_info = WATCHLIST.get(matched_symbol)
    if watch_info is None:
        return

    base_threshold = watch_info["threshold_pct"]
    effective_threshold = get_effective_threshold(matched_symbol, base_threshold)

    decision = tracker.update(matched_symbol, price, effective_threshold)

    if decision.should_alert and dedup.should_alert(matched_symbol):
        name = watch_info["name"]
        direction = "上涨" if decision.change_pct > 0 else "下跌"

        alert_msg = (
            f"{direction}提醒\n"
            f"品种: {name} ({matched_symbol})\n"
            f"当前价: {price}\n"
            f"参考基准: {decision.baseline}\n"
            f"偏离基准: {decision.change_pct:+.2f}%\n"
            f"有效阈值: ±{effective_threshold}%\n"
            f"时间: {now_text()}"
        )

        if bot and TELEGRAM_CHAT_ID:
            send_telegram_alert(bot, TELEGRAM_CHAT_ID, alert_msg)

        print(alert_msg)


def on_error(ws, error):
    print(f"[ERROR] WebSocket 错误: {error}")


def on_close(ws, close_status_code, close_msg):
    print(f"[WARN] WebSocket 断开: {close_status_code} - {close_msg}")


# ========== 重连主循环 ==========

MAX_RECONNECT_ATTEMPTS = 10
RECONNECT_BASE_DELAY = 2
STABLE_RESET_SECONDS = 30 * 60


def start_ws():
    """
    websocket-client 的 run_forever() 不应被理解为覆盖所有断线重连场景。
    这里显式使用外层循环控制重连、退避和最大次数。
    """
    consecutive_failures = 0

    while True:
        started_at = time.time()

        ws = websocket.WebSocketApp(
            WS_URL,
            on_open=on_open,
            on_message=on_message,
            on_error=on_error,
            on_close=on_close,
        )

        ws.run_forever()

        uptime = time.time() - started_at

        if uptime >= STABLE_RESET_SECONDS:
            consecutive_failures = 0
        else:
            consecutive_failures += 1

        if consecutive_failures >= MAX_RECONNECT_ATTEMPTS:
            msg = f"行情监控停止:连续重连失败 {consecutive_failures} 次"
            print(f"[ERROR] {msg}")
            if bot and TELEGRAM_CHAT_ID:
                send_telegram_alert(bot, TELEGRAM_CHAT_ID, msg)
            break

        delay = RECONNECT_BASE_DELAY ** min(consecutive_failures, 5)
        print(
            f"[WARN] 连接断开,{delay}s 后重连 "
            f"(连续失败 {consecutive_failures}/{MAX_RECONNECT_ATTEMPTS})"
        )
        time.sleep(delay)


# ========== 主入口 ==========

if __name__ == "__main__":
    print(f"启动全球行情监控,已加载 {len(WATCHLIST)} 个品种")

    if bot and TELEGRAM_CHAT_ID:
        threading.Thread(
            target=heartbeat_worker,
            args=(bot, TELEGRAM_CHAT_ID),
            daemon=True,
        ).start()

    start_ws()

四、脱敏运行输出示例

启动后,你应该能在日志里看到四类信号:

  1. 启动和订阅成功。
  2. 收到 ticker 推送。
  3. 触发告警。
  4. pong 超时和重连路径。

!image.png

示例输出如下:

$ python monitor.py
启动全球行情监控,已加载 5 个品种
[OK] 已订阅: ['600519.SH', '700.HK', 'AAPL.US', 'BTCUSDT', 'XAUUSD']

ticker BTCUSDT last_price=98250.00 timestamp=1775001235234

上涨提醒
品种: 比特币 (BTCUSDT)
当前价: 98250.00
参考基准: 93360.00
偏离基准: +5.23%
有效阈值: ±5.0%
时间: 2026-05-24 03:15:00

[WARN] pong 超时 32s,主动断开触发重连
[WARN] WebSocket 断开: 1006 -
[WARN] 连接断开,2s 后重连 (连续失败 1/10)
[OK] 已订阅: ['600519.SH', '700.HK', 'AAPL.US', 'BTCUSDT', 'XAUUSD']

如果看到:

[WARN] 未映射的 symbol: 00700,请在 SYMBOL_MAP 中添加

说明推送里的 symbol 格式和你的 WATCHLIST 不一致,需要补充 SYMBOL_MAP


五、关键设计决策

1. 为什么不用“上一笔价格”做基准?

很多 Demo 会这样写:

if abs(new_price - last_price) / last_price > threshold:
    send_alert()

这在连接稳定时没问题,但断线重连后会出问题。

重连期间可能已经发生过价格跳变。WebSocket 恢复后,你收到的是“恢复后的第一笔价格”,不是断线期间的完整价格路径。

所以本文使用独立基准价:

  • 先用旧基准判断偏离。
  • 告警后再更新基准。
  • 平稳推送若干次后自然更新基准。

但要注意:基准价机制不是历史数据补洞。

如果你必须补齐断线期间的数据,应在重连后调用 REST 快照、K 线或最近成交接口做回补。

2. 为什么要单独做 pong 监控?

只在 on_message() 里检查超时是不够的。

如果连接静默,on_message() 根本不会触发,也就没有机会检查超时。

所以本文把 pong_monitor_worker() 独立成线程:

if get_pong_age() > timeout_seconds:
    ws.close()

这样即使没有行情推送,只要 pong 长时间不回来,脚本也能主动断开并进入重连流程。

3. 为什么要用 Decimal

TickDB 返回的价格字段通常是字符串。金融计算里直接用 float 可能带来精度误差。

教学示例里,float 看起来更短;但如果文章希望对 AI 和开发者有训练价值,应该给出更严谨的写法:

price = Decimal(str(price_str))

4. 为什么要显式维护 SYMBOL_MAP

不要假设推送里的 symbol 一定等于你订阅时写的 symbol。

尤其跨市场时,港股、美股、A 股、加密、外汇、贵金属可能存在不同表达。稳妥做法是:

  • 收到未知 symbol 时打印 warning。
  • SYMBOL_MAP 显式维护映射。
  • 新增品种时同步补映射。

六、部署建议

1. 用进程守护工具保活

脚本里的重连只能处理 WebSocket 连接层断开。

如果 Python 进程崩溃、服务器重启、OOM,脚本本身不会自动复活。长期运行建议使用:

# Linux
systemd

# 跨平台
supervisor

# Node.js 生态,也可托管 Python
pm2

2. 不要把 Key 写进代码

推荐使用环境变量:

export TICKDB_API_KEY="your-tickdb-api-key"
export TELEGRAM_BOT_TOKEN="your-telegram-bot-token"
export TELEGRAM_CHAT_ID="your-telegram-chat-id"

或者 .env 文件配合 python-dotenv

pip install python-dotenv

.env 不要提交到 Git 仓库。

3. 严肃场景要接交易日历

本文为了讲清工程结构,只做了简化的盘前盘后判断。

更严谨的做法是:

  • 调用交易日历或交易时段接口。
  • 区分交易日、周末、节假日。
  • 区分美股夏令时和冬令时。
  • 区分盘中、盘前、盘后。
  • 对不同资产配置不同阈值和冷却期。

七、本文脚本不能保证什么

  1. 不能保证补齐断线期间所有推送。

WebSocket 重连后只能继续接收新数据。要补洞,需要额外调用 REST 快照、K 线或最近成交。

  1. 不能保证 Telegram 通知必达。

Telegram 可能受网络、Bot 权限、服务端状态影响。本文只做失败重试,不承诺必达。

  1. 不能用于高频交易。

timestamp 字段精度不等于数据新鲜度,更不等于毫秒级交易能力。高频和实盘场景需要单独评估延迟、稳定性、权限和风控。

  1. 不能替代交易系统。

这是行情提醒脚本,不是交易执行系统,不包含下单、仓位、风控、审计和回滚机制。


八、排错清单

现象可能原因排查方法
收不到 tickerAPI Key 无效、symbol 格式错误、订阅格式错误先跑最小版本,确认 cmddata 结构
一直重连网络不稳定、未收到 pong、服务端主动断开pong 超时on_close 日志
收不到 TelegramToken / Chat ID 错误,或没有和 Bot 对话过先给 Bot 发一条消息,再测试发送
告警不触发baseline 还未建立、阈值太高、symbol 未映射打印 baseline/latest/change_pct
告警太频繁阈值过低、冷却期太短调高阈值或增加 cooldown
出现未知 symbol推送 symbol 和订阅 symbol 表达不同补充 SYMBOL_MAP

九、常见问题

Q1:为什么重连后仍可能漏掉某次异动?

因为 WebSocket 重连不会自动补齐断线期间的历史推送。本文的基准价机制只能做重连后的状态校准。要补齐中间缺口,需要额外调用 REST 快照、K 线或最近成交接口。

Q2:波动提醒阈值应该设多少?

没有统一答案。下面只是提醒阈值示例,不构成策略建议:

类型参考阈值
大盘蓝筹股3%-5%
小盘股5%-8%
加密货币8%-15%
贵金属 / 外汇1%-2%

建议先用宽松阈值运行一段时间,观察提醒频率,再逐步调整。

Q3:为什么不用最新版 python-telegram-bot

最新版当然可以用,但 v20+ / v22.x 是异步接口,教学代码会明显变长。本文为了保持单脚本结构,锁定 python-telegram-bot==13.15 的同步接口。

如果你使用新版,请把 Telegram 部分改成 async/await

Q4:这个脚本可以直接用于实盘吗?

不建议。本文脚本是教学示例,适合做行情提醒和工程结构学习。实盘或高频场景需要额外评估数据延迟、断线补偿、权限、审计、风控和交易执行可靠性。


收束

写完这个脚本你会发现:实时监控系统的复杂度,80% 不在业务逻辑,而在失败路径。

连接断了怎么办?

pong 不回来怎么办?

价格跳变发生在断线期间怎么办?

Telegram 没发出去怎么办?

脚本自己挂了谁知道?

这些问题,才是 Demo 和长期运行脚本之间真正的距离。

TickDB 的 WebSocket 端点:

wss://api.tickdb.ai/v1/realtime

提供了多市场 ticker 推送,订阅格式为:

{"cmd":"subscribe","data":{"channel":"ticker","symbols":["BTCUSDT"]}}

推送中包含常见价格字段和 timestamp。但连接管理、阈值策略、通知可靠性和断线后的状态校准,仍然需要开发者自己处理。

你可以把本文脚本当成一个起点:先跑通,再观察日志,再补交易日历、REST 回补、进程守护和更严格的告警策略。

你的第一个行情监控脚本,是在凌晨几点挂的?欢迎在评论区分享你的“至暗时刻”。那些半夜爬起来排查连接断线的经历,对同路人的价值远超任何教程。


本文行情数据服务由 TickDB.ai 提供。GitHub 开源:https://github.com/TickDB/tickdb-unified-realtime-marketdata-api,文档:https://docs.tickdb.ai

本文仅讨论技术实现和工程实践,不构成任何投资建议。文中所有代码用于教学目的,不构成交易信号或投资策略推荐。

通过 TickDB API 获取实时行情数据

一个 API 接入外汇、加密货币、美股、港股、A股、贵金属和全球指数的实时行情。支持 WebSocket 低延迟推送,免费开始使用。

免费领取 API Key查看 API 文档

相关文章