Files
popa_minecraft_launcher_api/auth/app/main.py

125 lines
3.8 KiB
Python

import base64
from datetime import datetime
import json
from fastapi import FastAPI, Depends, File, Form, HTTPException, Body, Request, Response, UploadFile
from fastapi.security import OAuth2PasswordBearer
from fastapi.staticfiles import StaticFiles
from .models import UserCreate, UserLogin, ValidateRequest, SkinUpdate, CapeUpdate
from .auth import AuthService
from .database import users_collection
from .utils import decode_token
import os
from pathlib import Path
from typing import Union
from fastapi.middleware.cors import CORSMiddleware
import logging
# logging.basicConfig(level=logging.DEBUG)
app = FastAPI()
auth_service = AuthService()
skin_dir = Path("skins")
skin_dir.mkdir(exist_ok=True)
app.mount("/skins", StaticFiles(directory="skins"), name="skins")
cape_dir = Path("capes")
cape_dir.mkdir(exist_ok=True)
app.mount("/capes", StaticFiles(directory="capes"), name="capes")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Разрешить все домены
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def api_root():
return {
"meta": {
"serverName": "Your Auth Server",
"implementationName": "FastAPI",
"implementationVersion": "1.0.0",
"links": {
"homepage": "https://your-server.com"
},
},
"skinDomains": ["147.78.65.214"],
"capeDomains": ["147.78.65.214"]
}
# Эндпоинты Mojang-like API
@app.post("/auth/register")
async def register(user: UserCreate):
return await auth_service.register(user)
@app.post("/auth/authenticate")
async def authenticate(credentials: UserLogin):
return await auth_service.login(credentials)
@app.post("/auth/validate")
async def validate_token(request: ValidateRequest):
is_valid = await auth_service.validate(request.accessToken, request.clientToken)
return {"valid": is_valid}
@app.post("/auth/refresh")
async def refresh_token(access_token: str, client_token: str):
result = await auth_service.refresh(access_token, client_token)
if not result:
raise HTTPException(status_code=401, detail="Invalid tokens")
return result
@app.get("/sessionserver/session/minecraft/profile/{uuid}")
async def get_minecraft_profile(uuid: str, unsigned: bool = False):
return await auth_service.get_minecraft_profile(uuid)
@app.post("/sessionserver/session/minecraft/join")
async def join_server(request_data: dict = Body(...)):
try:
await auth_service.join_server(request_data)
return Response(status_code=204)
except Exception as e:
print("Error in join_server:", str(e))
raise
@app.get("/sessionserver/session/minecraft/hasJoined")
async def has_joined(username: str, serverId: str):
return await auth_service.has_joined(username, serverId)
@app.post("/user/{username}/skin")
async def set_skin(
username: str,
skin_file: UploadFile = File(...),
skin_model: str = Form("classic")
):
return await auth_service.set_skin(username, skin_file, skin_model)
@app.delete("/user/{username}/skin")
async def remove_skin(username: str):
return await auth_service.remove_skin(username)
@app.post("/user/{username}/cape")
async def set_cape(
username: str,
cape_file: UploadFile = File(...)
):
return await auth_service.set_cape(username, cape_file)
@app.delete("/user/{username}/cape")
async def remove_cape(username: str):
return await auth_service.remove_cape(username)
@app.get("/debug/profile/{uuid}")
async def debug_profile(uuid: str):
profile = await auth_service.get_minecraft_profile(uuid)
textures = base64.b64decode(profile['properties'][0]['value']).decode()
return {
"profile": profile,
"textures_decoded": json.loads(textures)
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)