diff --git a/README.md b/README.md index e624d9a..7b50263 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,24 @@ API сервер для хранения и управления данными - **schemas.py** - схемы Pydantic для валидации данных - **database.py** - настройка подключения к базе данных - **crud.py** - функции для CRUD операций +- **auth.py** - функции для JWT авторизации +- **create_admin.py** - скрипт для создания первого админа ## Запуск проекта 1. Установите зависимости: ```bash -pip install fastapi uvicorn sqlalchemy pydantic +pip install -r requirements.txt ``` -2. Запустите сервер: +2. Создайте первого администратора: + +```bash +python create_admin.py --username admin --password your_password +``` + +3. Запустите сервер: ```bash uvicorn main:app --reload @@ -26,8 +34,27 @@ uvicorn main:app --reload Сервер будет доступен по адресу http://localhost:8000 +## JWT Авторизация + +Для доступа к API необходима JWT авторизация. Вот как она работает: + +1. Получите токен доступа, отправив POST-запрос к `/token` с вашими учетными данными: + +```bash +curl -X POST http://localhost:8000/token -d "username=admin&password=your_password" -H "Content-Type: application/x-www-form-urlencoded" +``` + +2. Используйте полученный токен для доступа к защищенным эндпоинтам: + +```bash +curl -X GET http://localhost:8000/cars -H "Authorization: Bearer your_token_here" +``` + ## API эндпоинты +- **POST /token** - получить JWT токен (авторизация) +- **POST /admins** - создать нового администратора (только для админов) +- **GET /users/me** - получить информацию о текущем пользователе - **GET /cars** - получить список всех автомобилей - **GET /cars/{car_id}** - получить информацию о конкретном автомобиле - **POST /cars** - добавить новый автомобиль diff --git a/auth.py b/auth.py new file mode 100644 index 0000000..a2b484b --- /dev/null +++ b/auth.py @@ -0,0 +1,59 @@ +from datetime import datetime, timedelta +from typing import Optional +from jose import JWTError, jwt +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from sqlalchemy.orm import Session +import schemas +import models +import crud +from database import get_db +from password_utils import verify_password + +# Константы для JWT +SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # В реальном приложении используйте os.environ.get +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +# Инициализация OAuth2PasswordBearer для получения токена из запроса +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +def authenticate_admin(db: Session, username: str, password: str): + """Аутентификация админа""" + admin = crud.get_admin_by_username(db, username) + if not admin: + return False + if not verify_password(password, admin.hashed_password): + return False + return admin + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + """Создаёт JWT токен""" + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): + """Получает текущего админа из токена""" + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Некорректные учетные данные", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = schemas.TokenData(username=username) + except JWTError: + raise credentials_exception + admin = crud.get_admin_by_username(db, username=token_data.username) + if admin is None: + raise credentials_exception + return admin diff --git a/create_admin.py b/create_admin.py new file mode 100644 index 0000000..c2801c0 --- /dev/null +++ b/create_admin.py @@ -0,0 +1,35 @@ +from password_utils import get_password_hash +from models import Admin, Base +from database import SessionLocal, engine +import argparse + +def create_initial_admin(username: str, password: str): + """ + Создает первого администратора в системе + """ + Base.metadata.create_all(bind=engine) + db = SessionLocal() + + # Проверяем, существует ли уже админ с таким именем + existing_admin = db.query(Admin).filter(Admin.username == username).first() + if existing_admin: + print(f"Администратор с именем {username} уже существует!") + return + + # Создаем нового админа + hashed_password = get_password_hash(password) + db_admin = Admin(username=username, hashed_password=hashed_password) + db.add(db_admin) + db.commit() + db.refresh(db_admin) + print(f"Администратор {username} успешно создан!") + + db.close() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Создание первого администратора") + parser.add_argument("--username", required=True, help="Имя пользователя администратора") + parser.add_argument("--password", required=True, help="Пароль администратора") + + args = parser.parse_args() + create_initial_admin(args.username, args.password) diff --git a/crud.py b/crud.py index 1ed911b..a078df1 100644 --- a/crud.py +++ b/crud.py @@ -1,7 +1,19 @@ from sqlalchemy.orm import Session -from models import Car, EngineType, HybridType, PowerRatio +from models import Car, EngineType, HybridType, PowerRatio, Admin import schemas from typing import List, Optional +from password_utils import get_password_hash + +def get_admin_by_username(db: Session, username: str) -> Optional[Admin]: + return db.query(Admin).filter(Admin.username == username).first() + +def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin: + hashed_password = get_password_hash(admin.password) + db_admin = Admin(username=admin.username, hashed_password=hashed_password) + db.add(db_admin) + db.commit() + db.refresh(db_admin) + return db_admin def get_car(db: Session, car_id: int) -> Optional[Car]: return db.query(Car).filter(Car.id == car_id).first() diff --git a/main.py b/main.py index f9888a0..3b0fa1a 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form from fastapi.staticfiles import StaticFiles +from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session import crud import models @@ -9,6 +10,8 @@ from typing import List, Optional import uvicorn from utils import save_image, delete_image import json +from datetime import timedelta +import auth # Создание таблиц в БД models.Base.metadata.create_all(bind=engine) @@ -18,6 +21,33 @@ app = FastAPI(title="AutoBro API", description="API для управления # Добавляем обработку статических файлов app.mount("/static", StaticFiles(directory="static"), name="static") +# Эндпоинты для авторизации +@app.post("/token", response_model=schemas.Token) +async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + admin = auth.authenticate_admin(db, form_data.username, form_data.password) + if not admin: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Неверное имя пользователя или пароль", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = auth.create_access_token( + data={"sub": admin.username}, expires_delta=access_token_expires + ) + return {"access_token": access_token, "token_type": "bearer"} + +@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED) +def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)): + # Проверка, что создать админа может только существующий админ + db_admin = crud.get_admin_by_username(db, username=admin.username) + if db_admin: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Пользователь с таким именем уже существует" + ) + return crud.create_admin(db=db, admin=admin) + @app.get("/") def read_root(): return {"message": "AutoBro API"} @@ -49,7 +79,8 @@ def get_car( async def create_car( car_data: str = Form(..., description="Данные автомобиля в JSON формате"), image: UploadFile = File(None, description="Изображение автомобиля"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) ): try: # Преобразуем строку JSON в словарь @@ -82,7 +113,8 @@ async def update_car( car_id: int = Path(..., description="ID автомобиля", gt=0), car_data: str = Form(None, description="Данные автомобиля в JSON формате"), image: UploadFile = File(None, description="Изображение автомобиля"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) ): # Проверяем существование автомобиля existing_car = crud.get_car(db, car_id=car_id) @@ -128,7 +160,8 @@ async def update_car( @app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_car( car_id: int = Path(..., description="ID автомобиля", gt=0), - db: Session = Depends(get_db) + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) ): # Получаем автомобиль перед удалением car = crud.get_car(db, car_id=car_id) @@ -146,5 +179,10 @@ def delete_car( crud.delete_car(db=db, car_id=car_id) return None +# Эндпоинт для проверки текущего пользователя (только для отладки) +@app.get("/users/me", response_model=schemas.Admin) +def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)): + return current_admin + if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=3000) diff --git a/models.py b/models.py index bf2d032..214ae00 100644 --- a/models.py +++ b/models.py @@ -17,6 +17,14 @@ class PowerRatio(enum.Enum): ELECTRIC_GREATER = "ЭД > ДВС" NA = "не применимо" +class Admin(Base): + __tablename__ = "admins" + + id = Column(Integer, primary_key=True, index=True) + username = Column(String, unique=True, index=True) + hashed_password = Column(String) + is_active = Column(Boolean, default=True) + class Car(Base): __tablename__ = "cars" diff --git a/password_utils.py b/password_utils.py new file mode 100644 index 0000000..db5900e --- /dev/null +++ b/password_utils.py @@ -0,0 +1,12 @@ +from passlib.context import CryptContext + +# Инициализация контекста для хеширования паролей +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +def verify_password(plain_password, hashed_password): + """Проверяет, соответствует ли пароль хешу""" + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password): + """Создаёт хеш пароля""" + return pwd_context.hash(password) diff --git a/requirements.txt b/requirements.txt index f939c44..5024e0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ uvicorn==0.30.0 sqlalchemy==2.0.41 pydantic==2.11.7 python-multipart==0.0.9 +python-jose[cryptography]==3.3.0 +passlib[bcrypt]==1.7.4 diff --git a/schemas.py b/schemas.py index 782cfc4..67cd7e0 100644 --- a/schemas.py +++ b/schemas.py @@ -18,6 +18,28 @@ class PowerRatioEnum(str, Enum): ELECTRIC_GREATER = "ЭД > ДВС" NA = "не применимо" +# Схемы для авторизации +class Token(BaseModel): + access_token: str + token_type: str + +class TokenData(BaseModel): + username: Optional[str] = None + +# Схемы для пользователя-админа +class AdminBase(BaseModel): + username: str + +class AdminCreate(AdminBase): + password: str + +class Admin(AdminBase): + id: int + is_active: bool + + class Config: + from_attributes = True + class CarBase(BaseModel): image: Optional[str] = None name: str