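import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session

import schemas
import models
import crud
from database import get_db
from password_utils import verify_password

_logger = logging.getLogger(__name__)

# JWT constants.
#
# The signing key below is committed to source control, so anyone who can read
# the repository can mint tokens that get_current_admin() will accept. It is
# kept only as a backward-compatible fallback; deployments should set the
# JWT_SECRET_KEY environment variable to a randomly generated value and rotate
# away from the committed one.
_INSECURE_FALLBACK_SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"

# `or` (rather than a .get() default) also rejects an empty variable: an empty
# HMAC key would be even weaker than the committed one.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY") or _INSECURE_FALLBACK_SECRET_KEY
if SECRET_KEY == _INSECURE_FALLBACK_SECRET_KEY:
    _logger.warning(
        "JWT_SECRET_KEY is not set; falling back to the signing key committed "
        "in source. Do not run this configuration in production."
    )

ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# OAuth2PasswordBearer pulls the bearer token out of the incoming request.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


def authenticate_admin(db: Session, username: str, password: str):
    """Check an admin's username and password.

    Args:
        db: Database session passed to ``crud.get_admin_by_username``.
        username: Login name to look up.
        password: Plaintext password to check against ``admin.hashed_password``.

    Returns:
        The admin object on success, or ``False`` when the username is unknown
        or the password does not verify.
    """
    admin = crud.get_admin_by_username(db, username)
    if not admin:
        # NOTE(review): this path returns before verify_password() runs, so an
        # unknown username is likely answered faster than a wrong password.
        # Consider verifying against a fixed dummy hash here so both failures
        # cost the same -- needs a hash in the format password_utils expects.
        return False
    if not verify_password(password, admin.hashed_password):
        return False
    return admin


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """Create a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not mutated.
        expires_delta: Token lifetime. When omitted the token lives for
            15 minutes (note: not ``ACCESS_TOKEN_EXPIRE_MINUTES``; callers that
            want that lifetime must pass it explicitly).

    Returns:
        The encoded token, signed with ``SECRET_KEY`` using ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Compare against None rather than truthiness: timedelta(0) is falsy, and a
    # caller asking for a zero lifetime must not silently get a 15-minute token.
    lifetime = expires_delta if expires_delta is not None else timedelta(minutes=15)
    # Timezone-aware "now"; datetime.utcnow() is deprecated and returns a naive
    # value that is easy to misinterpret as local time.
    expire = datetime.now(timezone.utc) + lifetime
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """Resolve the bearer token to the admin it was issued for.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        db: Database session supplied by ``get_db``.

    Returns:
        The admin returned by ``crud.get_admin_by_username`` for the token's
        ``sub`` claim.

    Raises:
        HTTPException: 401 when the token fails to decode, has no ``sub``
            claim, or names an admin that does not exist. All three cases
            return the same response so the caller cannot tell them apart.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Некорректные учетные данные",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        # The accepted algorithm list is pinned to ALGORITHM, so a token whose
        # header names a different algorithm is rejected instead of trusted.
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = schemas.TokenData(username=username)
    except JWTError:
        raise credentials_exception
    # Look the admin up again on every request, so a deleted account stops
    # working even while its token is still within its lifetime.
    admin = crud.get_admin_by_username(db, username=token_data.username)
    if admin is None:
        raise credentials_exception
    return admin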