From 51e903e249f245a23e09f6b95f59c97ae63ffd51 Mon Sep 17 00:00:00 2001 From: aurinex Date: Mon, 22 Dec 2025 11:11:26 +0500 Subject: [PATCH] add websocket market --- app/api/marketplace_ws.py | 19 +++++++++++++ app/realtime/marketplace_hub.py | 47 +++++++++++++++++++++++++++++++++ app/services/marketplace.py | 43 +++++++++++++++++++++++++++++- main.py | 2 ++ 4 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 app/api/marketplace_ws.py create mode 100644 app/realtime/marketplace_hub.py diff --git a/app/api/marketplace_ws.py b/app/api/marketplace_ws.py new file mode 100644 index 0000000..36c8b88 --- /dev/null +++ b/app/api/marketplace_ws.py @@ -0,0 +1,19 @@ +from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query +from app.realtime.marketplace_hub import marketplace_hub + +router = APIRouter(tags=["Marketplace WS"]) + +@router.websocket("/ws/marketplace") +async def marketplace_ws( + websocket: WebSocket, + server_ip: str = Query(...), +): + await marketplace_hub.connect(server_ip, websocket) + try: + # Можно принимать сообщения от клиента, но нам не обязательно. + while True: + await websocket.receive_text() + except WebSocketDisconnect: + pass + finally: + await marketplace_hub.disconnect(server_ip, websocket) diff --git a/app/realtime/marketplace_hub.py b/app/realtime/marketplace_hub.py new file mode 100644 index 0000000..ff06f01 --- /dev/null +++ b/app/realtime/marketplace_hub.py @@ -0,0 +1,47 @@ +# app/realtime/marketplace_hub.py +from __future__ import annotations + +import asyncio +from typing import Dict, Set, Any +from fastapi import WebSocket + +class MarketplaceHub: + def __init__(self) -> None: + self._rooms: Dict[str, Set[WebSocket]] = {} + self._lock = asyncio.Lock() + + async def connect(self, server_ip: str, ws: WebSocket) -> None: + await ws.accept() + async with self._lock: + self._rooms.setdefault(server_ip, set()).add(ws) + + async def disconnect(self, server_ip: str, ws: WebSocket) -> None: + async with self._lock: + room = self._rooms.get(server_ip) + if not room: + return + room.discard(ws) + if not room: + self._rooms.pop(server_ip, None) + + async def broadcast(self, server_ip: str, message: dict) -> None: + async with self._lock: + conns = list(self._rooms.get(server_ip, set())) + + if not conns: + return + + dead: list[WebSocket] = [] + for ws in conns: + try: + await ws.send_json(message) + except Exception: + dead.append(ws) + + if dead: + async with self._lock: + room = self._rooms.get(server_ip, set()) + for ws in dead: + room.discard(ws) + +marketplace_hub = MarketplaceHub() diff --git a/app/services/marketplace.py b/app/services/marketplace.py index 4507811..bf6359a 100644 --- a/app/services/marketplace.py +++ b/app/services/marketplace.py @@ -4,6 +4,7 @@ from fastapi import HTTPException from app.db.database import db from app.services.coins import CoinsService from app.services.server.command import CommandService +from app.realtime.marketplace_hub import marketplace_hub # Коллекция для хранения товаров на торговой площадке marketplace_collection = db.marketplace_items @@ -161,6 +162,15 @@ class MarketplaceService: } await marketplace_collection.insert_one(marketplace_item) + + await marketplace_hub.broadcast( + marketplace_item["server_ip"], + { + "event": "market:item_listed", + "server_ip": marketplace_item["server_ip"], + "item": _serialize_mongodb_doc(marketplace_item), + } + ) # Обновляем операцию await marketplace_operations.update_one( @@ -214,6 +224,16 @@ class MarketplaceService: # 7. Удаляем предмет с торговой площадки await marketplace_collection.delete_one({"id": item_id}) + + await marketplace_hub.broadcast( + item["server_ip"], + { + "event": "market:item_sold", + "server_ip": item["server_ip"], + "item_id": item_id, + "buyer": buyer_username, + } + ) return { "status": "pending", @@ -250,6 +270,16 @@ class MarketplaceService: # Удаляем предмет с торговой площадки await marketplace_collection.delete_one({"id": item_id}) + + await marketplace_hub.broadcast( + item["server_ip"], + { + "event": "market:item_cancelled", + "server_ip": item["server_ip"], + "item_id": item_id, + "seller": username, + } + ) return { "status": "pending", @@ -313,9 +343,20 @@ class MarketplaceService: {"id": item_id}, {"$set": {"price": new_price}} ) - + if result.modified_count == 0: raise HTTPException(status_code=500, detail="Не удалось обновить цену предмета") + + updated = await marketplace_collection.find_one({"id": item_id}) + if updated: + await marketplace_hub.broadcast( + updated["server_ip"], + { + "event": "market:item_price_updated", + "server_ip": updated["server_ip"], + "item": _serialize_mongodb_doc(updated), + } + ) return { "status": "success", diff --git a/main.py b/main.py index c61c542..e32f169 100644 --- a/main.py +++ b/main.py @@ -10,6 +10,7 @@ from app.core.config import CAPES_DIR, CAPES_STORE_DIR, SKINS_DIR from app.services.promo import PromoService from app.webhooks import telegram from app.db.database import users_collection +from app.api import marketplace_ws ###################### БОТ ###################### @@ -59,6 +60,7 @@ app.include_router(server.router) app.include_router(store.router) app.include_router(pranks.router) app.include_router(marketplace.router) +app.include_router(marketplace_ws.router) app.include_router(case.router) app.include_router(inventory.router) app.include_router(bonuses.router)