Files
popa_minecraft_launcher_api/auth/app/auth.py

350 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import json
from fastapi import HTTPException, UploadFile
from fastapi.responses import JSONResponse
from .models import UserLogin, UserInDB, Session, UserCreate, SkinUpdate, CapeUpdate
from .utils import (
verify_password,
get_password_hash,
create_access_token,
decode_token,
)
from .database import users_collection, sessions_collection
import uuid
from datetime import datetime, timedelta
from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.asymmetric import padding
from dotenv import load_dotenv
import os
from pathlib import Path
env_path = Path(__file__).parent.parent / ".env"
load_dotenv(dotenv_path=env_path)
FILES_URL = os.getenv("FILES_URL")
class AuthService:
async def register(self, user: UserCreate):
# Проверяем, существует ли пользователь
if await users_collection.find_one({"username": user.username}):
raise HTTPException(status_code=400, detail="Username already taken")
# Хешируем пароль
hashed_password = get_password_hash(user.password)
# Создаём UUID для Minecraft
user_uuid = str(uuid.uuid4())
# Сохраняем в MongoDB
new_user = UserInDB(
username=user.username,
email=user.email,
hashed_password=hashed_password,
uuid=user_uuid,
)
await users_collection.insert_one(new_user.dict())
return {"status": "success", "uuid": user_uuid}
async def login(self, credentials: UserLogin):
# Ищем пользователя
user = await users_collection.find_one({"username": credentials.username})
if not user or not verify_password(credentials.password, user["hashed_password"]):
raise HTTPException(status_code=401, detail="Invalid credentials")
# Генерируем токены
access_token = create_access_token({"sub": user["username"], "uuid": user["uuid"]})
client_token = str(uuid.uuid4())
# Сохраняем сессию
session = Session(
access_token=access_token,
client_token=client_token,
user_uuid=user["uuid"],
expires_at=datetime.utcnow() + timedelta(minutes=1440),
)
await sessions_collection.insert_one(session.dict())
return {
"accessToken": access_token,
"clientToken": client_token,
"selectedProfile": {
"id": user["uuid"],
"name": user["username"],
},
}
async def validate(self, access_token: str, client_token: str):
print(f"Searching for access_toke and client_token: '{access_token}', '{client_token}")
session = await sessions_collection.find_one({
"access_token": access_token,
"client_token": client_token,
})
print("Session from DB:", session)
if not session or datetime.utcnow() > session["expires_at"]:
return False
return True
async def refresh(self, access_token: str, client_token: str):
if not await self.validate(access_token, client_token):
return None
# Обновляем токен
new_access_token = create_access_token({"sub": "user", "uuid": "user_uuid"})
await sessions_collection.update_one(
{"access_token": access_token},
{"$set": {"access_token": new_access_token}},
)
return {"accessToken": new_access_token, "clientToken": client_token}
async def get_minecraft_profile(self, uuid: str):
# Преобразуем UUID без дефисов в формат с дефисами (если нужно)
if '-' not in uuid:
formatted_uuid = f"{uuid[:8]}-{uuid[8:12]}-{uuid[12:16]}-{uuid[16:20]}-{uuid[20:]}"
else:
formatted_uuid = uuid
user = await users_collection.find_one({"uuid": formatted_uuid}) # Ищем по UUID с дефисами
if not user:
raise HTTPException(status_code=404, detail="User not found")
textures = {
"timestamp": int(datetime.now().timestamp() * 1000),
"profileId": user["uuid"], # UUID с дефисами
"profileName": user["username"],
"textures": {}
}
if user.get("skin_url"):
textures["textures"]["SKIN"] = {
"url": user["skin_url"],
"metadata": {"model": user.get("skin_model", "classic")}
}
if user.get("cloak_url"):
textures["textures"]["CAPE"] = {"url": user["cloak_url"]}
textures_json = json.dumps(textures).encode()
base64_textures = base64.b64encode(textures_json).decode()
# Подписываем текстуры
with open("private_key.pem", "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None
)
signature = private_key.sign(
textures_json,
padding.PKCS1v15(),
hashes.SHA1()
)
return JSONResponse({
"id": user["uuid"].replace("-", ""), # Уберите дефисы
"name": user["username"],
"properties": [{
"name": "textures",
"value": base64_textures,
"signature": base64.b64encode(signature).decode()
}]
})
async def join_server(self, request_data: dict):
access_token = request_data.get("accessToken")
selected_profile = request_data.get("selectedProfile") # UUID без дефисов
server_id = request_data.get("serverId")
if not all([access_token, selected_profile, server_id]):
raise HTTPException(status_code=400, detail="Missing required parameters")
decoded_token = decode_token(access_token)
if not decoded_token:
raise HTTPException(status_code=401, detail="Invalid access token")
token_uuid = decoded_token.get("uuid", "").replace("-", "")
if token_uuid != selected_profile:
raise HTTPException(status_code=403, detail="Token doesn't match selected profile")
# Сохраняем server_id в сессию
await sessions_collection.update_one(
{"user_uuid": decoded_token["uuid"]}, # UUID с дефисами
{"$set": {"server_id": server_id}},
upsert=True
)
return True
async def has_joined(self, username: str, server_id: str):
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Ищем сессию с этим server_id
session = await sessions_collection.find_one({
"user_uuid": user["uuid"], # UUID с дефисами
"server_id": server_id
})
if not session:
raise HTTPException(status_code=403, detail="Not joined this server")
textures = {}
if user.get("skin_url"):
textures["SKIN"] = {"url": user["skin_url"]}
if user.get("cloak_url"):
textures["CAPE"] = {"url": user["cloak_url"]}
textures_value = base64.b64encode(json.dumps({
"timestamp": int(datetime.now().timestamp()),
"profileId": user["uuid"].replace("-", ""), # UUID без дефисов
"profileName": username,
"textures": textures
}).encode()).decode()
return {
"id": user["uuid"].replace("-", ""), # UUID без дефисов
"name": username,
"properties": [{
"name": "textures",
"value": textures_value
}] if textures else []
}
async def set_skin(self, username: str, skin_file: UploadFile, skin_model: str = "classic"):
"""Установка или замена скина через загрузку файла"""
# Проверяем тип файла
if not skin_file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="File must be an image")
# Проверяем размер файла (максимум 2MB)
max_size = 2 * 1024 * 1024 # 2MB
contents = await skin_file.read()
if len(contents) > max_size:
raise HTTPException(status_code=400, detail="File too large (max 2MB)")
# Удаляем старый скин, если есть
user = await users_collection.find_one({"username": username})
if user and user.get("skin_url"):
from urllib.parse import urlparse
import os
old_url = user["skin_url"]
# Получаем имя файла из url
old_filename = os.path.basename(urlparse(old_url).path)
old_path = os.path.join("skins", old_filename)
if os.path.exists(old_path):
try:
os.remove(old_path)
except Exception:
pass
# Создаем папку для скинов, если ее нет
from pathlib import Path
skin_dir = Path("skins")
skin_dir.mkdir(exist_ok=True)
# Генерируем имя файла
skin_filename = f"{username}_{int(datetime.now().timestamp())}.png"
skin_path = skin_dir / skin_filename
# Сохраняем файл
with open(skin_path, "wb") as f:
f.write(contents)
# Обновляем запись пользователя
result = await users_collection.update_one(
{"username": username},
{"$set": {
"skin_url": f"{FILES_URL}/skins/{skin_filename}",
"skin_model": skin_model
}}
)
if result.modified_count == 0:
raise HTTPException(status_code=404, detail="User not found")
return {"status": "success"}
async def remove_skin(self, username: str):
"""Удаление скина"""
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Удаляем файл скина, если он существует
if user.get("skin_url") and user["skin_url"].startswith("/skins/"):
import os
try:
os.remove(f"skins/{user['skin_url'].split('/')[-1]}")
except:
pass
result = await users_collection.update_one(
{"username": username},
{"$unset": {
"skin_url": "",
"skin_model": ""
}}
)
return {"status": "success"}
async def set_cape(self, username: str, cape_file: UploadFile):
"""Установка или замена плаща через загрузку файла (PNG или GIF)"""
if not cape_file.content_type.startswith('image/'):
raise HTTPException(status_code=400, detail="File must be an image")
# Определяем расширение
ext = None
if cape_file.content_type == "image/png":
ext = "png"
elif cape_file.content_type == "image/gif":
ext = "gif"
else:
raise HTTPException(status_code=400, detail="Only PNG and GIF capes are supported")
max_size = 2 * 1024 * 1024 # 2MB
contents = await cape_file.read()
if len(contents) > max_size:
raise HTTPException(status_code=400, detail="File too large (max 2MB)")
# Удаляем старый плащ, если есть
user = await users_collection.find_one({"username": username})
if user and user.get("cloak_url"):
from urllib.parse import urlparse
import os
old_url = user["cloak_url"]
old_filename = os.path.basename(urlparse(old_url).path)
old_path = os.path.join("capes", old_filename)
if os.path.exists(old_path):
try:
os.remove(old_path)
except Exception:
pass
from pathlib import Path
cape_dir = Path("capes")
cape_dir.mkdir(exist_ok=True)
cape_filename = f"{username}_{int(datetime.now().timestamp())}.{ext}"
cape_path = cape_dir / cape_filename
with open(cape_path, "wb") as f:
f.write(contents)
result = await users_collection.update_one(
{"username": username},
{"$set": {
"cloak_url": f"{FILES_URL}/capes/{cape_filename}"
}}
)
if result.modified_count == 0:
raise HTTPException(status_code=404, detail="User not found")
return {"status": "success"}
async def remove_cape(self, username: str):
"""Удаление плаща"""
result = await users_collection.update_one(
{"username": username},
{"$unset": {"cloak_url": ""}}
)
if result.modified_count == 0:
raise HTTPException(status_code=404, detail="User not found")
return {"status": "success"}