Files
popa_minecraft_launcher_api/app/services/auth.py
2025-12-20 15:53:40 +05:00

458 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
from fastapi import HTTPException, UploadFile
from fastapi.responses import JSONResponse
from app.db.database import db
from app.models.user import UserLogin, UserInDB, UserCreate, Session
from app.utils.misc import (
verify_password,
get_password_hash,
create_access_token,
decode_token,
)
from ..db.database import users_collection, sessions_collection
import uuid
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from dotenv import load_dotenv
import os
from pathlib import Path
import secrets
# Environment variables are loaded from the .env file located in the parent
# of this module's directory.
env_path = Path(__file__).parents[1] / ".env"
load_dotenv(dotenv_path=env_path)

# Value of the FILES_URL environment variable (None when it is not set).
FILES_URL = os.environ.get("FILES_URL")

# Collection that stores QR-login records (token, status, expiry, ...).
qr_logins_collection = db.qr_logins
class AuthService:
async def register(self, user: UserCreate):
    """Create a new, not-yet-verified account.

    Args:
        user: Registration payload; ``username`` and ``password`` are read.

    Returns:
        ``{"status": "success", "uuid": <new uuid>}``.

    Raises:
        HTTPException: 400 when a user with this username already exists.
    """
    # Refuse duplicate usernames up front.
    taken = await users_collection.find_one({"username": user.username})
    if taken:
        raise HTTPException(status_code=400, detail="Username already taken")

    # Hash the password and mint the UUID used for Minecraft.
    password_hash = get_password_hash(user.password)
    player_uuid = str(uuid.uuid4())

    # The record starts unverified with no code; ``expires_at`` is set one
    # hour ahead (verify_code removes that field once the user is verified).
    record = UserInDB(
        username=user.username,
        hashed_password=password_hash,
        uuid=player_uuid,
        is_verified=False,
        code=None,
        code_expires_at=None,
        expires_at=datetime.utcnow() + timedelta(hours=1),
        is_admin=False,
    )
    await users_collection.insert_one(record.dict())

    return {"status": "success", "uuid": player_uuid}
async def generate_code(self, username: str):
    """Issue a fresh verification code for an unverified user.

    The code is six upper-case hexadecimal characters and is stored on the
    user document together with an expiry timestamp ten minutes ahead.

    Args:
        username: Name of the account to issue the code for.

    Returns:
        ``{"status": "success", "code": <code>}``.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the user
            is already verified.
    """
    # A single lookup answers both questions (exists? verified?) instead of
    # querying the collection twice for the same document.
    user = await users_collection.find_one({"username": username})
    if not user:
        raise HTTPException(404, "User not found")
    if user.get("is_verified") is True:
        raise HTTPException(400, "User already verified")

    code = secrets.token_hex(3).upper()
    await users_collection.update_one(
        {"username": username},
        {
            "$set": {
                "code": code,
                "code_expires_at": datetime.utcnow() + timedelta(minutes=10),
            }
        },
    )
    return {"status": "success", "code": code}
async def verify_code(
    self,
    username: str,
    code: str,
    telegram_user_id: int | None = None,
    telegram_username: str | None = None,
):
    """Verify a user with a previously issued code and link Telegram data.

    Args:
        username: Account being verified.
        code: Code supplied by the caller; must equal the stored one.
        telegram_user_id: Telegram account id to bind to the user, if any.
        telegram_username: Telegram username to store, if any.

    Returns:
        ``{"status": "success"}``.

    Raises:
        HTTPException: 404 unknown user; 400 already verified, wrong code,
            no code issued, or expired code; 403 when the user is already
            bound to a different Telegram id or the Telegram id is bound to
            another user.
    """
    user = await users_collection.find_one({"username": username})
    if not user:
        raise HTTPException(404, "Пользователь не найден")
    if user["is_verified"]:
        raise HTTPException(400, "Пользователь уже верифицирован")
    if user.get("telegram_user_id") and user["telegram_user_id"] != telegram_user_id:
        raise HTTPException(403, "Этот аккаунт в Telegram уже привязан к другому пользователем.")

    # A user who never requested a code has ``code`` set to None; that must
    # never count as a match, whatever the caller sends.
    stored_code = user.get("code")
    if stored_code is None or stored_code != code:
        raise HTTPException(400, "Инвалид код. Прям как ты")

    # generate_code stores an expiry next to the code; enforce it here.
    # Documents without the field are treated as non-expiring.
    code_expires_at = user.get("code_expires_at")
    if code_expires_at is not None and datetime.utcnow() > code_expires_at:
        raise HTTPException(400, "Срок действия кода истёк")

    # One Telegram account may be bound to at most one user.
    if telegram_user_id is not None:
        existing = await users_collection.find_one({
            "telegram_user_id": telegram_user_id,
            "username": {"$ne": username},
        })
        if existing:
            raise HTTPException(
                status_code=403,
                detail="Этот аккаунт в Telegram уже привязан к другому пользователем.",
            )

    update = {
        "is_verified": True,
        "telegram_user_id": telegram_user_id,
        # The code is single-use: clear it together with its expiry.
        "code": None,
        "code_expires_at": None,
    }
    if telegram_username is not None:
        update["telegram_username"] = telegram_username

    # Dropping ``expires_at`` removes the expiry that register() put on the
    # unverified account.
    await users_collection.update_one(
        {"username": username},
        {"$set": update, "$unset": {"expires_at": ""}},
    )
    return {"status": "success"}
async def get_verification_status(self, username: str):
    """Report whether the given user has completed verification.

    Returns:
        ``{"is_verified": <stored flag>}``.

    Raises:
        HTTPException: 404 when no user has this username.
    """
    record = await users_collection.find_one({"username": username})
    if record:
        verified = record["is_verified"]
        return {"is_verified": verified}
    raise HTTPException(404, "User not found")
async def login(self, credentials: UserLogin):
    """Authenticate by username/password and open a new session.

    Returns:
        Dict with ``accessToken``, ``clientToken`` and ``selectedProfile``
        (``id`` = user UUID, ``name`` = username).

    Raises:
        HTTPException: 401 for an unknown user, a wrong password, or an
            account that is not verified yet.
    """
    account = await users_collection.find_one({"username": credentials.username})
    # Unknown user and wrong password deliberately share one error.
    if not (account and verify_password(credentials.password, account["hashed_password"])):
        raise HTTPException(status_code=401, detail="Invalid credentials")
    if not account["is_verified"]:
        raise HTTPException(status_code=401, detail="User not verified")

    # Mint the token pair.
    token_claims = {"sub": account["username"], "uuid": account["uuid"]}
    access_token = create_access_token(token_claims)
    client_token = str(uuid.uuid4())

    # Persist the session; it is valid for 1440 minutes (24 hours).
    new_session = Session(
        access_token=access_token,
        client_token=client_token,
        user_uuid=account["uuid"],
        expires_at=datetime.utcnow() + timedelta(minutes=1440),
    )
    await sessions_collection.insert_one(new_session.dict())

    profile = {"id": account["uuid"], "name": account["username"]}
    return {
        "accessToken": access_token,
        "clientToken": client_token,
        "selectedProfile": profile,
    }
async def validate(self, access_token: str, client_token: str):
    """Check that a session with this token pair exists and has not expired.

    Returns:
        True when a matching, unexpired session is stored; False otherwise.
    """
    session = await sessions_collection.find_one({
        "access_token": access_token,
        "client_token": client_token,
    })
    # NOTE: the tokens and the session document are intentionally not
    # printed or logged here - they are bearer credentials and must not
    # end up in stdout/log files.
    if not session:
        return False
    return datetime.utcnow() <= session["expires_at"]
async def is_admin(self, access_token: str, client_token: str) -> bool:
    """Tell whether the session's owner is an administrator.

    Returns:
        True only when the token pair matches a stored, unexpired session
        and the owning user has ``is_admin`` set to True. Every other case
        (no session, expired session, missing user, flag absent) is False.
    """
    session = await sessions_collection.find_one({
        "access_token": access_token,
        "client_token": client_token,
    })
    if not session:
        return False

    # An expired session must not keep granting admin rights; a session
    # without an expiry timestamp is treated as invalid.
    expires_at = session.get("expires_at")
    if expires_at is None or datetime.utcnow() > expires_at:
        return False

    user = await users_collection.find_one({"uuid": session["user_uuid"]})
    # bool(...) keeps the declared return type when the user is missing.
    return bool(user) and user.get("is_admin") is True
async def get_current_user(self, access_token: str, client_token: str):
    """Resolve the user that owns the given session.

    Returns:
        Dict with ``username``, ``uuid`` and ``is_admin`` (False when the
        flag is absent on the user document).

    Raises:
        HTTPException: 401 when no session matches the token pair or the
            session has expired; 404 when the session's user is gone.
    """
    session = await sessions_collection.find_one({
        "access_token": access_token,
        "client_token": client_token,
    })
    if not session:
        raise HTTPException(status_code=401, detail="Invalid session")

    # Expired sessions are rejected the same way validate() rejects them;
    # a session without an expiry timestamp is treated as expired.
    expires_at = session.get("expires_at")
    if expires_at is None or datetime.utcnow() > expires_at:
        raise HTTPException(status_code=401, detail="Session expired")

    user = await users_collection.find_one({"uuid": session["user_uuid"]})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return {
        "username": user["username"],
        "uuid": user["uuid"],
        "is_admin": user.get("is_admin", False),
    }
async def refresh(self, access_token: str, client_token: str):
    """Replace the access token of a valid session with a new one.

    Returns:
        ``{"accessToken": <new token>, "clientToken": client_token}``, or
        None when the session is missing or expired, or its user no longer
        exists.
    """
    session_filter = {
        "access_token": access_token,
        "client_token": client_token,
    }
    session = await sessions_collection.find_one(session_filter)
    if not session:
        return None
    expires_at = session.get("expires_at")
    if expires_at is None or datetime.utcnow() > expires_at:
        return None

    # The new token must carry the real identity of the session owner, in
    # the same shape login() uses - not placeholder strings.
    user = await users_collection.find_one({"uuid": session["user_uuid"]})
    if not user:
        return None
    new_access_token = create_access_token(
        {"sub": user["username"], "uuid": user["uuid"]}
    )

    # Match on the full token pair so only this exact session is rewritten.
    await sessions_collection.update_one(
        session_filter,
        {"$set": {"access_token": new_access_token}},
    )
    return {"accessToken": new_access_token, "clientToken": client_token}
async def get_minecraft_profile(self, uuid: str):
    """Build the profile response (id, name, textures property) for a user.

    Args:
        uuid: User UUID, with or without dashes.

    Returns:
        Dict with ``id`` (UUID without dashes), ``name`` and a one-element
        ``properties`` list holding the base64 ``textures`` value. The
        property also carries a ``signature`` when signing succeeded.

    Raises:
        HTTPException: 404 when no user has this UUID.
    """
    # Users are looked up by the dashed form; add dashes when none are given.
    if "-" in uuid:
        dashed = uuid
    else:
        dashed = "-".join((uuid[:8], uuid[8:12], uuid[12:16], uuid[16:20], uuid[20:]))

    user = await users_collection.find_one({"uuid": dashed})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    profile_id = user["uuid"].replace("-", "")

    payload = {
        "timestamp": int(datetime.now().timestamp() * 1000),
        "profileId": profile_id,
        "profileName": user["username"],
        "textures": {},
    }
    texture_map = payload["textures"]
    if user.get("skin_url"):
        texture_map["SKIN"] = {
            "url": user["skin_url"],
            "metadata": {"model": user.get("skin_model", "classic")},
        }
    if user.get("cloak_url"):
        texture_map["CAPE"] = {"url": user["cloak_url"]}

    # The base64 bytes are both the property value and the signed message.
    encoded = base64.b64encode(json.dumps(payload).encode())
    textures_prop = {"name": "textures", "value": encoded.decode()}

    try:
        # Sign with the PEM key on disk (RSA PKCS#1 v1.5 over SHA-1).
        with open("app/keys/private_key.pem", "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
            )
        raw_signature = private_key.sign(encoded, padding.PKCS1v15(), hashes.SHA1())
        textures_prop["signature"] = base64.b64encode(raw_signature).decode()
    except Exception as e:
        # Best effort: on any signing problem the textures go out unsigned.
        print(f"Error signing textures: {e}")

    return {
        "id": profile_id,
        "name": user["username"],
        "properties": [textures_prop],
    }
async def join_server(self, request_data: dict):
    """Record that the token's owner is joining the given server.

    Reads ``accessToken``, ``selectedProfile`` (UUID without dashes) and
    ``serverId`` from ``request_data``.

    Returns:
        True once the server id has been stored on a session document.

    Raises:
        HTTPException: 400 when a field is missing or empty, 401 when the
            token cannot be decoded, 403 when the token's UUID differs from
            the selected profile.
    """
    access_token, selected_profile, server_id = (
        request_data.get(field)
        for field in ("accessToken", "selectedProfile", "serverId")
    )
    if not (access_token and selected_profile and server_id):
        raise HTTPException(status_code=400, detail="Missing required parameters")

    claims = decode_token(access_token)
    if not claims:
        raise HTTPException(status_code=401, detail="Invalid access token")

    # The token stores the dashed UUID; the profile arrives without dashes.
    undashed_uuid = claims.get("uuid", "").replace("-", "")
    if undashed_uuid != selected_profile:
        raise HTTPException(status_code=403, detail="Token doesn't match selected profile")

    # Store server_id on a session of this user (dashed UUID); the upsert
    # creates a document when none matches.
    await sessions_collection.update_one(
        {"user_uuid": claims["uuid"]},
        {"$set": {"server_id": server_id}},
        upsert=True,
    )
    return True
async def has_joined(self, username: str, server_id: str):
    """Confirm a prior join for this server and return the player profile.

    Returns:
        Dict with ``id`` (UUID without dashes), ``name`` and a one-element
        ``properties`` list holding the base64 ``textures`` value. The
        property also carries a ``signature`` when signing succeeded.

    Raises:
        HTTPException: 404 when the user is unknown, 403 when no session of
            that user carries this ``server_id``.
    """
    user = await users_collection.find_one({"username": username})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # A join is proven by a session (dashed UUID) tagged with this server_id.
    joined = await sessions_collection.find_one({
        "user_uuid": user["uuid"],
        "server_id": server_id,
    })
    if not joined:
        raise HTTPException(status_code=403, detail="Not joined this server")

    profile_id = user["uuid"].replace("-", "")

    payload = {
        "timestamp": int(datetime.now().timestamp() * 1000),
        "profileId": profile_id,
        "profileName": user["username"],
        "textures": {},
    }
    texture_map = payload["textures"]
    if user.get("skin_url"):
        texture_map["SKIN"] = {
            "url": user["skin_url"],
            "metadata": {"model": user.get("skin_model", "classic")},
        }
    if user.get("cloak_url"):
        texture_map["CAPE"] = {"url": user["cloak_url"]}

    # The base64 bytes are both the property value and the signed message.
    encoded = base64.b64encode(json.dumps(payload).encode())
    textures_prop = {"name": "textures", "value": encoded.decode()}

    try:
        # Sign with the PEM key on disk (RSA PKCS#1 v1.5 over SHA-1).
        with open("app/keys/private_key.pem", "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
            )
        raw_signature = private_key.sign(encoded, padding.PKCS1v15(), hashes.SHA1())
        textures_prop["signature"] = base64.b64encode(raw_signature).decode()
    except Exception as e:
        # Best effort: on any signing problem the textures go out unsigned.
        print(f"Error signing textures: {e}")

    return {
        "id": profile_id,
        "name": user["username"],
        "properties": [textures_prop],
    }
async def approve_qr_login(self, token: str, telegram_user_id: int):
    """Approve a pending QR login on behalf of a Telegram-linked user.

    Returns:
        ``{"status": "success"}`` after the QR record is marked approved
        and the approving user's username is stored on it.

    Raises:
        HTTPException: 404 unknown token; 400 token not pending or expired;
            403 when no user is linked to this Telegram id or that user is
            not verified.
    """
    token_filter = {"token": token}

    qr_record = await qr_logins_collection.find_one(token_filter)
    if not qr_record:
        raise HTTPException(404, "QR token not found")
    if qr_record["status"] != "pending":
        raise HTTPException(400, "QR token already used or not pending")

    # A pending token past its deadline is flagged as expired before failing.
    if datetime.utcnow() > qr_record["expires_at"]:
        await qr_logins_collection.update_one(
            token_filter, {"$set": {"status": "expired"}}
        )
        raise HTTPException(400, "QR token expired")

    # Resolve the approving user through the linked Telegram id.
    approver = await users_collection.find_one({"telegram_user_id": telegram_user_id})
    if not approver:
        raise HTTPException(403, "Telegram аккаунт не привязан")
    if not approver.get("is_verified"):
        raise HTTPException(403, "Пользователь не верифицирован")

    approval = {"status": "approved", "approved_username": approver["username"]}
    await qr_logins_collection.update_one(token_filter, {"$set": approval})
    return {"status": "success"}
async def qr_status(self, token: str, device_id: str | None = None):
    """Poll a QR login; on the first poll after approval, open a session.

    Args:
        token: QR login token.
        device_id: Optional device identifier; compared with the record's
            ``device_id`` only when both are present.

    Returns:
        ``{"status": <status>}`` for any state other than a freshly claimed
        approval. For the single poll that claims an approved token:
        ``status`` ``"ok"`` plus ``accessToken``, ``clientToken`` and
        ``selectedProfile``.

    Raises:
        HTTPException: 404 unknown token or approved user missing; 403 on
            device mismatch.
    """
    qr = await qr_logins_collection.find_one({"token": token})
    if not qr:
        raise HTTPException(404, "QR token not found")

    # Only a still-pending token is flipped to "expired" by polling.
    if datetime.utcnow() > qr["expires_at"] and qr["status"] == "pending":
        await qr_logins_collection.update_one(
            {"token": token}, {"$set": {"status": "expired"}}
        )
        return {"status": "expired"}

    # Optional device binding.
    if device_id and qr.get("device_id") and qr["device_id"] != device_id:
        raise HTTPException(403, "Device mismatch")

    if qr["status"] != "approved":
        return {"status": qr["status"]}

    user = await users_collection.find_one({"username": qr["approved_username"]})
    if not user:
        raise HTTPException(404, "User not found")

    # Single use: atomically move approved -> consumed BEFORE issuing tokens.
    # The status filter lets exactly one concurrent poll win; the others see
    # modified_count == 0 and get the "consumed" status a later poll would
    # report anyway. (If the session insert below fails, the token stays
    # consumed and the QR flow has to be restarted.)
    claim = await qr_logins_collection.update_one(
        {"token": token, "status": "approved"},
        {"$set": {"status": "consumed"}},
    )
    if claim.modified_count != 1:
        return {"status": "consumed"}

    # Issue tokens the same way login() does.
    access_token = create_access_token({"sub": user["username"], "uuid": user["uuid"]})
    client_token = str(uuid.uuid4())
    session = Session(
        access_token=access_token,
        client_token=client_token,
        user_uuid=user["uuid"],
        expires_at=datetime.utcnow() + timedelta(minutes=1440),
    )
    await sessions_collection.insert_one(session.dict())

    return {
        "status": "ok",
        "accessToken": access_token,
        "clientToken": client_token,
        "selectedProfile": {"id": user["uuid"], "name": user["username"]},
    }