You want your model to see a score change, run inference, compare to Polymarket's current ask, and either signal or skip — all within one second. Building this requires merging two asynchronous streams (game state from ESPN; market prices from Polymarket), handling failures gracefully, and minimizing latency end-to-end.
This post walks through a production architecture in Python: what to poll, what to stream over WebSocket, how to fuse the streams, where to store state, and how to expose the merged signal via FastAPI. It's based on the pipeline that powers a production sports prediction system serving calibrated probabilities for 11 sports.
High-level flow:
Two independent async tasks feed a fusion layer. The fusion layer runs model inference when either stream changes, computes edge, and emits signals downstream. FastAPI serves both client queries and internal webhooks.
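The task topology above can be sketched with stand-in coroutines (the real trackers appear below) — two producers run concurrently in one event loop via `asyncio.gather`, each firing callbacks into a shared sink:

```python
import asyncio

# Minimal sketch of the task topology: two independent producers feed
# callbacks into a shared event log, kept alive together by gather().
# The coroutine bodies are stand-ins for the real ESPN/Polymarket loops.
async def espn_task(on_change):
    for score in [(0, 0), (2, 0)]:      # stand-in for poll results
        on_change("game-1", score)
        await asyncio.sleep(0)          # yield to the other task

async def book_task(on_update):
    for ask in [0.55, 0.53]:            # stand-in for book pushes
        on_update("token-1", ask)
        await asyncio.sleep(0)

events = []

async def main():
    await asyncio.gather(
        espn_task(lambda *a: events.append(("score", a))),
        book_task(lambda *a: events.append(("book", a))),
    )

asyncio.run(main())
print(len(events))  # 4
```

In production the fusion layer sits where `events.append` is here, and FastAPI runs as a third task in the same loop.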
ESPN's public site API returns a full scoreboard per sport at endpoints like site.api.espn.com/apis/site/v2/sports/basketball/nba/scoreboard. It's not WebSocket, so you poll. The trick: poll fast for in-progress games, slow otherwise, and detect score changes with minimal parsing overhead.
```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Dict

import aiohttp

logger = logging.getLogger(__name__)


@dataclass
class GameState:
    game_id: str
    home: str
    away: str
    home_score: int
    away_score: int
    period: int
    clock_remaining_s: float
    is_live: bool
    last_update_ts: float = 0.0


class ESPNTracker:
    """Polls ESPN scoreboards. Fires callbacks on score changes."""

    SPORT_URLS = {
        "NBA": "basketball/nba",
        "NHL": "hockey/nhl",
        "MLB": "baseball/mlb",
        "NCAAMB": "basketball/mens-college-basketball",
    }

    def __init__(self, sports: list[str], fast_s: float = 5.0, slow_s: float = 15.0):
        self.sports = sports
        self.fast_s = fast_s
        self.slow_s = slow_s
        self.games: Dict[str, GameState] = {}
        self._on_change = []

    def on_score_change(self, cb):
        self._on_change.append(cb)

    async def run(self):
        async with aiohttp.ClientSession() as session:
            while True:
                live_count = 0
                for sport in self.sports:
                    try:
                        live_count += await self._poll_sport(session, sport)
                    except Exception as e:
                        logger.warning(f"ESPN poll failed for {sport}: {e}")
                # Poll fast only while at least one game is live.
                interval = self.fast_s if live_count > 0 else self.slow_s
                await asyncio.sleep(interval)

    async def _poll_sport(self, session: aiohttp.ClientSession, sport: str) -> int:
        path = self.SPORT_URLS[sport]
        url = f"https://site.api.espn.com/apis/site/v2/sports/{path}/scoreboard"
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=8)) as resp:
            data = await resp.json()
        live = 0
        for ev in data.get("events", []):
            # _parse_event (omitted here) maps one ESPN event dict to a
            # GameState, or None if the event can't be parsed.
            gs = self._parse_event(ev, sport)
            if gs is None:
                continue
            prev = self.games.get(gs.game_id)
            self.games[gs.game_id] = gs
            if gs.is_live:
                live += 1
            if prev is None or prev.home_score != gs.home_score or prev.away_score != gs.away_score:
                for cb in self._on_change:
                    cb(prev, gs)
        return live
```
Rate limiting: ESPN doesn't publish rate limits, but sustained polling faster than every 2-3s per sport will eventually trigger soft-throttling (slower responses). Target 5s intervals when games are live; 15-30s between polls when quiet.
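One way to encode that policy is an interval function that also backs off when responses slow down. This is a heuristic sketch; the 2s round-trip threshold and 60s cap are assumptions, not documented ESPN limits:

```python
def next_poll_interval(live_games: int, last_rtt_s: float,
                       fast_s: float = 5.0, slow_s: float = 20.0) -> float:
    # Poll fast while any game is live, slow otherwise. If responses
    # are dragging (a sign of soft-throttling), double the interval.
    # Thresholds here are assumptions, tuned empirically in practice.
    base = fast_s if live_games > 0 else slow_s
    if last_rtt_s > 2.0:
        base *= 2
    return min(base, 60.0)

print(next_poll_interval(3, 0.2))  # 5.0
print(next_poll_interval(0, 0.2))  # 20.0
print(next_poll_interval(3, 3.0))  # 10.0
```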
Polymarket's CLOB supports WebSocket subscriptions on specific token IDs. You send a subscribe message with a list of tokens, receive book updates as they change. This is real-time; latency is typically 50-150ms from price change to push.
```python
import asyncio
import json
import logging
import time
from typing import Callable, List

import websockets

logger = logging.getLogger(__name__)


class PolymarketBookTracker:
    """Subscribes to Polymarket CLOB book updates for a set of tokens."""

    WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"

    def __init__(self, tokens: List[str]):
        self.tokens = tokens
        self.book: dict = {}  # token_id -> {"bid": float, "ask": float, "ts": float}
        self._on_update: List[Callable] = []

    def on_book_update(self, cb):
        self._on_update.append(cb)

    async def run(self):
        while True:
            try:
                async with websockets.connect(self.WS_URL, ping_interval=20) as ws:
                    await ws.send(json.dumps({
                        "type": "MARKET",
                        "assets_ids": self.tokens,
                    }))
                    async for msg in ws:
                        self._handle_msg(msg)
            except (websockets.ConnectionClosed, asyncio.TimeoutError, OSError) as e:
                logger.warning(f"Polymarket WS dropped: {e} — reconnecting in 5s")
                await asyncio.sleep(5)

    def _handle_msg(self, msg: str):
        try:
            events = json.loads(msg)
        except json.JSONDecodeError:
            return
        if not isinstance(events, list):
            events = [events]
        for ev in events:
            if ev.get("event_type") != "book":
                continue
            token_id = ev.get("asset_id")
            bids = ev.get("bids", [])
            asks = ev.get("asks", [])
            best_bid = max(float(b["price"]) for b in bids) if bids else 0.0
            best_ask = min(float(a["price"]) for a in asks) if asks else 1.0
            self.book[token_id] = {
                "bid": best_bid,
                "ask": best_ask,
                "ts": time.time(),
            }
            for cb in self._on_update:
                cb(token_id, self.book[token_id])
```
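The top-of-book extraction in the handler can be exercised standalone against a sample payload (the payload values here are made up; field names follow the handler above):

```python
def top_of_book(event: dict) -> tuple[float, float]:
    # Same extraction as _handle_msg: best bid is the highest bid price,
    # best ask the lowest ask price, with (0.0, 1.0) as empty-side fallbacks.
    bids = event.get("bids", [])
    asks = event.get("asks", [])
    best_bid = max(float(b["price"]) for b in bids) if bids else 0.0
    best_ask = min(float(a["price"]) for a in asks) if asks else 1.0
    return best_bid, best_ask

sample = {
    "event_type": "book",
    "asset_id": "1234",  # hypothetical token ID
    "bids": [{"price": "0.52", "size": "100"}, {"price": "0.50", "size": "40"}],
    "asks": [{"price": "0.55", "size": "80"}, {"price": "0.57", "size": "10"}],
}
print(top_of_book(sample))  # (0.52, 0.55)
```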
Both trackers call a fusion layer on changes. The fusion layer looks up the current state from both streams, runs inference, computes edge, and emits signals.
```python
import time
from typing import Callable, List, Optional


class SignalFuser:
    def __init__(self, espn: ESPNTracker, book: PolymarketBookTracker,
                 model, token_to_game: dict, min_edge_c: float = 5.0):
        self.espn = espn
        self.book = book
        self.model = model
        self.token_to_game = token_to_game  # token_id -> (game_id, side)
        self.min_edge_c = min_edge_c
        self._subscribers: List[Callable] = []
        # Wire up change callbacks
        espn.on_score_change(self._on_score_change)
        book.on_book_update(self._on_book_update)

    def subscribe(self, cb):
        self._subscribers.append(cb)

    def unsubscribe(self, cb):
        if cb in self._subscribers:
            self._subscribers.remove(cb)

    def _on_score_change(self, prev: Optional[GameState], new: GameState):
        # For each token covering this game, re-evaluate
        for token_id, (game_id, side) in self.token_to_game.items():
            if game_id == new.game_id:
                self._evaluate(token_id, new)

    def _on_book_update(self, token_id: str, book_state: dict):
        game_id, _side = self.token_to_game.get(token_id, (None, None))
        if not game_id:
            return
        game_state = self.espn.games.get(game_id)
        if game_state is None or not game_state.is_live:
            return
        self._evaluate(token_id, game_state)

    def _evaluate(self, token_id: str, game_state: GameState):
        book_state = self.book.book.get(token_id)
        if not book_state:
            return
        ask_c = book_state["ask"] * 100
        side = self.token_to_game[token_id][1]
        fair_wp = self.model.predict_wp(game_state, side)
        fair_c = fair_wp * 100
        edge_c = fair_c - ask_c
        if edge_c < self.min_edge_c:
            return
        self._emit(token_id, game_state, book_state, fair_c, edge_c)

    def _emit(self, token_id, game_state, book_state, fair_c, edge_c):
        signal = {
            "ts": time.time(),
            "game_id": game_state.game_id,
            "token_id": token_id,
            "fair_c": fair_c,
            "ask_c": book_state["ask"] * 100,
            "edge_c": edge_c,
            "score": f"{game_state.home_score}-{game_state.away_score}",
        }
        logger.info(f"SIGNAL {signal}")
        # Fan out to subscribers (queues, webhooks, Redis, etc.)
        for cb in self._subscribers:
            cb(signal)
```
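The edge math in `_evaluate` is simple enough to check by hand. With illustrative numbers: a model-fair win probability of 62% against a 55¢ ask gives a 7¢ edge, clearing a 5¢ threshold:

```python
# Worked edge example (illustrative numbers, not real market data).
fair_wp = 0.62           # model's win probability for this side
ask = 0.55               # best ask from the Polymarket book, in dollars
edge_c = fair_wp * 100 - ask * 100   # both converted to cents

print(edge_c >= 5.0)  # True — a ~7¢ edge fires a signal at min_edge_c=5.0
```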
Expose the merged state and signals via FastAPI so downstream consumers (trading bots, dashboards, clients) can subscribe:
```python
import asyncio
import json

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI()
# espn_tracker and signal_fuser are the instances wired up at startup.


@app.get("/v1/state/{game_id}")
async def get_state(game_id: str):
    g = espn_tracker.games.get(game_id)
    if not g:
        raise HTTPException(status_code=404, detail="not found")
    return {
        "game_id": g.game_id,
        "home": g.home, "away": g.away,
        "home_score": g.home_score, "away_score": g.away_score,
        "is_live": g.is_live,
        "last_update_ts": g.last_update_ts,
    }


@app.get("/v1/signals/stream")
async def stream_signals():
    """Server-sent events: push signals to subscribers as they occur."""
    async def gen():
        queue: asyncio.Queue = asyncio.Queue()
        signal_fuser.subscribe(queue.put_nowait)
        try:
            while True:
                sig = await queue.get()
                yield f"data: {json.dumps(sig)}\n\n"
        finally:
            signal_fuser.unsubscribe(queue.put_nowait)

    return StreamingResponse(gen(), media_type="text/event-stream")
```
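On the consumer side, each SSE frame is a `data: <json>` line terminated by a blank line. A minimal parser sketch (production clients should use a proper SSE library, which also handles reconnection and multi-line frames):

```python
import json

def parse_sse(chunk: str) -> list:
    # Split the stream body into frames on blank lines, then pull the
    # JSON payload out of each "data: ..." line. Simplified: ignores
    # event/id fields and assumes one data line per frame.
    events = []
    for frame in chunk.split("\n\n"):
        for line in frame.splitlines():
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events

body = 'data: {"edge_c": 7.0}\n\ndata: {"edge_c": 5.5}\n\n'
print(parse_sse(body))  # [{'edge_c': 7.0}, {'edge_c': 5.5}]
```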
Every real-time pipeline has the same three failure modes: upstream goes silent, downstream falls behind, state desyncs between streams. Each needs a deliberate response.
| Failure mode | Mitigation |
|---|---|
| ESPN returns 5xx or times out | Retry with exponential backoff; continue with stale state tagged with age |
| Polymarket WS drops | Reconnect, re-subscribe, fall back to REST polling for critical tokens |
| Score-change arrives but token book is stale > 30s | Skip evaluation; log as stale-book warning |
| Model inference throws | Log, increment counter, keep pipeline alive — never crash on model errors |
| Signal queue backpressure | Drop oldest signals rather than block producers |
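The last row of the table — drop oldest rather than block — can be implemented as a small wrapper around a bounded `asyncio.Queue`. A sketch:

```python
import asyncio

def put_drop_oldest(queue: asyncio.Queue, item) -> None:
    # Backpressure policy: when the queue is full, evict the oldest
    # signal(s) so the producer never blocks. Stale signals are worthless
    # in a live-trading context, so dropping old beats dropping new.
    while queue.full():
        try:
            queue.get_nowait()
        except asyncio.QueueEmpty:
            break
    queue.put_nowait(item)

q = asyncio.Queue(maxsize=2)
for i in range(4):
    put_drop_oldest(q, i)
drained = [q.get_nowait() for _ in range(q.qsize())]
print(drained)  # [2, 3] — the two oldest signals were dropped
```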
The single most important resilience pattern: tag every state snapshot with its timestamp. Consumers can then decide for themselves whether the data is fresh enough to act on.
```python
import time


def is_book_fresh(book_state: dict, max_age_s: float = 5.0) -> bool:
    return (time.time() - book_state.get("ts", 0)) < max_age_s


def is_game_fresh(game_state: GameState, max_age_s: float = 30.0) -> bool:
    return (time.time() - game_state.last_update_ts) < max_age_s
```
If you're trading on these signals, latency is money. Instrument the pipeline end-to-end:
```python
import time

def evaluate_with_latency(...):  # sketch: same signature as _evaluate
    t0 = time.perf_counter()
    # inference
    t1 = time.perf_counter()
    # edge compute
    t2 = time.perf_counter()
    # signal emit
    t3 = time.perf_counter()
    logger.info(
        f"LAT | inference={(t1-t0)*1000:.1f}ms "
        f"edge={(t2-t1)*1000:.1f}ms "
        f"emit={(t3-t2)*1000:.1f}ms "
        f"total={(t3-t0)*1000:.1f}ms"
    )
```
Whatever stage-level targets you set, the hard floor is the ESPN poll interval: you can't react to a score before the next poll. At 5s intervals you carry up to 5s of "score already changed but we don't know yet." Faster polling helps but runs into rate limits; the acceptable tradeoff depends on your trading horizon.
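Concretely: if a score change lands uniformly at random within the poll window, mean detection delay is half the interval and worst case is the full interval (ignoring request latency):

```python
def staleness_bounds(poll_interval_s: float) -> tuple[float, float]:
    # (mean, worst-case) delay between a score change and our next poll,
    # assuming the change is uniformly distributed within the window.
    return poll_interval_s / 2, poll_interval_s

print(staleness_bounds(5.0))  # (2.5, 5.0)
```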
ZenHodl runs this exact pipeline for 11 sports in production. You get the fused signal (ESPN state + Polymarket book + calibrated WP) via a single REST or WebSocket API — no infrastructure to operate.
See the API docs