import asyncio
import json
import time
import websockets
# WebSocket endpoint that run_once() connects to.
WS_URL = "wss://v5.oddspapi.io/ws"
# Sent as "apiKey" in the login message. The value looks like a placeholder;
# replace it with a real key before running.
API_KEY = "YOUR_API_KEY"

# Resume state. It lives at module level so it carries over from one
# run_once() call to the next when main() reconnects.

# Last "serverEpoch" taken from a login_ok message; None until one arrives.
server_epoch = None
# Set of channel names built from login_ok's "replayChannels" list;
# None until such a list has been received.
replay_channels = None
# Maps channel name -> most recent "entryId" seen on a data message.
last_seen = {}
def _build_login_payload():
    """Return the login message, adding resume fields when an epoch is known.

    Reads the module-level ``API_KEY``, ``server_epoch``, ``replay_channels``
    and ``last_seen``; none of them are modified.
    """
    payload = {
        "type": "login",
        "apiKey": API_KEY,
        "channels": ["fixtures", "scores", "odds"],
        "receiveType": "json",
        "lang": "zh",
    }
    if not server_epoch:
        # No earlier session to resume from: send a plain login.
        return payload

    # Resume mode.
    payload["serverEpoch"] = server_epoch
    if replay_channels is None:
        # No replayChannels list has been received yet, so offer every cursor.
        cursors = dict(last_seen)
    else:
        # Only send cursors for replayable channels. The check is against
        # None on purpose: an empty set means a replayChannels list was
        # received and it was empty, so no cursor must be sent at all.
        cursors = {
            name: entry_id
            for name, entry_id in last_seen.items()
            if name in replay_channels
        }
    if cursors:
        payload["lastSeenId"] = cursors
    return payload


def _handle_message(msg):
    """Apply one decoded server message to the module-level resume state."""
    global server_epoch, replay_channels

    msg_type = msg.get("type")

    if msg_type == "login_ok":
        resume = msg.get("resume") or {}
        # Keep the previous epoch when the message carries none.
        server_epoch = resume.get("serverEpoch") or server_epoch
        replayable = resume.get("replayChannels")
        if isinstance(replayable, list):
            replay_channels = set(map(str, replayable))
        return

    if msg_type == "snapshot_required":
        channels = msg.get("channels") or []
        # Drop the cursors of the listed channels so they are not sent again
        # on the next login.
        for name in channels:
            last_seen.pop(name, None)
        # Trigger the REST snapshot refresh here.
        print("需要快照:", channels)
        return

    if msg_type == "resume_complete":
        print("恢复完成")
        return

    # Anything else is treated as a data message: remember its cursor.
    channel = msg.get("channel")
    entry_id = msg.get("entryId")
    if isinstance(channel, str) and isinstance(entry_id, str):
        last_seen[channel] = entry_id


async def run_once():
    """Open one WebSocket session, log in, and consume messages until it ends.

    The login message includes ``serverEpoch`` and ``lastSeenId`` cursors
    when an epoch from an earlier session is stored. Incoming messages update
    the module-level ``server_epoch``, ``replay_channels`` and ``last_seen``
    so that the next call can resume.

    Nothing is caught here: connection errors, ``json.JSONDecodeError`` from
    an unparsable frame, and errors from non-object payloads all propagate to
    the caller.
    """
    async with websockets.connect(
        WS_URL,
        ping_interval=20,
        ping_timeout=20,
        max_size=4194304,  # 4 MiB
    ) as ws:
        await ws.send(json.dumps(_build_login_payload()))

        async for raw in ws:
            if isinstance(raw, (bytes, bytearray)):
                # Binary frame: decode leniently instead of failing on bad bytes.
                raw = raw.decode("utf-8", errors="replace")
            _handle_message(json.loads(raw))
async def main():
    """Run sessions forever, reconnecting one second after each one ends.

    Any ``Exception`` raised by ``run_once`` is printed and then ignored, so
    the loop keeps going; the pause applies whether the session ended with an
    error or returned normally.
    """
    reconnect_delay = 1  # seconds between connection attempts
    while True:
        try:
            await run_once()
        except Exception as exc:
            print("断开连接:", exc)
        await asyncio.sleep(reconnect_delay)
# Start the client only when this file is executed as a script, so that
# importing the module does not launch the endless reconnect loop.
if __name__ == "__main__":
    asyncio.run(main())