Real-Time Sports Data Pipelines: ESPN, Polymarket & WebSockets Architecture

Published April 2026 · 16 min read · By SsysTech Softwares

You want your model to see a score change, run inference, compare to Polymarket's current ask, and either signal or skip — all within one second. Building this requires merging two asynchronous streams (game state from ESPN; market prices from Polymarket), handling failures gracefully, and minimizing latency end-to-end.

This post walks through a production architecture in Python: what to poll, what to stream over WebSocket, how to fuse the streams, where to store state, and how to expose the merged signal via FastAPI. It's based on the pipeline that powers a production sports prediction system serving calibrated probabilities for 11 sports.

Contents

  1. Architecture overview
  2. ESPN scoreboard polling
  3. Polymarket WebSocket subscription
  4. Fusing streams into a signal
  5. Serving via FastAPI
  6. Failure modes & resilience
  7. Measuring end-to-end latency

1. Architecture overview

High-level flow:

ESPN API (HTTPS)                Polymarket CLOB (WebSocket)
      |                                    |
      v  poll every 5s                     v  push on book change
┌──────────────┐                 ┌───────────────────┐
│ Game tracker │                 │   Book tracker    │
│ (state dict) │                 │ (top-of-book dict)│
└──────┬───────┘                 └─────────┬─────────┘
       │ score change                      │ price change
       └─────────────────┬─────────────────┘
                         v
                 ┌──────────────┐
                 │ Signal fuser │  <-- ML model inference here
                 └──────┬───────┘
                        │ edge > threshold
                        v
                 ┌──────────────┐
                 │  FastAPI /   │  <-- trader or webhook subscribes
                 │   webhook    │
                 └──────────────┘

Two independent async tasks feed a fusion layer. The fusion layer runs model inference when either stream changes, computes edge, and emits signals downstream. FastAPI serves both client queries and internal webhooks.
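In code, the fan-in shape reduces to two long-lived producer tasks and one consumer sharing a queue, all on a single event loop. The stubs below stand in for the real trackers purely to show the wiring; every name here is illustrative, not part of the pipeline:

```python
import asyncio

async def producer(name, queue, n):
    # Stand-in for a tracker task: emits n updates tagged with its source.
    for i in range(n):
        await queue.put((name, i))
        await asyncio.sleep(0)  # yield so the two streams interleave

async def fuse(queue, total):
    # Stand-in for the fusion layer: consumes updates from both streams.
    seen = []
    for _ in range(total):
        seen.append(await queue.get())
    return seen

async def main():
    q = asyncio.Queue()
    _, _, fused = await asyncio.gather(
        producer("espn", q, 3),
        producer("book", q, 3),
        fuse(q, 6),
    )
    return fused

events = asyncio.run(main())
```

The same `asyncio.gather` call is how the real `run()` coroutines of the two trackers would be kept alive side by side.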

2. ESPN scoreboard polling

ESPN's public site API returns a full scoreboard per sport at endpoints like site.api.espn.com/apis/site/v2/sports/basketball/nba/scoreboard. It's not WebSocket, so you poll. The trick: poll fast for in-progress games, slow otherwise, and detect score changes with minimal parsing overhead.

import asyncio
import logging

import aiohttp
from typing import Dict
from dataclasses import dataclass

logger = logging.getLogger(__name__)

@dataclass
class GameState:
    game_id: str
    home: str
    away: str
    home_score: int
    away_score: int
    period: int
    clock_remaining_s: float
    is_live: bool
    last_update_ts: float = 0.0


class ESPNTracker:
    """Polls ESPN scoreboard. Fires callbacks on score changes."""

    SPORT_URLS = {
        "NBA":    "basketball/nba",
        "NHL":    "hockey/nhl",
        "MLB":    "baseball/mlb",
        "NCAAMB": "basketball/mens-college-basketball",
    }

    def __init__(self, sports: list[str], fast_s: float = 5.0, slow_s: float = 15.0):
        self.sports = sports
        self.fast_s = fast_s
        self.slow_s = slow_s
        self.games: Dict[str, GameState] = {}
        self._on_change = []

    def on_score_change(self, cb):
        self._on_change.append(cb)

    async def run(self):
        async with aiohttp.ClientSession() as session:
            while True:
                live_count = 0
                for sport in self.sports:
                    try:
                        live_count += await self._poll_sport(session, sport)
                    except Exception as e:
                        logger.warning(f"ESPN poll failed for {sport}: {e}")
                interval = self.fast_s if live_count > 0 else self.slow_s
                await asyncio.sleep(interval)

    async def _poll_sport(self, session, sport: str) -> int:
        path = self.SPORT_URLS[sport]
        url = f"https://site.api.espn.com/apis/site/v2/sports/{path}/scoreboard"
        async with session.get(url, timeout=8) as resp:
            data = await resp.json()

        live = 0
        for ev in data.get("events", []):
            gs = self._parse_event(ev, sport)
            if gs is None:
                continue
            prev = self.games.get(gs.game_id)
            self.games[gs.game_id] = gs
            if gs.is_live:
                live += 1
                if prev is None or prev.home_score != gs.home_score or prev.away_score != gs.away_score:
                    for cb in self._on_change:
                        cb(prev, gs)
        return live

Rate limiting: ESPN doesn't publish rate limits, but sustained polling faster than every 2-3s per sport will eventually trigger soft-throttling (slower responses). Target 5s intervals when games are live; 15-30s between polls when quiet.
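One way to implement that schedule, with exponential backoff on consecutive failures folded in. This is a sketch, not part of `ESPNTracker` above; the constants and the jitter fraction are illustrative:

```python
import random

def next_poll_interval(live_count: int, consecutive_errors: int,
                       fast_s: float = 5.0, slow_s: float = 15.0,
                       max_s: float = 120.0) -> float:
    """Fast while games are live, slow when quiet, exponential backoff
    with jitter after repeated failures (illustrative helper)."""
    base = fast_s if live_count > 0 else slow_s
    if consecutive_errors:
        base = min(base * (2 ** consecutive_errors), max_s)
    # Jitter keeps multiple pollers from hammering ESPN in lockstep.
    return base + random.uniform(0, base * 0.1)
```

The tracker's `run()` loop would call this instead of picking `fast_s`/`slow_s` directly, resetting the error count after any successful poll.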

3. Polymarket WebSocket subscription

Polymarket's CLOB supports WebSocket subscriptions on specific token IDs: you send a subscribe message listing the tokens you care about and receive book updates as they change. This is push-based and genuinely real-time; latency from price change to push is typically 50-150ms.

import asyncio
import json
import logging
import time

import websockets
from typing import Callable, List

logger = logging.getLogger(__name__)

class PolymarketBookTracker:
    """Subscribes to Polymarket CLOB book updates for a set of tokens."""

    WS_URL = "wss://ws-subscriptions-clob.polymarket.com/ws/market"

    def __init__(self, tokens: List[str]):
        self.tokens = tokens
        self.book: dict = {}   # token_id -> {"bid": float, "ask": float, "ts": float}
        self._on_update: List[Callable] = []

    def on_book_update(self, cb):
        self._on_update.append(cb)

    async def run(self):
        while True:
            try:
                async with websockets.connect(self.WS_URL, ping_interval=20) as ws:
                    await ws.send(json.dumps({
                        "type": "MARKET",
                        "assets_ids": self.tokens,
                    }))
                    async for msg in ws:
                        self._handle_msg(msg)
            except (websockets.ConnectionClosed, asyncio.TimeoutError, OSError) as e:
                logger.warning(f"Polymarket WS dropped: {e} — reconnecting in 5s")
                await asyncio.sleep(5)

    def _handle_msg(self, msg: str):
        try:
            events = json.loads(msg)
        except json.JSONDecodeError:
            return
        if not isinstance(events, list):
            events = [events]
        for ev in events:
            if ev.get("event_type") != "book":
                continue
            token_id = ev.get("asset_id")
            bids = ev.get("bids", [])
            asks = ev.get("asks", [])
            best_bid = max(float(b["price"]) for b in bids) if bids else 0.0
            best_ask = min(float(a["price"]) for a in asks) if asks else 1.0
            self.book[token_id] = {
                "bid": best_bid,
                "ask": best_ask,
                "ts": time.time(),
            }
            for cb in self._on_update:
                cb(token_id, self.book[token_id])
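
The top-of-book reduction in `_handle_msg` can be exercised offline with a canned event. The field names below mirror the message shape assumed above; the sample prices and sizes are made up:

```python
def top_of_book(ev: dict) -> dict:
    # Same reduction as _handle_msg: best bid is the highest bid price,
    # best ask the lowest ask price, with defaults for an empty side.
    bids = ev.get("bids", [])
    asks = ev.get("asks", [])
    best_bid = max(float(b["price"]) for b in bids) if bids else 0.0
    best_ask = min(float(a["price"]) for a in asks) if asks else 1.0
    return {"bid": best_bid, "ask": best_ask}

sample = {
    "event_type": "book",
    "asset_id": "0xabc",  # hypothetical token id
    "bids": [{"price": "0.55", "size": "100"}, {"price": "0.54", "size": "250"}],
    "asks": [{"price": "0.57", "size": "80"},  {"price": "0.60", "size": "40"}],
}
top_of_book(sample)  # {"bid": 0.55, "ask": 0.57}
```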

4. Fusing streams into a signal

Both trackers call a fusion layer on changes. The fusion layer looks up the current state from both streams, runs inference, computes edge, and emits signals.

import logging
import time
from typing import Optional

logger = logging.getLogger(__name__)

class SignalFuser:
    def __init__(self, espn: ESPNTracker, book: PolymarketBookTracker,
                 model, token_to_game: dict, min_edge_c: float = 5.0):
        self.espn = espn
        self.book = book
        self.model = model
        self.token_to_game = token_to_game  # token_id -> (game_id, side)
        self.min_edge_c = min_edge_c

        # Wire up change callbacks
        espn.on_score_change(self._on_score_change)
        book.on_book_update(self._on_book_update)

    def _on_score_change(self, prev: Optional[GameState], new: GameState):
        # For each token covering this game, re-evaluate
        for token_id, (game_id, side) in self.token_to_game.items():
            if game_id == new.game_id:
                self._evaluate(token_id, new)

    def _on_book_update(self, token_id: str, book_state: dict):
        game_id, _side = self.token_to_game.get(token_id, (None, None))
        if not game_id:
            return
        game_state = self.espn.games.get(game_id)
        if game_state is None or not game_state.is_live:
            return
        self._evaluate(token_id, game_state)

    def _evaluate(self, token_id: str, game_state: GameState):
        book_state = self.book.book.get(token_id)
        if not book_state:
            return
        ask_c = book_state["ask"] * 100
        side = self.token_to_game[token_id][1]
        fair_wp = self.model.predict_wp(game_state, side)
        fair_c = fair_wp * 100
        edge_c = fair_c - ask_c
        if edge_c < self.min_edge_c:
            return
        self._emit(token_id, game_state, book_state, fair_c, edge_c)

    def _emit(self, token_id, game_state, book_state, fair_c, edge_c):
        signal = {
            "ts": time.time(),
            "game_id": game_state.game_id,
            "token_id": token_id,
            "fair_c": fair_c,
            "ask_c": book_state["ask"] * 100,
            "edge_c": edge_c,
            "score": f"{game_state.home_score}-{game_state.away_score}",
        }
        logger.info(f"SIGNAL {signal}")
        # Push to queue, webhook, Redis, etc.
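
For that final comment, one minimal in-process fan-out the fuser could publish into. This is a sketch; `SignalBus` is a hypothetical name, and Redis or a webhook are equally valid production targets:

```python
class SignalBus:
    """Minimal in-process pub/sub for emitted signals (illustrative)."""

    def __init__(self):
        self._subs = []

    def subscribe(self, cb):
        self._subs.append(cb)

    def unsubscribe(self, cb):
        if cb in self._subs:
            self._subs.remove(cb)

    def publish(self, signal: dict):
        # Copy the list so callbacks can unsubscribe mid-iteration.
        for cb in list(self._subs):
            try:
                cb(signal)
            except Exception:
                pass  # one bad subscriber must not break the others
```

`_emit` would end with `bus.publish(signal)`, and consumers (the SSE endpoint in the next section, a trading bot, a logger) attach via `subscribe`.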

5. Serving via FastAPI

Expose the merged state and signals via FastAPI so downstream consumers (trading bots, dashboards, clients) can subscribe:

import asyncio
import json

from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/v1/state/{game_id}")
async def get_state(game_id: str):
    g = espn_tracker.games.get(game_id)
    if not g:
        raise HTTPException(status_code=404, detail="game not found")
    return {
        "game_id": g.game_id,
        "home": g.home, "away": g.away,
        "home_score": g.home_score, "away_score": g.away_score,
        "is_live": g.is_live,
        "last_update_ts": g.last_update_ts,
    }

@app.get("/v1/signals/stream")
async def stream_signals():
    """Server-sent events: push signals to subscribers as they occur."""
    async def gen():
        queue: asyncio.Queue = asyncio.Queue()
        # Assumes the fuser exposes subscribe()/unsubscribe() hooks for
        # downstream consumers (the same callback-list pattern the trackers use).
        signal_fuser.subscribe(queue.put_nowait)
        try:
            while True:
                sig = await queue.get()
                yield f"data: {json.dumps(sig)}\n\n"
        finally:
            signal_fuser.unsubscribe(queue.put_nowait)
    return StreamingResponse(gen(), media_type="text/event-stream")

6. Failure modes & resilience

Every real-time pipeline has the same three failure modes: upstream goes silent, downstream falls behind, state desyncs between streams. Each needs a deliberate response.

| Failure mode | Mitigation |
| --- | --- |
| ESPN returns 5xx or times out | Retry with exponential backoff; continue with stale state tagged with its age |
| Polymarket WS drops | Reconnect, re-subscribe, fall back to REST polling for critical tokens |
| Score change arrives but token book is stale > 30s | Skip evaluation; log a stale-book warning |
| Model inference throws | Log, increment a counter, keep the pipeline alive; never crash on model errors |
| Signal queue backpressure | Drop the oldest signals rather than block producers |
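
The last row (drop the oldest signal rather than block) falls naturally out of `collections.deque` with a `maxlen`. A minimal sketch; the class name is illustrative:

```python
from collections import deque

class DropOldestQueue:
    """Bounded buffer: producers never block; the oldest signals are
    evicted when the consumer falls behind (illustrative sketch)."""

    def __init__(self, maxlen: int = 256):
        self._dq = deque(maxlen=maxlen)  # deque evicts from the left when full

    def put(self, item):
        self._dq.append(item)

    def get_nowait(self):
        return self._dq.popleft() if self._dq else None
```

A stale signal on a live game is worthless within seconds, so evicting old entries loses nothing a blocked producer would have saved.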

The single most important resilience pattern: tag every state snapshot with its timestamp. Consumers can then decide for themselves whether the data is fresh enough to act on.

def is_book_fresh(book_state: dict, max_age_s: float = 5.0) -> bool:
    return (time.time() - book_state.get("ts", 0)) < max_age_s

def is_game_fresh(game_state: GameState, max_age_s: float = 30.0) -> bool:
    return (time.time() - game_state.last_update_ts) < max_age_s

7. Measuring end-to-end latency

If you're trading on these signals, latency is money. Instrument the pipeline end-to-end:

import time

def evaluate_with_latency(token_id, game_state):  # mirrors _evaluate's arguments
    t0 = time.perf_counter()
    # inference
    t1 = time.perf_counter()
    # edge compute
    t2 = time.perf_counter()
    # signal emit
    t3 = time.perf_counter()
    logger.info(
        f"LAT | inference={(t1-t0)*1000:.1f}ms "
        f"edge={(t2-t1)*1000:.1f}ms "
        f"emit={(t3-t2)*1000:.1f}ms "
        f"total={(t3-t0)*1000:.1f}ms"
    )
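
Log lines like the one above are fine for spot checks; for trends you want rolling percentiles per stage. A minimal stdlib-only tracker (illustrative, not part of the pipeline above):

```python
from collections import deque

class LatencyTracker:
    """Rolling window of recent latency samples with percentile lookups."""

    def __init__(self, window: int = 500):
        self._samples = deque(maxlen=window)

    def record(self, ms: float):
        self._samples.append(ms)

    def p(self, q: float) -> float:
        # q in [0, 100]; sorts the window on each call, fine at this size.
        data = sorted(self._samples)
        if not data:
            return 0.0
        idx = min(int(len(data) * q / 100), len(data) - 1)
        return data[idx]
```

Track p50 and p99 per stage; a p99 that drifts while p50 holds usually points at GC pauses or event-loop starvation rather than the model.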

In a healthy system, the in-process stages (inference, edge compute, emit) together stay well below the upstream update cadence; if they don't, profile before scaling out.

The real limit: ESPN poll interval. You can't react to a score before the next ESPN poll. At 5s intervals you have up to 5s of "score already changed but we don't know yet." Faster polling helps but hits rate limits. Acceptable tradeoff depends on your trading horizon.
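That tradeoff is easy to quantify: if score changes land uniformly at random within a poll window, mean detection delay is half the interval plus request latency, and the worst case is the full interval. A quick sketch; the 0.3s request-latency default is an assumption, not a measurement:

```python
def detection_delay(poll_interval_s: float,
                    request_latency_s: float = 0.3):
    """Mean and worst-case delay from score change to detection, assuming
    changes are uniform within the poll window (illustrative model)."""
    mean = poll_interval_s / 2 + request_latency_s
    worst = poll_interval_s + request_latency_s
    return mean, worst

detection_delay(5.0)   # roughly (2.8, 5.3) seconds
detection_delay(15.0)  # roughly (7.8, 15.3) seconds
```

At a 5s interval you carry about 2.8s of average score staleness before the model even runs, which dwarfs every in-process stage combined.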

Don't build this. Subscribe to the merged feed.

ZenHodl runs this exact pipeline for 11 sports in production. You get the fused signal (ESPN state + Polymarket book + calibrated WP) via a single REST or WebSocket API — no infrastructure to operate.

See the API docs

Further reading: Live Model Recalibration · Backtesting Pitfalls · How to Build a Polymarket Trading Bot in Python