import base64 import json from fastapi import HTTPException, UploadFile from fastapi.responses import JSONResponse from app.models.user import UserLogin, UserInDB, UserCreate, Session from app.utils.misc import ( verify_password, get_password_hash, create_access_token, decode_token, ) from ..db.database import users_collection, sessions_collection import uuid from datetime import datetime, timedelta from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from dotenv import load_dotenv import os from pathlib import Path import secrets env_path = Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) FILES_URL = os.getenv("FILES_URL") class AuthService: async def register(self, user: UserCreate): # Проверяем, существует ли пользователь if await users_collection.find_one({"username": user.username}): raise HTTPException(status_code=400, detail="Username already taken") # Хешируем пароль hashed_password = get_password_hash(user.password) # Создаём UUID для Minecraft user_uuid = str(uuid.uuid4()) # Сохраняем в MongoDB new_user = UserInDB( username=user.username, hashed_password=hashed_password, uuid=user_uuid, is_verified=False, code=None, code_expires_at=None ) await users_collection.insert_one(new_user.dict()) return {"status": "success", "uuid": user_uuid} async def generate_code(self, username: str): if await users_collection.find_one({"username": username}): if await users_collection.find_one({"username": username, "is_verified": True}): raise HTTPException(400, "User already verified") code = secrets.token_hex(3).upper() await users_collection.update_one({"username": username}, {"$set": {"code": code, "code_expires_at": datetime.utcnow() + timedelta(minutes=10)}}) return {"status": "success", "code": code} else: raise HTTPException(404, "User not found") async def verify_code(self, username: str, code: str, telegram_chat_id: int): user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(404, "User not found") if user["is_verified"]: raise HTTPException(400, "User already verified") # Проверяем код и привязку к Telegram if user.get("telegram_chat_id") and user["telegram_chat_id"] != telegram_chat_id: raise HTTPException(403, "This account is linked to another Telegram") if user.get("code") != code: raise HTTPException(400, "Invalid code") # Обновляем chat_id при первом подтверждении await users_collection.update_one( {"username": username}, {"$set": { "is_verified": True, "telegram_chat_id": telegram_chat_id, "code": None }} ) return {"status": "success"} async def get_verification_status(self, username: str): user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(404, "User not found") return {"is_verified": user["is_verified"]} async def login(self, credentials: UserLogin): # Ищем пользователя user = await users_collection.find_one({"username": credentials.username}) if not user or not verify_password(credentials.password, user["hashed_password"]): raise HTTPException(status_code=401, detail="Invalid credentials") if not user["is_verified"]: raise HTTPException(status_code=401, detail="User not verified") # Генерируем токены access_token = create_access_token({"sub": user["username"], "uuid": user["uuid"]}) client_token = str(uuid.uuid4()) # Сохраняем сессию session = Session( access_token=access_token, client_token=client_token, user_uuid=user["uuid"], expires_at=datetime.utcnow() + timedelta(minutes=1440), ) await sessions_collection.insert_one(session.dict()) return { "accessToken": access_token, "clientToken": client_token, "selectedProfile": { "id": user["uuid"], "name": user["username"], }, } async def validate(self, access_token: str, client_token: str): print(f"Searching for access_toke and client_token: '{access_token}', '{client_token}") session = await sessions_collection.find_one({ "access_token": access_token, "client_token": client_token, }) print("Session from DB:", session) if not session or datetime.utcnow() > session["expires_at"]: return False return True async def refresh(self, access_token: str, client_token: str): if not await self.validate(access_token, client_token): return None # Обновляем токен new_access_token = create_access_token({"sub": "user", "uuid": "user_uuid"}) await sessions_collection.update_one( {"access_token": access_token}, {"$set": {"access_token": new_access_token}}, ) return {"accessToken": new_access_token, "clientToken": client_token} async def get_minecraft_profile(self, uuid: str): # Преобразуем UUID без дефисов в формат с дефисами (если нужно) if '-' not in uuid: formatted_uuid = f"{uuid[:8]}-{uuid[8:12]}-{uuid[12:16]}-{uuid[16:20]}-{uuid[20:]}" else: formatted_uuid = uuid user = await users_collection.find_one({"uuid": formatted_uuid}) # Ищем по UUID с дефисами if not user: raise HTTPException(status_code=404, detail="User not found") textures = { "timestamp": int(datetime.now().timestamp() * 1000), "profileId": user["uuid"].replace("-", ""), "profileName": user["username"], "textures": {} } if user.get("skin_url"): textures["textures"]["SKIN"] = { "url": user["skin_url"], "metadata": {"model": user.get("skin_model", "classic")} } if user.get("cloak_url"): textures["textures"]["CAPE"] = {"url": user["cloak_url"]} textures_json = json.dumps(textures).encode() base64_textures = base64.b64encode(textures_json).decode() try: # Подписываем текстуры private_key_path = "app/keys/private_key.pem" with open(private_key_path, "rb") as key_file: private_key = serialization.load_pem_private_key( key_file.read(), password=None ) signature = private_key.sign( base64.b64encode(textures_json), padding.PKCS1v15(), hashes.SHA1() ) signature_base64 = base64.b64encode(signature).decode() return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures, "signature": signature_base64 }] } except Exception as e: print(f"Error signing textures: {e}") # В случае ошибки возвращаем текстуры без подписи return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures }] } # # Подписываем текстуры # with open("private_key.pem", "rb") as key_file: # private_key = serialization.load_pem_private_key( # key_file.read(), # password=None # ) # signature = private_key.sign( # textures_json, # padding.PKCS1v15(), # hashes.SHA1() # ) # return JSONResponse({ # "id": user["uuid"].replace("-", ""), # Уберите дефисы # "name": user["username"], # "properties": [{ # "name": "textures", # "value": base64_textures, # # "signature": base64.b64encode(signature).decode() # }] # }) async def join_server(self, request_data: dict): access_token = request_data.get("accessToken") selected_profile = request_data.get("selectedProfile") # UUID без дефисов server_id = request_data.get("serverId") if not all([access_token, selected_profile, server_id]): raise HTTPException(status_code=400, detail="Missing required parameters") decoded_token = decode_token(access_token) if not decoded_token: raise HTTPException(status_code=401, detail="Invalid access token") token_uuid = decoded_token.get("uuid", "").replace("-", "") if token_uuid != selected_profile: raise HTTPException(status_code=403, detail="Token doesn't match selected profile") # Сохраняем server_id в сессию await sessions_collection.update_one( {"user_uuid": decoded_token["uuid"]}, # UUID с дефисами {"$set": {"server_id": server_id}}, upsert=True ) return True async def has_joined(self, username: str, server_id: str): user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(status_code=404, detail="User not found") # Ищем сессию с этим server_id session = await sessions_collection.find_one({ "user_uuid": user["uuid"], # UUID с дефисами "server_id": server_id }) if not session: raise HTTPException(status_code=403, detail="Not joined this server") textures = { "timestamp": int(datetime.now().timestamp() * 1000), "profileId": user["uuid"].replace("-", ""), "profileName": user["username"], "textures": {} } if user.get("skin_url"): textures["textures"]["SKIN"] = { "url": user["skin_url"], "metadata": {"model": user.get("skin_model", "classic")} } if user.get("cloak_url"): textures["textures"]["CAPE"] = {"url": user["cloak_url"]} textures_json = json.dumps(textures).encode() base64_textures = base64.b64encode(textures_json).decode() try: # Подписываем текстуры private_key_path = "app/keys/private_key.pem" with open(private_key_path, "rb") as key_file: private_key = serialization.load_pem_private_key( key_file.read(), password=None ) signature = private_key.sign( base64.b64encode(textures_json), padding.PKCS1v15(), hashes.SHA1() ) signature_base64 = base64.b64encode(signature).decode() return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures, "signature": signature_base64 }] } except Exception as e: print(f"Error signing textures: {e}") # В случае ошибки возвращаем текстуры без подписи return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures }] }