Compare commits

..

9 Commits

Author SHA1 Message Date
b83bd87c68 1 2025-07-16 11:07:20 +05:00
eb7227bdc2 add: super admin role 2025-07-15 22:51:16 +05:00
9ab99aa7e4 fix main 2025-07-13 17:32:08 +05:00
296f2a91b2 fix conflict 2025-07-13 17:31:48 +05:00
dd623cd1b8 add: admin required to personal endpoints 2025-07-13 17:27:13 +05:00
481612925c add: personal 2025-07-13 17:24:50 +05:00
821741a9f8 add personal 2025-07-12 01:47:29 +05:00
81d71e60f5 123 2025-07-10 19:54:07 +05:00
f7b3081893 edit to run in local 2025-07-09 17:30:07 +05:00
6 changed files with 289 additions and 8 deletions

View File

@ -57,3 +57,12 @@ async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = D
if admin is None: if admin is None:
raise credentials_exception raise credentials_exception
return admin return admin
async def get_current_super_admin(current_admin: models.Admin = Depends(get_current_admin)):
"""Проверяет, является ли текущий админ суперадмином"""
if not current_admin.is_super_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав. Эта операция доступна только главному администратору."
)
return current_admin

View File

@ -3,9 +3,9 @@ from models import Admin, Base
from database import SessionLocal, engine from database import SessionLocal, engine
import argparse import argparse
def create_initial_admin(username: str, password: str): def create_initial_admin(username: str, password: str, super_admin: bool = True):
""" """
Создает первого администратора в системе Создает первого администратора в системе (по умолчанию как главного админа)
""" """
Base.metadata.create_all(bind=engine) Base.metadata.create_all(bind=engine)
db = SessionLocal() db = SessionLocal()
@ -18,18 +18,24 @@ def create_initial_admin(username: str, password: str):
# Создаем нового админа # Создаем нового админа
hashed_password = get_password_hash(password) hashed_password = get_password_hash(password)
db_admin = Admin(username=username, hashed_password=hashed_password) db_admin = Admin(
username=username,
hashed_password=hashed_password,
is_super_admin=super_admin
)
db.add(db_admin) db.add(db_admin)
db.commit() db.commit()
db.refresh(db_admin) db.refresh(db_admin)
print(f"Администратор {username} успешно создан!") admin_type = "главный администратор" if super_admin else "администратор"
print(f"{admin_type.capitalize()} {username} успешно создан!")
db.close() db.close()
if __name__ == "__main__": if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Создание первого администратора") parser = argparse.ArgumentParser(description="Создание администратора")
parser.add_argument("--username", required=True, help="Имя пользователя администратора") parser.add_argument("--username", required=True, help="Имя пользователя администратора")
parser.add_argument("--password", required=True, help="Пароль администратора") parser.add_argument("--password", required=True, help="Пароль администратора")
parser.add_argument("--super", action="store_true", help="Создать как главного администратора")
args = parser.parse_args() args = parser.parse_args()
create_initial_admin(args.username, args.password) create_initial_admin(args.username, args.password, args.super)

55
crud.py
View File

@ -1,5 +1,5 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from models import Car, EngineType, HybridType, PowerRatio, Admin from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin
import schemas import schemas
from typing import List, Optional from typing import List, Optional
from password_utils import get_password_hash from password_utils import get_password_hash
@ -9,7 +9,11 @@ def get_admin_by_username(db: Session, username: str) -> Optional[Admin]:
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin: def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
hashed_password = get_password_hash(admin.password) hashed_password = get_password_hash(admin.password)
db_admin = Admin(username=admin.username, hashed_password=hashed_password) db_admin = Admin(
username=admin.username,
hashed_password=hashed_password,
is_super_admin=admin.is_super_admin
)
db.add(db_admin) db.add(db_admin)
db.commit() db.commit()
db.refresh(db_admin) db.refresh(db_admin)
@ -84,3 +88,50 @@ def delete_car(db: Session, car_id: int) -> bool:
db.delete(db_car) db.delete(db_car)
db.commit() db.commit()
return True return True
# Персонал ---------
def get_personal(db: Session, personal_id: int) -> Optional[Personal]:
return db.query(Personal).filter(Personal.id == personal_id).first()
def get_all_personal(db: Session, skip: int = 0, limit: int = 100) -> List[Personal]:
return db.query(Personal).offset(skip).limit(limit).all()
def get_personal_count(db: Session) -> int:
return db.query(Personal).count()
def create_personal(db: Session, personal: schemas.PersonalCreate) -> Personal:
db_personal = Personal(
name=personal.name,
surname=personal.surname,
role=personal.role,
photo=personal.photo
)
db.add(db_personal)
db.commit()
db.refresh(db_personal)
return db_personal
def update_personal(db: Session, personal_id: int, personal_update: schemas.PersonalUpdate) -> Optional[Personal]:
db_personal = get_personal(db, personal_id)
if not db_personal:
return None
update_data = personal_update.model_dump(exclude_unset=True)
# Обновление полей
for key, value in update_data.items():
setattr(db_personal, key, value)
db.commit()
db.refresh(db_personal)
return db_personal
def delete_personal(db: Session, personal_id: int) -> bool:
db_personal = get_personal(db, personal_id)
if not db_personal:
return False
db.delete(db_personal)
db.commit()
return True

173
main.py
View File

@ -12,15 +12,54 @@ from utils import save_image, delete_image
import json import json
from datetime import timedelta from datetime import timedelta
import auth import auth
from fastapi.middleware.cors import CORSMiddleware
# Создание таблиц в БД # Создание таблиц в БД
models.Base.metadata.create_all(bind=engine) models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей") app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*",]
)
# Добавляем обработку статических файлов # Добавляем обработку статических файлов
app.mount("/static", StaticFiles(directory="static"), name="static") app.mount("/static", StaticFiles(directory="static"), name="static")
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(
admin: schemas.AdminCreate,
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_super_admin)
):
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
# Эндпоинты для авторизации # Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token) @app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
@ -184,5 +223,139 @@ def delete_car(
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)): def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin return current_admin
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=3000) uvicorn.run(app, host="0.0.0.0", port=3000)
# Персонал ---------
@app.get("/personal", response_model=schemas.PersonalListResponse)
def get_all_personal(
skip: int = Query(0, description="Количество пропускаемых записей"),
limit: int = Query(100, description="Максимальное количество записей"),
db: Session = Depends(get_db)
):
staff = crud.get_all_personal(db, skip=skip, limit=limit)
total = crud.get_personal_count(db)
return {"staff": staff, "total": total}
@app.get("/personal/{personal_id}", response_model=schemas.PersonalResponse)
def get_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db)
):
personal = crud.get_personal(db, personal_id=personal_id)
if personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
return {"personal": personal}
@app.post("/personal", response_model=schemas.PersonalResponse, status_code=status.HTTP_201_CREATED)
async def create_personal(
personal_data: str = Form(..., description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
try:
# Преобразуем строку JSON в словарь
personal_dict = json.loads(personal_data)
# Загружаем фото, если оно предоставлено
if photo:
photo_path = await save_image(photo)
personal_dict["photo"] = photo_path
# Создаем объект Pydantic для валидации данных
personal = schemas.PersonalCreate(**personal_dict)
# Создаем запись в БД
db_personal = crud.create_personal(db=db, personal=personal)
return {"personal": db_personal}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный формат JSON"
)
@app.put("/personal/{personal_id}", response_model=schemas.PersonalResponse)
async def update_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
personal_data: str = Form(None, description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Проверяем существование сотрудника
existing_personal = crud.get_personal(db, personal_id=personal_id)
if existing_personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
try:
# Преобразуем строку JSON в словарь, если она предоставлена
personal_dict = {}
if personal_data:
personal_dict = json.loads(personal_data)
# Загружаем новую фотографию, если она предоставлена
if photo:
# Удаляем старую фотографию, если есть
if existing_personal.photo:
delete_image(existing_personal.photo)
# Сохраняем новую фотографию
photo_path = await save_image(photo)
personal_dict["photo"] = photo_path
# Создаем объект Pydantic для валидации данных
personal_update = schemas.PersonalUpdate(**personal_dict)
# Обновляем запись в БД
updated_personal = crud.update_personal(db=db, personal_id=personal_id, personal_update=personal_update)
return {"personal": updated_personal}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный формат JSON"
)
@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Получаем сотрудника перед удалением
personal = crud.get_personal(db, personal_id=personal_id)
if personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
# Удаляем фотографию, если есть
if personal.photo:
delete_image(personal.photo)
# Удаляем запись из БД
crud.delete_personal(db=db, personal_id=personal_id)
return None

View File

@ -24,6 +24,7 @@ class Admin(Base):
username = Column(String, unique=True, index=True) username = Column(String, unique=True, index=True)
hashed_password = Column(String) hashed_password = Column(String)
is_active = Column(Boolean, default=True) is_active = Column(Boolean, default=True)
is_super_admin = Column(Boolean, default=False) # Новое поле для главного админа
class Car(Base): class Car(Base):
__tablename__ = "cars" __tablename__ = "cars"
@ -43,3 +44,12 @@ class Car(Base):
electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя
hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида
power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности
class Personal(Base):
__tablename__ = "personal"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True) # ФИО сотрудника
surname = Column(String, index=True) # ФИО сотрудника
role = Column(String) # Должность (произвольная строка)
photo = Column(String, nullable=True) # путь к фотографии

View File

@ -32,10 +32,12 @@ class AdminBase(BaseModel):
class AdminCreate(AdminBase): class AdminCreate(AdminBase):
password: str password: str
is_super_admin: Optional[bool] = False
class Admin(AdminBase): class Admin(AdminBase):
id: int id: int
is_active: bool is_active: bool
is_super_admin: bool
class Config: class Config:
from_attributes = True from_attributes = True
@ -87,3 +89,33 @@ class CarResponse(BaseModel):
class CarsResponse(BaseModel): class CarsResponse(BaseModel):
cars: List[Car] cars: List[Car]
total: int total: int
# Персонал ---------
class PersonalBase(BaseModel):
name: str
surname: str
role: str
photo: Optional[str] = None
class PersonalCreate(PersonalBase):
pass
class PersonalUpdate(BaseModel):
name: Optional[str] = None
surname: Optional[str] = None
role: Optional[str] = None
photo: Optional[str] = None
class Personal(PersonalBase):
id: int
class Config:
from_attributes = True
class PersonalResponse(BaseModel):
personal: Personal
class PersonalListResponse(BaseModel):
staff: List[Personal]
total: int