Files
popa_minecraft_launcher_api/app/services/auth.py
DIKER0K 8a57fdad7a
All checks were successful
Build and Deploy / deploy (push) Successful in 21s
add: route get verification status
2025-07-21 09:03:46 +05:00

331 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
from fastapi import HTTPException, UploadFile
from fastapi.responses import JSONResponse
from app.models.user import UserLogin, UserInDB, UserCreate, Session
from app.utils.misc import (
verify_password,
get_password_hash,
create_access_token,
decode_token,
)
from ..db.database import users_collection, sessions_collection
import uuid
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from dotenv import load_dotenv
import os
from pathlib import Path
import secrets
# Locate the .env file one directory above the directory containing this module.
env_path = Path(__file__).parent.parent / ".env"
# Populate the process environment from that file before any setting is read.
load_dotenv(dotenv_path=env_path)
# Value of the FILES_URL environment variable; None when it is not set.
FILES_URL = os.getenv("FILES_URL")
class AuthService:
async def register(self, user: UserCreate):
    """Create a new, not-yet-verified account.

    Raises:
        HTTPException: 400 when the username is already present in the
            users collection.

    Returns:
        A dict with ``status`` set to ``"success"`` and the freshly
        generated ``uuid`` of the account.
    """
    # Reject duplicates before doing any hashing work.
    existing = await users_collection.find_one({"username": user.username})
    if existing:
        raise HTTPException(status_code=400, detail="Username already taken")

    password_hash = get_password_hash(user.password)
    # Random UUID that identifies the player's Minecraft profile.
    profile_uuid = str(uuid.uuid4())

    # New accounts start unverified and without a pending verification code.
    record = UserInDB(
        username=user.username,
        hashed_password=password_hash,
        uuid=profile_uuid,
        is_verified=False,
        code=None,
        code_expires_at=None,
    )
    await users_collection.insert_one(record.dict())
    return {"status": "success", "uuid": profile_uuid}
async def generate_code(self, username: str):
    """Issue a fresh verification code for an unverified user.

    The code is six upper-case hexadecimal characters and is stored on the
    user document together with an expiry ten minutes in the future.

    Raises:
        HTTPException: 404 when the user does not exist, 400 when the user
            is already verified.

    Returns:
        A dict with ``status`` set to ``"success"`` and the generated
        ``code``.
    """
    # One lookup is enough: the fetched document already carries the
    # verification flag, so no second query is needed.
    user = await users_collection.find_one({"username": username})
    if not user:
        raise HTTPException(404, "User not found")
    if user.get("is_verified"):
        raise HTTPException(400, "User already verified")

    code = secrets.token_hex(3).upper()
    expires_at = datetime.utcnow() + timedelta(minutes=10)
    await users_collection.update_one(
        {"username": username},
        {"$set": {"code": code, "code_expires_at": expires_at}},
    )
    return {"status": "success", "code": code}
async def verify_code(self, username: str, code: str, telegram_chat_id: int):
    """Confirm a verification code and link the account to a Telegram chat.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the user
            is already verified, the code is wrong or missing, or the code
            has expired; 403 when the account is already linked to a
            different Telegram chat.

    Returns:
        ``{"status": "success"}`` once the user has been marked verified.
    """
    user = await users_collection.find_one({"username": username})
    if not user:
        raise HTTPException(404, "User not found")
    if user.get("is_verified"):
        raise HTTPException(400, "User already verified")

    # An account that is already bound to a chat may only be confirmed
    # from that same chat.
    linked_chat_id = user.get("telegram_chat_id")
    if linked_chat_id and linked_chat_id != telegram_chat_id:
        raise HTTPException(403, "This account is linked to another Telegram")

    # A user with no issued code must never verify (otherwise a null code
    # would match the stored None). Compare as bytes in constant time so
    # non-ASCII input cannot raise and timing does not leak the code.
    stored_code = user.get("code")
    if (
        not stored_code
        or not code
        or not secrets.compare_digest(
            str(stored_code).encode(), str(code).encode()
        )
    ):
        raise HTTPException(400, "Invalid code")

    # Enforce the expiry stored alongside the code on the user document.
    expires_at = user.get("code_expires_at")
    if expires_at is not None and datetime.utcnow() > expires_at:
        raise HTTPException(400, "Code expired")

    # Mark verified, bind the chat id, and invalidate the one-time code.
    await users_collection.update_one(
        {"username": username},
        {"$set": {
            "is_verified": True,
            "telegram_chat_id": telegram_chat_id,
            "code": None,
            "code_expires_at": None,
        }},
    )
    return {"status": "success"}
async def get_verification_status(self, username: str):
    """Report whether the given user has completed verification.

    Raises:
        HTTPException: 404 when no user with that username exists.

    Returns:
        A dict with the single key ``is_verified``.
    """
    account = await users_collection.find_one({"username": username})
    if account:
        return {"is_verified": account["is_verified"]}
    raise HTTPException(404, "User not found")
async def login(self, credentials: UserLogin):
    """Authenticate a user and open a new session.

    Raises:
        HTTPException: 401 when the username/password pair is wrong or the
            account has not been verified yet.

    Returns:
        A dict with ``accessToken``, ``clientToken`` and the
        ``selectedProfile`` (profile ``id`` and ``name``).
    """
    account = await users_collection.find_one({"username": credentials.username})
    # The password is only checked when the account exists.
    password_ok = account and verify_password(
        credentials.password, account["hashed_password"]
    )
    if not password_ok:
        raise HTTPException(status_code=401, detail="Invalid credentials")
    if not account["is_verified"]:
        raise HTTPException(status_code=401, detail="User not verified")

    # Issue the token pair for this session.
    token_claims = {"sub": account["username"], "uuid": account["uuid"]}
    access_token = create_access_token(token_claims)
    client_token = str(uuid.uuid4())

    # Persist the session; it stays valid for 1440 minutes.
    expiry = datetime.utcnow() + timedelta(minutes=1440)
    new_session = Session(
        access_token=access_token,
        client_token=client_token,
        user_uuid=account["uuid"],
        expires_at=expiry,
    )
    await sessions_collection.insert_one(new_session.dict())

    profile = {"id": account["uuid"], "name": account["username"]}
    return {
        "accessToken": access_token,
        "clientToken": client_token,
        "selectedProfile": profile,
    }
async def validate(self, access_token: str, client_token: str):
    """Check that a token pair belongs to a stored, unexpired session.

    Returns:
        ``True`` when a session with both tokens exists and its
        ``expires_at`` has not passed, ``False`` otherwise.
    """
    # The tokens and the session document are credentials, so they are
    # deliberately not printed or logged here.
    session = await sessions_collection.find_one({
        "access_token": access_token,
        "client_token": client_token,
    })
    if not session:
        return False
    return datetime.utcnow() <= session["expires_at"]
async def refresh(self, access_token: str, client_token: str):
    """Replace a valid session's access token with a newly issued one.

    The new token carries the same claims as a token issued by ``login``
    (``sub`` = username, ``uuid`` = profile UUID), resolved from the
    session's owner rather than hard-coded placeholder strings.

    Returns:
        A dict with the new ``accessToken`` and the unchanged
        ``clientToken``, or ``None`` when the token pair is not a valid
        session or its owner can no longer be found.
    """
    if not await self.validate(access_token, client_token):
        return None

    # Identify the exact session by both tokens, as validate() does.
    session_filter = {
        "access_token": access_token,
        "client_token": client_token,
    }
    session = await sessions_collection.find_one(session_filter)
    if not session:
        return None

    # Resolve the session owner so the new token has real claims.
    user = await users_collection.find_one({"uuid": session["user_uuid"]})
    if not user:
        return None

    new_access_token = create_access_token(
        {"sub": user["username"], "uuid": user["uuid"]}
    )
    # NOTE(review): the session's expires_at is left untouched, as before;
    # confirm whether a refresh should also extend the session lifetime.
    await sessions_collection.update_one(
        session_filter,
        {"$set": {"access_token": new_access_token}},
    )
    return {"accessToken": new_access_token, "clientToken": client_token}
def _build_profile_response(self, user: dict) -> dict:
    """Build the profile payload (id, name, textures property) for a user.

    The textures document is JSON-encoded, base64-encoded and, when the
    private key can be loaded, signed with RSA PKCS#1 v1.5 / SHA-1 over the
    base64 text. If signing fails for any reason the payload is still
    returned, just without a ``signature`` entry.
    """
    profile_id = user["uuid"].replace("-", "")
    textures = {
        "timestamp": int(datetime.now().timestamp() * 1000),
        "profileId": profile_id,
        "profileName": user["username"],
        "textures": {},
    }
    if user.get("skin_url"):
        textures["textures"]["SKIN"] = {
            "url": user["skin_url"],
            "metadata": {"model": user.get("skin_model", "classic")},
        }
    if user.get("cloak_url"):
        textures["textures"]["CAPE"] = {"url": user["cloak_url"]}

    # The same base64 bytes are both the property value and the signed data.
    encoded_textures = base64.b64encode(json.dumps(textures).encode())
    textures_property = {
        "name": "textures",
        "value": encoded_textures.decode(),
    }

    try:
        private_key_path = "app/keys/private_key.pem"
        with open(private_key_path, "rb") as key_file:
            private_key = serialization.load_pem_private_key(
                key_file.read(),
                password=None,
            )
        signature = private_key.sign(
            encoded_textures,
            padding.PKCS1v15(),
            hashes.SHA1(),
        )
    except Exception as e:
        # Deliberate best effort: fall back to unsigned textures.
        print(f"Error signing textures: {e}")
    else:
        textures_property["signature"] = base64.b64encode(signature).decode()

    return {
        "id": profile_id,
        "name": user["username"],
        "properties": [textures_property],
    }

async def get_minecraft_profile(self, uuid: str):
    """Return the profile payload for the user with the given UUID.

    Args:
        uuid: Profile UUID, with or without dashes.

    Raises:
        HTTPException: 404 when no user has that UUID.
    """
    # Users are stored with dashed UUIDs; re-insert dashes when missing.
    if "-" not in uuid:
        formatted_uuid = (
            f"{uuid[:8]}-{uuid[8:12]}-{uuid[12:16]}-{uuid[16:20]}-{uuid[20:]}"
        )
    else:
        formatted_uuid = uuid

    user = await users_collection.find_one({"uuid": formatted_uuid})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return self._build_profile_response(user)

async def join_server(self, request_data: dict):
    """Record that the token's owner is joining the given server.

    Args:
        request_data: Dict with ``accessToken``, ``selectedProfile``
            (profile UUID) and ``serverId``.

    Raises:
        HTTPException: 400 when a parameter is missing; 401 when the token
            cannot be decoded or no session holds it; 403 when the token's
            UUID does not match the selected profile.

    Returns:
        ``True`` once the server id has been stored on the session.
    """
    access_token = request_data.get("accessToken")
    selected_profile = request_data.get("selectedProfile")
    server_id = request_data.get("serverId")
    if not all([access_token, selected_profile, server_id]):
        raise HTTPException(status_code=400, detail="Missing required parameters")

    decoded_token = decode_token(access_token)
    if not decoded_token:
        raise HTTPException(status_code=401, detail="Invalid access token")

    # Compare dash-less forms; tolerate a missing/null uuid claim and a
    # selected profile that was sent with dashes.
    token_uuid = (decoded_token.get("uuid") or "").replace("-", "")
    if token_uuid != str(selected_profile).replace("-", ""):
        raise HTTPException(status_code=403, detail="Token doesn't match selected profile")

    # Tag only the session that holds this access token. No upsert: a
    # missing session must not be created as a token-less document.
    result = await sessions_collection.update_one(
        {"user_uuid": decoded_token["uuid"], "access_token": access_token},
        {"$set": {"server_id": server_id}},
    )
    if result.matched_count == 0:
        raise HTTPException(status_code=401, detail="Invalid access token")
    return True

async def has_joined(self, username: str, server_id: str):
    """Return the user's profile payload if they joined the given server.

    Raises:
        HTTPException: 404 when the user does not exist; 403 when none of
            the user's sessions is tagged with ``server_id``.
    """
    user = await users_collection.find_one({"username": username})
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Sessions store the dashed UUID, the same form kept on the user.
    session = await sessions_collection.find_one({
        "user_uuid": user["uuid"],
        "server_id": server_id,
    })
    if not session:
        raise HTTPException(status_code=403, detail="Not joined this server")
    return self._build_profile_response(user)