diff --git a/README.md b/README.md index e624d9a..7b50263 100644 --- a/README.md +++ b/README.md @@ -9,16 +9,24 @@ API сервер для хранения и управления данными - **schemas.py** - схемы Pydantic для валидации данных - **database.py** - настройка подключения к базе данных - **crud.py** - функции для CRUD операций +- **auth.py** - функции для JWT авторизации +- **create_admin.py** - скрипт для создания первого админа ## Запуск проекта 1. Установите зависимости: ```bash -pip install fastapi uvicorn sqlalchemy pydantic +pip install -r requirements.txt ``` -2. Запустите сервер: +2. Создайте первого администратора: + +```bash +python create_admin.py --username admin --password your_password +``` + +3. Запустите сервер: ```bash uvicorn main:app --reload @@ -26,8 +34,27 @@ uvicorn main:app --reload Сервер будет доступен по адресу http://localhost:8000 +## JWT Авторизация + +Для доступа к API необходима JWT авторизация. Вот как она работает: + +1. Получите токен доступа, отправив POST-запрос к `/token` с вашими учетными данными: + +```bash +curl -X POST http://localhost:8000/token -d "username=admin&password=your_password" -H "Content-Type: application/x-www-form-urlencoded" +``` + +2. Используйте полученный токен для доступа к защищенным эндпоинтам: + +```bash +curl -X GET http://localhost:8000/cars -H "Authorization: Bearer your_token_here" +``` + ## API эндпоинты +- **POST /token** - получить JWT токен (авторизация) +- **POST /admins** - создать нового администратора (только для админов) +- **GET /users/me** - получить информацию о текущем пользователе - **GET /cars** - получить список всех автомобилей - **GET /cars/{car_id}** - получить информацию о конкретном автомобиле - **POST /cars** - добавить новый автомобиль diff --git a/auth.py b/auth.py new file mode 100644 index 0000000..017767e --- /dev/null +++ b/auth.py @@ -0,0 +1,68 @@ +from datetime import datetime, timedelta +from typing import Optional +from jose import JWTError, jwt +from fastapi import Depends, HTTPException, status +from fastapi.security import OAuth2PasswordBearer +from sqlalchemy.orm import Session +import schemas +import models +import crud +from database import get_db +from password_utils import verify_password + +# Константы для JWT +SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # В реальном приложении используйте os.environ.get +ALGORITHM = "HS256" +ACCESS_TOKEN_EXPIRE_MINUTES = 30 + +# Инициализация OAuth2PasswordBearer для получения токена из запроса +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + +def authenticate_admin(db: Session, username: str, password: str): + """Аутентификация админа""" + admin = crud.get_admin_by_username(db, username) + if not admin: + return False + if not verify_password(password, admin.hashed_password): + return False + return admin + +def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): + """Создаёт JWT токен""" + to_encode = data.copy() + if expires_delta: + expire = datetime.utcnow() + expires_delta + else: + expire = datetime.utcnow() + timedelta(minutes=15) + to_encode.update({"exp": expire}) + encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + return encoded_jwt + +async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)): + """Получает текущего админа из токена""" + credentials_exception = HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Некорректные учетные данные", + headers={"WWW-Authenticate": "Bearer"}, + ) + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + if username is None: + raise credentials_exception + token_data = schemas.TokenData(username=username) + except JWTError: + raise credentials_exception + admin = crud.get_admin_by_username(db, username=token_data.username) + if admin is None: + raise credentials_exception + return admin + +async def get_current_super_admin(current_admin: models.Admin = Depends(get_current_admin)): + """Проверяет, является ли текущий админ суперадмином""" + if not current_admin.is_super_admin: + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Недостаточно прав. Эта операция доступна только главному администратору." + ) + return current_admin diff --git a/create_admin.py b/create_admin.py new file mode 100644 index 0000000..5ca486d --- /dev/null +++ b/create_admin.py @@ -0,0 +1,41 @@ +from password_utils import get_password_hash +from models import Admin, Base +from database import SessionLocal, engine +import argparse + +def create_initial_admin(username: str, password: str, super_admin: bool = True): + """ + Создает первого администратора в системе (по умолчанию как главного админа) + """ + Base.metadata.create_all(bind=engine) + db = SessionLocal() + + # Проверяем, существует ли уже админ с таким именем + existing_admin = db.query(Admin).filter(Admin.username == username).first() + if existing_admin: + print(f"Администратор с именем {username} уже существует!") + return + + # Создаем нового админа + hashed_password = get_password_hash(password) + db_admin = Admin( + username=username, + hashed_password=hashed_password, + is_super_admin=super_admin + ) + db.add(db_admin) + db.commit() + db.refresh(db_admin) + admin_type = "главный администратор" if super_admin else "администратор" + print(f"{admin_type.capitalize()} {username} успешно создан!") + + db.close() + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Создание администратора") + parser.add_argument("--username", required=True, help="Имя пользователя администратора") + parser.add_argument("--password", required=True, help="Пароль администратора") + parser.add_argument("--super", action="store_true", help="Создать как главного администратора") + + args = parser.parse_args() + create_initial_admin(args.username, args.password, args.super) diff --git a/crud.py b/crud.py index 1ed911b..6a64cb8 100644 --- a/crud.py +++ b/crud.py @@ -1,7 +1,23 @@ from sqlalchemy.orm import Session -from models import Car, EngineType, HybridType, PowerRatio +from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin import schemas from typing import List, Optional +from password_utils import get_password_hash + +def get_admin_by_username(db: Session, username: str) -> Optional[Admin]: + return db.query(Admin).filter(Admin.username == username).first() + +def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin: + hashed_password = get_password_hash(admin.password) + db_admin = Admin( + username=admin.username, + hashed_password=hashed_password, + is_super_admin=admin.is_super_admin + ) + db.add(db_admin) + db.commit() + db.refresh(db_admin) + return db_admin def get_car(db: Session, car_id: int) -> Optional[Car]: return db.query(Car).filter(Car.id == car_id).first() @@ -72,3 +88,50 @@ def delete_car(db: Session, car_id: int) -> bool: db.delete(db_car) db.commit() return True + +# Персонал --------- + +def get_personal(db: Session, personal_id: int) -> Optional[Personal]: + return db.query(Personal).filter(Personal.id == personal_id).first() + +def get_all_personal(db: Session, skip: int = 0, limit: int = 100) -> List[Personal]: + return db.query(Personal).offset(skip).limit(limit).all() + +def get_personal_count(db: Session) -> int: + return db.query(Personal).count() + +def create_personal(db: Session, personal: schemas.PersonalCreate) -> Personal: + db_personal = Personal( + name=personal.name, + surname=personal.surname, + role=personal.role, + photo=personal.photo + ) + db.add(db_personal) + db.commit() + db.refresh(db_personal) + return db_personal + +def update_personal(db: Session, personal_id: int, personal_update: schemas.PersonalUpdate) -> Optional[Personal]: + db_personal = get_personal(db, personal_id) + if not db_personal: + return None + + update_data = personal_update.model_dump(exclude_unset=True) + + # Обновление полей + for key, value in update_data.items(): + setattr(db_personal, key, value) + + db.commit() + db.refresh(db_personal) + return db_personal + +def delete_personal(db: Session, personal_id: int) -> bool: + db_personal = get_personal(db, personal_id) + if not db_personal: + return False + + db.delete(db_personal) + db.commit() + return True diff --git a/main.py b/main.py index 9d16709..89a00b8 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form from fastapi.staticfiles import StaticFiles +from fastapi.security import OAuth2PasswordRequestForm from sqlalchemy.orm import Session import crud import models @@ -9,6 +10,8 @@ from typing import List, Optional import uvicorn from utils import save_image, delete_image import json +from datetime import timedelta +import auth from fastapi.middleware.cors import CORSMiddleware # Создание таблиц в БД @@ -16,18 +19,74 @@ models.Base.metadata.create_all(bind=engine) app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей") -# Настройка CORS app.add_middleware( CORSMiddleware, - allow_origins=["*"], # В продакшене замените на конкретные домены + allow_origins=["*"], allow_credentials=True, allow_methods=["*"], - allow_headers=["*"], + allow_headers=["*",] ) # Добавляем обработку статических файлов app.mount("/static", StaticFiles(directory="static"), name="static") +# Эндпоинты для авторизации +@app.post("/token", response_model=schemas.Token) +async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + admin = auth.authenticate_admin(db, form_data.username, form_data.password) + if not admin: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Неверное имя пользователя или пароль", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = auth.create_access_token( + data={"sub": admin.username}, expires_delta=access_token_expires + ) + return {"access_token": access_token, "token_type": "bearer"} + +@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED) +def create_admin( + admin: schemas.AdminCreate, + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_super_admin) +): + db_admin = crud.get_admin_by_username(db, username=admin.username) + if db_admin: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Пользователь с таким именем уже существует" + ) + return crud.create_admin(db=db, admin=admin) + +# Эндпоинты для авторизации +@app.post("/token", response_model=schemas.Token) +async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): + admin = auth.authenticate_admin(db, form_data.username, form_data.password) + if not admin: + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Неверное имя пользователя или пароль", + headers={"WWW-Authenticate": "Bearer"}, + ) + access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES) + access_token = auth.create_access_token( + data={"sub": admin.username}, expires_delta=access_token_expires + ) + return {"access_token": access_token, "token_type": "bearer"} + +@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED) +def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)): + # Проверка, что создать админа может только существующий админ + db_admin = crud.get_admin_by_username(db, username=admin.username) + if db_admin: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Пользователь с таким именем уже существует" + ) + return crud.create_admin(db=db, admin=admin) + @app.get("/") def read_root(): return {"message": "AutoBro API"} @@ -59,7 +118,8 @@ def get_car( async def create_car( car_data: str = Form(..., description="Данные автомобиля в JSON формате"), image: UploadFile = File(None, description="Изображение автомобиля"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) ): try: # Преобразуем строку JSON в словарь @@ -92,7 +152,8 @@ async def update_car( car_id: int = Path(..., description="ID автомобиля", gt=0), car_data: str = Form(None, description="Данные автомобиля в JSON формате"), image: UploadFile = File(None, description="Изображение автомобиля"), - db: Session = Depends(get_db) + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) ): # Проверяем существование автомобиля existing_car = crud.get_car(db, car_id=car_id) @@ -138,7 +199,8 @@ async def update_car( @app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT) def delete_car( car_id: int = Path(..., description="ID автомобиля", gt=0), - db: Session = Depends(get_db) + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) ): # Получаем автомобиль перед удалением car = crud.get_car(db, car_id=car_id) @@ -156,5 +218,144 @@ def delete_car( crud.delete_car(db=db, car_id=car_id) return None +# Эндпоинт для проверки текущего пользователя (только для отладки) +@app.get("/users/me", response_model=schemas.Admin) +def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)): + return current_admin + +# Эндпоинт для проверки текущего пользователя (только для отладки) +@app.get("/users/me", response_model=schemas.Admin) +def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)): + return current_admin + if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=3000) + +# Персонал --------- + +@app.get("/personal", response_model=schemas.PersonalListResponse) +def get_all_personal( + skip: int = Query(0, description="Количество пропускаемых записей"), + limit: int = Query(100, description="Максимальное количество записей"), + db: Session = Depends(get_db) +): + staff = crud.get_all_personal(db, skip=skip, limit=limit) + total = crud.get_personal_count(db) + return {"staff": staff, "total": total} + +@app.get("/personal/{personal_id}", response_model=schemas.PersonalResponse) +def get_personal( + personal_id: int = Path(..., description="ID сотрудника", gt=0), + db: Session = Depends(get_db) +): + personal = crud.get_personal(db, personal_id=personal_id) + if personal is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Сотрудник не найден" + ) + return {"personal": personal} + +@app.post("/personal", response_model=schemas.PersonalResponse, status_code=status.HTTP_201_CREATED) +async def create_personal( + personal_data: str = Form(..., description="Данные сотрудника в JSON формате"), + photo: UploadFile = File(None, description="Фотография сотрудника"), + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) +): + try: + # Преобразуем строку JSON в словарь + personal_dict = json.loads(personal_data) + + # Загружаем фото, если оно предоставлено + if photo: + photo_path = await save_image(photo) + personal_dict["photo"] = photo_path + + # Создаем объект Pydantic для валидации данных + personal = schemas.PersonalCreate(**personal_dict) + + # Создаем запись в БД + db_personal = crud.create_personal(db=db, personal=personal) + return {"personal": db_personal} + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) + except json.JSONDecodeError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Неверный формат JSON" + ) + +@app.put("/personal/{personal_id}", response_model=schemas.PersonalResponse) +async def update_personal( + personal_id: int = Path(..., description="ID сотрудника", gt=0), + personal_data: str = Form(None, description="Данные сотрудника в JSON формате"), + photo: UploadFile = File(None, description="Фотография сотрудника"), + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) +): + # Проверяем существование сотрудника + existing_personal = crud.get_personal(db, personal_id=personal_id) + if existing_personal is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Сотрудник не найден" + ) + + try: + # Преобразуем строку JSON в словарь, если она предоставлена + personal_dict = {} + if personal_data: + personal_dict = json.loads(personal_data) + + # Загружаем новую фотографию, если она предоставлена + if photo: + # Удаляем старую фотографию, если есть + if existing_personal.photo: + delete_image(existing_personal.photo) + + # Сохраняем новую фотографию + photo_path = await save_image(photo) + personal_dict["photo"] = photo_path + + # Создаем объект Pydantic для валидации данных + personal_update = schemas.PersonalUpdate(**personal_dict) + + # Обновляем запись в БД + updated_personal = crud.update_personal(db=db, personal_id=personal_id, personal_update=personal_update) + return {"personal": updated_personal} + except ValueError as e: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=str(e) + ) + except json.JSONDecodeError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail="Неверный формат JSON" + ) + +@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT) +def delete_personal( + personal_id: int = Path(..., description="ID сотрудника", gt=0), + db: Session = Depends(get_db), + current_admin: models.Admin = Depends(auth.get_current_admin) +): + # Получаем сотрудника перед удалением + personal = crud.get_personal(db, personal_id=personal_id) + if personal is None: + raise HTTPException( + status_code=status.HTTP_404_NOT_FOUND, + detail="Сотрудник не найден" + ) + + # Удаляем фотографию, если есть + if personal.photo: + delete_image(personal.photo) + + # Удаляем запись из БД + crud.delete_personal(db=db, personal_id=personal_id) + return None diff --git a/models.py b/models.py index bf2d032..eda2721 100644 --- a/models.py +++ b/models.py @@ -17,6 +17,15 @@ class PowerRatio(enum.Enum): ELECTRIC_GREATER = "ЭД > ДВС" NA = "не применимо" +class Admin(Base): + __tablename__ = "admins" + + id = Column(Integer, primary_key=True, index=True) + username = Column(String, unique=True, index=True) + hashed_password = Column(String) + is_active = Column(Boolean, default=True) + is_super_admin = Column(Boolean, default=False) # Новое поле для главного админа + class Car(Base): __tablename__ = "cars" @@ -35,3 +44,12 @@ class Car(Base): electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности + +class Personal(Base): + __tablename__ = "personal" + + id = Column(Integer, primary_key=True, index=True) + name = Column(String, index=True) # ФИО сотрудника + surname = Column(String, index=True) # ФИО сотрудника + role = Column(String) # Должность (произвольная строка) + photo = Column(String, nullable=True) # путь к фотографии diff --git a/password_utils.py b/password_utils.py new file mode 100644 index 0000000..db5900e --- /dev/null +++ b/password_utils.py @@ -0,0 +1,12 @@ +from passlib.context import CryptContext + +# Инициализация контекста для хеширования паролей +pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") + +def verify_password(plain_password, hashed_password): + """Проверяет, соответствует ли пароль хешу""" + return pwd_context.verify(plain_password, hashed_password) + +def get_password_hash(password): + """Создаёт хеш пароля""" + return pwd_context.hash(password) diff --git a/requirements.txt b/requirements.txt index f939c44..5024e0d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ uvicorn==0.30.0 sqlalchemy==2.0.41 pydantic==2.11.7 python-multipart==0.0.9 +python-jose[cryptography]==3.3.0 +passlib[bcrypt]==1.7.4 diff --git a/schemas.py b/schemas.py index 782cfc4..fc8d764 100644 --- a/schemas.py +++ b/schemas.py @@ -18,6 +18,30 @@ class PowerRatioEnum(str, Enum): ELECTRIC_GREATER = "ЭД > ДВС" NA = "не применимо" +# Схемы для авторизации +class Token(BaseModel): + access_token: str + token_type: str + +class TokenData(BaseModel): + username: Optional[str] = None + +# Схемы для пользователя-админа +class AdminBase(BaseModel): + username: str + +class AdminCreate(AdminBase): + password: str + is_super_admin: Optional[bool] = False + +class Admin(AdminBase): + id: int + is_active: bool + is_super_admin: bool + + class Config: + from_attributes = True + class CarBase(BaseModel): image: Optional[str] = None name: str @@ -65,3 +89,33 @@ class CarResponse(BaseModel): class CarsResponse(BaseModel): cars: List[Car] total: int + +# Персонал --------- + +class PersonalBase(BaseModel): + name: str + surname: str + role: str + photo: Optional[str] = None + +class PersonalCreate(PersonalBase): + pass + +class PersonalUpdate(BaseModel): + name: Optional[str] = None + surname: Optional[str] = None + role: Optional[str] = None + photo: Optional[str] = None + +class Personal(PersonalBase): + id: int + + class Config: + from_attributes = True + +class PersonalResponse(BaseModel): + personal: Personal + +class PersonalListResponse(BaseModel): + staff: List[Personal] + total: int