test qr code
This commit is contained in:
@ -2,6 +2,7 @@ import base64
|
||||
import json
|
||||
from fastapi import HTTPException, UploadFile
|
||||
from fastapi.responses import JSONResponse
|
||||
from app import db
|
||||
from app.models.user import UserLogin, UserInDB, UserCreate, Session
|
||||
from app.utils.misc import (
|
||||
verify_password,
|
||||
@ -23,6 +24,8 @@ env_path = Path(__file__).parent.parent / ".env"
|
||||
load_dotenv(dotenv_path=env_path)
|
||||
FILES_URL = os.getenv("FILES_URL")
|
||||
|
||||
qr_logins_collection = db["qr_logins"]
|
||||
|
||||
class AuthService:
|
||||
async def register(self, user: UserCreate):
|
||||
# Проверяем, существует ли пользователь
|
||||
@ -380,3 +383,75 @@ class AuthService:
|
||||
"value": base64_textures
|
||||
}]
|
||||
}
|
||||
|
||||
async def approve_qr_login(self, token: str, telegram_user_id: int):
|
||||
qr = await qr_logins_collection.find_one({"token": token})
|
||||
if not qr:
|
||||
raise HTTPException(404, "QR token not found")
|
||||
|
||||
if qr["status"] != "pending":
|
||||
raise HTTPException(400, "QR token already used or not pending")
|
||||
|
||||
if datetime.utcnow() > qr["expires_at"]:
|
||||
await qr_logins_collection.update_one({"token": token}, {"$set": {"status": "expired"}})
|
||||
raise HTTPException(400, "QR token expired")
|
||||
|
||||
# находим пользователя по telegram_user_id
|
||||
user = await users_collection.find_one({"telegram_user_id": telegram_user_id})
|
||||
if not user:
|
||||
raise HTTPException(403, "Telegram аккаунт не привязан")
|
||||
|
||||
if not user.get("is_verified"):
|
||||
raise HTTPException(403, "Пользователь не верифицирован")
|
||||
|
||||
await qr_logins_collection.update_one(
|
||||
{"token": token},
|
||||
{"$set": {"status": "approved", "approved_username": user["username"]}}
|
||||
)
|
||||
return {"status": "success"}
|
||||
|
||||
async def qr_status(self, token: str, device_id: str | None = None):
|
||||
qr = await qr_logins_collection.find_one({"token": token})
|
||||
if not qr:
|
||||
raise HTTPException(404, "QR token not found")
|
||||
|
||||
if datetime.utcnow() > qr["expires_at"] and qr["status"] == "pending":
|
||||
await qr_logins_collection.update_one({"token": token}, {"$set": {"status": "expired"}})
|
||||
return {"status": "expired"}
|
||||
|
||||
# если хотите привязку к устройству:
|
||||
if device_id and qr.get("device_id") and qr["device_id"] != device_id:
|
||||
raise HTTPException(403, "Device mismatch")
|
||||
|
||||
if qr["status"] == "approved":
|
||||
username = qr["approved_username"]
|
||||
user = await users_collection.find_one({"username": username})
|
||||
if not user:
|
||||
raise HTTPException(404, "User not found")
|
||||
|
||||
# генерим токены как в login()
|
||||
access_token = create_access_token({"sub": user["username"], "uuid": user["uuid"]})
|
||||
client_token = str(uuid.uuid4())
|
||||
|
||||
session = Session(
|
||||
access_token=access_token,
|
||||
client_token=client_token,
|
||||
user_uuid=user["uuid"],
|
||||
expires_at=datetime.utcnow() + timedelta(minutes=1440),
|
||||
)
|
||||
await sessions_collection.insert_one(session.dict())
|
||||
|
||||
# одноразовость
|
||||
await qr_logins_collection.update_one(
|
||||
{"token": token},
|
||||
{"$set": {"status": "consumed"}}
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "ok",
|
||||
"accessToken": access_token,
|
||||
"clientToken": client_token,
|
||||
"selectedProfile": {"id": user["uuid"], "name": user["username"]},
|
||||
}
|
||||
|
||||
return {"status": qr["status"]}
|
||||
|
||||
Reference in New Issue
Block a user