Files
popa_minecraft_launcher_api/app/realtime/voice_hub.py
2026-01-02 16:34:43 +05:00

42 lines
1.3 KiB
Python

from typing import Dict
from fastapi import WebSocket
class VoiceHub:
def __init__(self):
# room_id -> username -> websocket
self.rooms: Dict[str, Dict[str, WebSocket]] = {}
async def connect(self, room_id: str, username: str, ws: WebSocket):
await ws.accept()
self.rooms.setdefault(room_id, {})[username] = ws
def disconnect(self, room_id: str, username: str):
room = self.rooms.get(room_id)
if not room:
return
room.pop(username, None)
if not room:
self.rooms.pop(room_id, None)
async def send_to(self, room_id: str, to_user: str, payload: dict):
ws = self.rooms.get(room_id, {}).get(to_user)
if ws:
await ws.send_json(payload)
async def broadcast(self, room_id: str, payload: dict):
for ws in self.rooms.get(room_id, {}).values():
try:
await ws.send_json(payload)
except:
pass
async def broadcast_except(self, room_id: str, except_user: str, payload: dict):
for user, ws in self.rooms.get(room_id, {}).items():
if user != except_user:
try:
await ws.send_json(payload)
except:
pass
voice_hub = VoiceHub()