add websocket market

This commit is contained in:
aurinex
2025-12-22 11:11:26 +05:00
parent bfacff902f
commit 51e903e249
4 changed files with 110 additions and 1 deletions

19
app/api/marketplace_ws.py Normal file
View File

@ -0,0 +1,19 @@
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Query
from app.realtime.marketplace_hub import marketplace_hub
router = APIRouter(tags=["Marketplace WS"])
@router.websocket("/ws/marketplace")
async def marketplace_ws(
websocket: WebSocket,
server_ip: str = Query(...),
):
await marketplace_hub.connect(server_ip, websocket)
try:
# Можно принимать сообщения от клиента, но нам не обязательно.
while True:
await websocket.receive_text()
except WebSocketDisconnect:
pass
finally:
await marketplace_hub.disconnect(server_ip, websocket)

View File

@ -0,0 +1,47 @@
# app/realtime/marketplace_hub.py
from __future__ import annotations
import asyncio
from typing import Dict, Set, Any
from fastapi import WebSocket
class MarketplaceHub:
def __init__(self) -> None:
self._rooms: Dict[str, Set[WebSocket]] = {}
self._lock = asyncio.Lock()
async def connect(self, server_ip: str, ws: WebSocket) -> None:
await ws.accept()
async with self._lock:
self._rooms.setdefault(server_ip, set()).add(ws)
async def disconnect(self, server_ip: str, ws: WebSocket) -> None:
async with self._lock:
room = self._rooms.get(server_ip)
if not room:
return
room.discard(ws)
if not room:
self._rooms.pop(server_ip, None)
async def broadcast(self, server_ip: str, message: dict) -> None:
async with self._lock:
conns = list(self._rooms.get(server_ip, set()))
if not conns:
return
dead: list[WebSocket] = []
for ws in conns:
try:
await ws.send_json(message)
except Exception:
dead.append(ws)
if dead:
async with self._lock:
room = self._rooms.get(server_ip, set())
for ws in dead:
room.discard(ws)
marketplace_hub = MarketplaceHub()

View File

@ -4,6 +4,7 @@ from fastapi import HTTPException
from app.db.database import db
from app.services.coins import CoinsService
from app.services.server.command import CommandService
from app.realtime.marketplace_hub import marketplace_hub
# Коллекция для хранения товаров на торговой площадке
marketplace_collection = db.marketplace_items
@ -161,6 +162,15 @@ class MarketplaceService:
}
await marketplace_collection.insert_one(marketplace_item)
await marketplace_hub.broadcast(
marketplace_item["server_ip"],
{
"event": "market:item_listed",
"server_ip": marketplace_item["server_ip"],
"item": _serialize_mongodb_doc(marketplace_item),
}
)
# Обновляем операцию
await marketplace_operations.update_one(
@ -214,6 +224,16 @@ class MarketplaceService:
# 7. Удаляем предмет с торговой площадки
await marketplace_collection.delete_one({"id": item_id})
await marketplace_hub.broadcast(
item["server_ip"],
{
"event": "market:item_sold",
"server_ip": item["server_ip"],
"item_id": item_id,
"buyer": buyer_username,
}
)
return {
"status": "pending",
@ -250,6 +270,16 @@ class MarketplaceService:
# Удаляем предмет с торговой площадки
await marketplace_collection.delete_one({"id": item_id})
await marketplace_hub.broadcast(
item["server_ip"],
{
"event": "market:item_cancelled",
"server_ip": item["server_ip"],
"item_id": item_id,
"seller": username,
}
)
return {
"status": "pending",
@ -313,9 +343,20 @@ class MarketplaceService:
{"id": item_id},
{"$set": {"price": new_price}}
)
if result.modified_count == 0:
raise HTTPException(status_code=500, detail="Не удалось обновить цену предмета")
updated = await marketplace_collection.find_one({"id": item_id})
if updated:
await marketplace_hub.broadcast(
updated["server_ip"],
{
"event": "market:item_price_updated",
"server_ip": updated["server_ip"],
"item": _serialize_mongodb_doc(updated),
}
)
return {
"status": "success",