import base64 import json from fastapi import HTTPException, UploadFile from fastapi.responses import JSONResponse from app.db.database import db from app.models.user import UserLogin, UserInDB, UserCreate, Session from app.utils.misc import ( verify_password, get_password_hash, create_access_token, decode_token, ) from ..db.database import users_collection, sessions_collection import uuid from datetime import datetime, timedelta from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding from dotenv import load_dotenv import os from pathlib import Path import secrets env_path = Path(__file__).parent.parent / ".env" load_dotenv(dotenv_path=env_path) FILES_URL = os.getenv("FILES_URL") qr_logins_collection = db.qr_logins class AuthService: async def register(self, user: UserCreate): # Проверяем, существует ли пользователь if await users_collection.find_one({"username": user.username}): raise HTTPException(status_code=400, detail="Username already taken") # Хешируем пароль hashed_password = get_password_hash(user.password) # Создаём UUID для Minecraft user_uuid = str(uuid.uuid4()) # Сохраняем в MongoDB new_user = UserInDB( username=user.username, hashed_password=hashed_password, uuid=user_uuid, is_verified=False, code=None, code_expires_at=None, expires_at=datetime.utcnow() + timedelta(hours=1), is_admin=False ) await users_collection.insert_one(new_user.dict()) return {"status": "success", "uuid": user_uuid} async def generate_code(self, username: str): if await users_collection.find_one({"username": username}): if await users_collection.find_one({"username": username, "is_verified": True}): raise HTTPException(400, "User already verified") code = secrets.token_hex(3).upper() await users_collection.update_one({"username": username}, {"$set": {"code": code, "code_expires_at": datetime.utcnow() + timedelta(minutes=10)}}) return {"status": "success", "code": code} else: raise HTTPException(404, "User not found") async def verify_code( self, username: str, code: str, telegram_user_id: int | None = None, telegram_username: str | None = None, ): user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(404, "Пользователь не найден") if user["is_verified"]: raise HTTPException(400, "Пользователь уже верифицирован") if user.get("telegram_user_id") and user["telegram_user_id"] != telegram_user_id: raise HTTPException(403, "Этот аккаунт в Telegram уже привязан к другому пользователем.") if user.get("code") != code: raise HTTPException(400, "Инвалид код. Прям как ты") if telegram_user_id is not None: existing = await users_collection.find_one({ "telegram_user_id": telegram_user_id, "username": {"$ne": username}, }) if existing: raise HTTPException( status_code=403, detail="Этот аккаунт в Telegram уже привязан к другому пользователем.", ) update = { "is_verified": True, "telegram_user_id": telegram_user_id, "code": None, } if telegram_user_id is not None: update["telegram_user_id"] = telegram_user_id if telegram_username is not None: update["telegram_username"] = telegram_username await users_collection.update_one( {"username": username}, {"$set": update, "$unset": {"expires_at": ""}}, ) return {"status": "success"} async def get_verification_status(self, username: str): user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(404, "User not found") return {"is_verified": user["is_verified"]} async def login(self, credentials: UserLogin): # Ищем пользователя user = await users_collection.find_one({"username": credentials.username}) if not user or not verify_password(credentials.password, user["hashed_password"]): raise HTTPException(status_code=401, detail="Invalid credentials") if not user["is_verified"]: raise HTTPException(status_code=401, detail="User not verified") # Генерируем токены access_token = create_access_token({"sub": user["username"], "uuid": user["uuid"]}) client_token = str(uuid.uuid4()) await sessions_collection.delete_many({ "user_uuid": user["uuid"] }) # Сохраняем сессию session = Session( access_token=access_token, client_token=client_token, user_uuid=user["uuid"], expires_at=datetime.utcnow() + timedelta(minutes=1440), ) await sessions_collection.insert_one(session.dict()) return { "accessToken": access_token, "clientToken": client_token, "selectedProfile": { "id": user["uuid"], "name": user["username"], }, } async def validate(self, access_token: str, client_token: str): session = await sessions_collection.find_one({ "access_token": access_token, "client_token": client_token, }) if not session: return False if datetime.utcnow() > session["expires_at"]: # можно сразу чистить await sessions_collection.delete_one({"_id": session["_id"]}) return False return True async def is_admin(self, access_token: str, client_token: str) -> bool: session = await sessions_collection.find_one({ "access_token": access_token, "client_token": client_token, }) if not session: return False user = await users_collection.find_one({"uuid": session["user_uuid"]}) return user and user.get("is_admin") is True async def get_current_user(self, access_token: str, client_token: str): session = await sessions_collection.find_one({ "access_token": access_token, "client_token": client_token, }) if not session: raise HTTPException(status_code=401, detail="Invalid session") user = await users_collection.find_one({"uuid": session["user_uuid"]}) if not user: raise HTTPException(status_code=404, detail="User not found") return { "username": user["username"], "uuid": user["uuid"], "is_admin": user.get("is_admin", False), } async def refresh(self, access_token: str, client_token: str): session = await sessions_collection.find_one({ "access_token": access_token, "client_token": client_token, }) if not session: return None if datetime.utcnow() > session["expires_at"]: return None user = await users_collection.find_one({"uuid": session["user_uuid"]}) if not user: return None new_access_token = create_access_token({ "sub": user["username"], "uuid": user["uuid"], }) new_expires_at = datetime.utcnow() + timedelta(minutes=1440) await sessions_collection.update_one( {"_id": session["_id"]}, { "$set": { "access_token": new_access_token, "expires_at": new_expires_at, } }, ) return { "accessToken": new_access_token, "clientToken": client_token, "selectedProfile": { "id": user["uuid"], "name": user["username"], }, } async def get_minecraft_profile(self, uuid: str): # Преобразуем UUID без дефисов в формат с дефисами (если нужно) if '-' not in uuid: formatted_uuid = f"{uuid[:8]}-{uuid[8:12]}-{uuid[12:16]}-{uuid[16:20]}-{uuid[20:]}" else: formatted_uuid = uuid user = await users_collection.find_one({"uuid": formatted_uuid}) # Ищем по UUID с дефисами if not user: raise HTTPException(status_code=404, detail="User not found") textures = { "timestamp": int(datetime.now().timestamp() * 1000), "profileId": user["uuid"].replace("-", ""), "profileName": user["username"], "textures": {} } if user.get("skin_url"): textures["textures"]["SKIN"] = { "url": user["skin_url"], "metadata": {"model": user.get("skin_model", "classic")} } if user.get("cloak_url"): textures["textures"]["CAPE"] = {"url": user["cloak_url"]} textures_json = json.dumps(textures).encode() base64_textures = base64.b64encode(textures_json).decode() try: # Подписываем текстуры private_key_path = "app/keys/private_key.pem" with open(private_key_path, "rb") as key_file: private_key = serialization.load_pem_private_key( key_file.read(), password=None ) signature = private_key.sign( base64.b64encode(textures_json), padding.PKCS1v15(), hashes.SHA1() ) signature_base64 = base64.b64encode(signature).decode() return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures, "signature": signature_base64 }] } except Exception as e: print(f"Error signing textures: {e}") # В случае ошибки возвращаем текстуры без подписи return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures }] } # # Подписываем текстуры # with open("private_key.pem", "rb") as key_file: # private_key = serialization.load_pem_private_key( # key_file.read(), # password=None # ) # signature = private_key.sign( # textures_json, # padding.PKCS1v15(), # hashes.SHA1() # ) # return JSONResponse({ # "id": user["uuid"].replace("-", ""), # Уберите дефисы # "name": user["username"], # "properties": [{ # "name": "textures", # "value": base64_textures, # # "signature": base64.b64encode(signature).decode() # }] # }) async def join_server(self, request_data: dict): access_token = request_data.get("accessToken") selected_profile = request_data.get("selectedProfile") server_id = request_data.get("serverId") if not all([access_token, selected_profile, server_id]): raise HTTPException(status_code=400, detail="Missing required parameters") session = await sessions_collection.find_one({ "access_token": access_token, "client_token": request_data.get("clientToken"), }) if not session or datetime.utcnow() > session["expires_at"]: raise HTTPException(status_code=401, detail="Invalid or expired session") decoded_token = decode_token(access_token) if not decoded_token: raise HTTPException(status_code=401, detail="Invalid access token") token_uuid = decoded_token.get("uuid", "").replace("-", "") if token_uuid != selected_profile: raise HTTPException(status_code=403, detail="Token doesn't match selected profile") await sessions_collection.update_one( {"_id": session["_id"]}, {"$set": {"server_id": server_id}}, ) return True async def has_joined(self, username: str, server_id: str): user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(status_code=404, detail="User not found") # Ищем сессию с этим server_id session = await sessions_collection.find_one({ "user_uuid": user["uuid"], "server_id": server_id, "expires_at": {"$gt": datetime.utcnow()}, }) if not session: raise HTTPException(status_code=403, detail="Not joined this server") textures = { "timestamp": int(datetime.now().timestamp() * 1000), "profileId": user["uuid"].replace("-", ""), "profileName": user["username"], "textures": {} } if user.get("skin_url"): textures["textures"]["SKIN"] = { "url": user["skin_url"], "metadata": {"model": user.get("skin_model", "classic")} } if user.get("cloak_url"): textures["textures"]["CAPE"] = {"url": user["cloak_url"]} textures_json = json.dumps(textures).encode() base64_textures = base64.b64encode(textures_json).decode() try: # Подписываем текстуры private_key_path = "app/keys/private_key.pem" with open(private_key_path, "rb") as key_file: private_key = serialization.load_pem_private_key( key_file.read(), password=None ) signature = private_key.sign( base64.b64encode(textures_json), padding.PKCS1v15(), hashes.SHA1() ) signature_base64 = base64.b64encode(signature).decode() return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures, "signature": signature_base64 }] } except Exception as e: print(f"Error signing textures: {e}") # В случае ошибки возвращаем текстуры без подписи return { "id": user["uuid"].replace("-", ""), "name": user["username"], "properties": [{ "name": "textures", "value": base64_textures }] } async def approve_qr_login(self, token: str, telegram_user_id: int): qr = await qr_logins_collection.find_one({"token": token}) if not qr: raise HTTPException(404, "QR token not found") if qr["status"] != "pending": raise HTTPException(400, "QR token already used or not pending") if datetime.utcnow() > qr["expires_at"]: await qr_logins_collection.update_one({"token": token}, {"$set": {"status": "expired"}}) raise HTTPException(400, "QR token expired") # находим пользователя по telegram_user_id user = await users_collection.find_one({"telegram_user_id": telegram_user_id}) if not user: raise HTTPException(403, "Telegram аккаунт не привязан") if not user.get("is_verified"): raise HTTPException(403, "Пользователь не верифицирован") await qr_logins_collection.update_one( {"token": token}, {"$set": {"status": "approved", "approved_username": user["username"]}} ) return {"status": "success"} async def qr_status(self, token: str, device_id: str | None = None): qr = await qr_logins_collection.find_one({"token": token}) if not qr: raise HTTPException(404, "QR token not found") if datetime.utcnow() > qr["expires_at"] and qr["status"] == "pending": await qr_logins_collection.update_one({"token": token}, {"$set": {"status": "expired"}}) return {"status": "expired"} # если хотите привязку к устройству: if device_id and qr.get("device_id") and qr["device_id"] != device_id: raise HTTPException(403, "Device mismatch") if qr["status"] == "approved": username = qr["approved_username"] user = await users_collection.find_one({"username": username}) if not user: raise HTTPException(404, "User not found") # генерим токены как в login() access_token = create_access_token({"sub": user["username"], "uuid": user["uuid"]}) client_token = str(uuid.uuid4()) session = Session( access_token=access_token, client_token=client_token, user_uuid=user["uuid"], expires_at=datetime.utcnow() + timedelta(minutes=1440), ) await sessions_collection.insert_one(session.dict()) # одноразовость await qr_logins_collection.update_one( {"token": token}, {"$set": {"status": "consumed"}} ) return { "status": "ok", "accessToken": access_token, "clientToken": client_token, "selectedProfile": {"id": user["uuid"], "name": user["username"]}, } return {"status": qr["status"]}