# app/realtime/coins_hub.py from typing import Dict, Set from fastapi import WebSocket import asyncio class CoinsHub: def __init__(self): # username -> set of websockets self._connections: Dict[str, Set[WebSocket]] = {} self._lock = asyncio.Lock() async def connect(self, username: str, ws: WebSocket): await ws.accept() async with self._lock: self._connections.setdefault(username, set()).add(ws) async def disconnect(self, username: str, ws: WebSocket): async with self._lock: conns = self._connections.get(username) if not conns: return conns.discard(ws) if not conns: self._connections.pop(username, None) async def send_update(self, username: str, coins: int): async with self._lock: conns = list(self._connections.get(username, [])) if not conns: return payload = { "event": "coins:update", "coins": coins, } dead: list[WebSocket] = [] for ws in conns: try: await ws.send_json(payload) except Exception: dead.append(ws) if dead: async with self._lock: for ws in dead: self._connections.get(username, set()).discard(ws) coins_hub = CoinsHub()