# app/realtime/marketplace_hub.py from __future__ import annotations import asyncio from typing import Dict, Set, Any from fastapi import WebSocket class MarketplaceHub: def __init__(self) -> None: self._rooms: Dict[str, Set[WebSocket]] = {} self._lock = asyncio.Lock() async def connect(self, server_ip: str, ws: WebSocket) -> None: await ws.accept() async with self._lock: self._rooms.setdefault(server_ip, set()).add(ws) async def disconnect(self, server_ip: str, ws: WebSocket) -> None: async with self._lock: room = self._rooms.get(server_ip) if not room: return room.discard(ws) if not room: self._rooms.pop(server_ip, None) async def broadcast(self, server_ip: str, message: dict) -> None: async with self._lock: conns = list(self._rooms.get(server_ip, set())) if not conns: return dead: list[WebSocket] = [] for ws in conns: try: await ws.send_json(message) except Exception: dead.append(ws) if dead: async with self._lock: room = self._rooms.get(server_ip, set()) for ws in dead: room.discard(ws) marketplace_hub = MarketplaceHub()