跳转到主要内容

“恢复”意味着什么

成功登录后,服务器在login_ok中返回一个resume块:
{
  "resume": {
    "serverEpoch": "0804ab61513c4681a3afd8afc1fb2f75",
    "resumeWindowMs": 3000,
    "replayChannels": ["fixtures", "scores"],
    "serverEntryIds": {
      "fixtures": "1766414833582-2542",
      "scores": "1766418736962-198"
    }
  }
}
这告诉您:
  • serverEpoch — 标识当前网关会话(重启时更改)
  • resumeWindowMs — 可重放数据缓冲的时间长度
  • replayChannels — 符合重放条件的频道
  • serverEntryIds — 服务器每个频道的最新游标

客户端必须持久化的内容

为安全恢复,请持久化:
  1. serverEpoch
  2. 每个频道您处理的最新entryId
{
  "scores": "1766418736962-198",
  "fixtures": "1766414833582-2542"
}

entryId格式

每条流式消息包含一个entryId
<ts_ms>-<seq>
  • ts_ms — 服务器时间戳(UTC,毫秒)
  • seq — 每个频道的单调序列
重要:
  • entryId是一个游标,不是交付保证
  • 间隙是正常的(上游行为、合并、背压)

恢复登录示例

重新连接时,发送相同的serverEpoch和您存储的游标:
{
  "type": "login",
  "apiKey": "YOUR_API_KEY",
  "lang": "zh",
  "channels": ["fixtures", "scores", "odds"],
  "serverEpoch": "0804ab61513c4681a3afd8afc1fb2f75",
  "lastSeenId": {
    "scores": "1766418736962-198"
  }
}
如果重放成功,服务器发送:
{
  "type": "resume_complete",
  "serverEpoch": "0804ab61513c4681a3afd8afc1fb2f75"
}

snapshot_required(当重放不可能时)

有时无法安全地进行重放。在这种情况下,服务器发送:
{
  "type": "snapshot_required",
  "reason": "resume_window_exceeded",
  "channels": ["scores"],
  "serverEpoch": "0804ab61513c4681a3afd8afc1fb2f75",
  "resumeWindowMs": 3000,
  "serverEntryIds": {
    "scores": "1766418738000-220"
  }
}

可能的原因

  • server_restarted — 网关重启,游标无效
  • resume_window_exceeded — 您的游标比重放缓冲区更旧
  • client_backpressure — 您的客户端无法足够快地消费重放

重要细节

重放资格取决于游标年龄,而不是断开连接持续时间:
(now_ms - last_seen_entry_ts_ms) > resumeWindowMs
⇒ snapshot_required可能
如果您最后处理的消息已经很旧,即使非常短的断开连接也可能超过窗口。

客户端应如何处理snapshot_required

当您收到snapshot_required时:
  1. 通过REST为列出的频道获取新快照 (例如/fixtures/fixtures/odds/futures
  2. 清除这些频道的lastSeenId
  3. 继续处理实时更新
网关在snapshot_required后继续流式传输。 此消息是您的信号,表明本地状态必须重建

Python重新连接示例(完整模板)

此示例:
  • 持久化serverEpoch和每个频道的lastSeenId
  • 仅为可重放频道发送游标
  • 处理snapshot_required
  • 失败时自动重新连接
import asyncio
import json
import time
import websockets

WS_URL = "wss://v5.oddspapi.io/ws"
API_KEY = "YOUR_API_KEY"

server_epoch = None
replay_channels = None
last_seen = {}

async def run_once():
    global server_epoch, replay_channels, last_seen

    async with websockets.connect(
        WS_URL,
        ping_interval=20,
        ping_timeout=20,
        max_size=4194304
    ) as ws:
        login = {
            "type": "login",
            "apiKey": API_KEY,
            "channels": ["fixtures", "scores", "odds"],
            "receiveType": "json",
            "lang": "zh",
        }

        # 恢复模式
        if server_epoch:
            login["serverEpoch"] = server_epoch

        # 仅为可重放频道发送游标
        if replay_channels:
            cursors = {
                ch: eid
                for ch, eid in last_seen.items()
                if ch in replay_channels
            }
        else:
            cursors = dict(last_seen)

        if cursors:
            login["lastSeenId"] = cursors

        await ws.send(json.dumps(login))

        async for raw in ws:
            if isinstance(raw, (bytes, bytearray)):
                raw = raw.decode("utf-8", errors="replace")

            msg = json.loads(raw)
            msg_type = msg.get("type")

            if msg_type == "login_ok":
                resume = msg.get("resume") or {}
                server_epoch = resume.get("serverEpoch") or server_epoch

                rc = resume.get("replayChannels")
                if isinstance(rc, list):
                    replay_channels = set(map(str, rc))

                continue

            if msg_type == "snapshot_required":
                channels = msg.get("channels") or []
                for ch in channels:
                    last_seen.pop(ch, None)

                # 在此触发REST快照刷新
                print("需要快照:", channels)
                continue

            if msg_type == "resume_complete":
                print("恢复完成")
                continue

            # 数据消息
            channel = msg.get("channel")
            entry_id = msg.get("entryId")

            if isinstance(channel, str) and isinstance(entry_id, str):
                last_seen[channel] = entry_id

async def main():
    while True:
        try:
            await run_once()
        except Exception as e:
            print("断开连接:", e)
            await asyncio.sleep(1)

asyncio.run(main())