Compare commits
9 Commits
b6901ddf7b
...
main
Author | SHA1 | Date | |
---|---|---|---|
b83bd87c68 | |||
eb7227bdc2 | |||
9ab99aa7e4 | |||
296f2a91b2 | |||
dd623cd1b8 | |||
481612925c | |||
821741a9f8 | |||
81d71e60f5 | |||
f7b3081893 |
9
auth.py
9
auth.py
@ -57,3 +57,12 @@ async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = D
|
|||||||
if admin is None:
|
if admin is None:
|
||||||
raise credentials_exception
|
raise credentials_exception
|
||||||
return admin
|
return admin
|
||||||
|
|
||||||
|
async def get_current_super_admin(current_admin: models.Admin = Depends(get_current_admin)):
|
||||||
|
"""Проверяет, является ли текущий админ суперадмином"""
|
||||||
|
if not current_admin.is_super_admin:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_403_FORBIDDEN,
|
||||||
|
detail="Недостаточно прав. Эта операция доступна только главному администратору."
|
||||||
|
)
|
||||||
|
return current_admin
|
||||||
|
@ -3,9 +3,9 @@ from models import Admin, Base
|
|||||||
from database import SessionLocal, engine
|
from database import SessionLocal, engine
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
def create_initial_admin(username: str, password: str):
|
def create_initial_admin(username: str, password: str, super_admin: bool = True):
|
||||||
"""
|
"""
|
||||||
Создает первого администратора в системе
|
Создает первого администратора в системе (по умолчанию как главного админа)
|
||||||
"""
|
"""
|
||||||
Base.metadata.create_all(bind=engine)
|
Base.metadata.create_all(bind=engine)
|
||||||
db = SessionLocal()
|
db = SessionLocal()
|
||||||
@ -18,18 +18,24 @@ def create_initial_admin(username: str, password: str):
|
|||||||
|
|
||||||
# Создаем нового админа
|
# Создаем нового админа
|
||||||
hashed_password = get_password_hash(password)
|
hashed_password = get_password_hash(password)
|
||||||
db_admin = Admin(username=username, hashed_password=hashed_password)
|
db_admin = Admin(
|
||||||
|
username=username,
|
||||||
|
hashed_password=hashed_password,
|
||||||
|
is_super_admin=super_admin
|
||||||
|
)
|
||||||
db.add(db_admin)
|
db.add(db_admin)
|
||||||
db.commit()
|
db.commit()
|
||||||
db.refresh(db_admin)
|
db.refresh(db_admin)
|
||||||
print(f"Администратор {username} успешно создан!")
|
admin_type = "главный администратор" if super_admin else "администратор"
|
||||||
|
print(f"{admin_type.capitalize()} {username} успешно создан!")
|
||||||
|
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
parser = argparse.ArgumentParser(description="Создание первого администратора")
|
parser = argparse.ArgumentParser(description="Создание администратора")
|
||||||
parser.add_argument("--username", required=True, help="Имя пользователя администратора")
|
parser.add_argument("--username", required=True, help="Имя пользователя администратора")
|
||||||
parser.add_argument("--password", required=True, help="Пароль администратора")
|
parser.add_argument("--password", required=True, help="Пароль администратора")
|
||||||
|
parser.add_argument("--super", action="store_true", help="Создать как главного администратора")
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
create_initial_admin(args.username, args.password)
|
create_initial_admin(args.username, args.password, args.super)
|
||||||
|
55
crud.py
55
crud.py
@ -1,5 +1,5 @@
|
|||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
from models import Car, EngineType, HybridType, PowerRatio, Admin
|
from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin
|
||||||
import schemas
|
import schemas
|
||||||
from typing import List, Optional
|
from typing import List, Optional
|
||||||
from password_utils import get_password_hash
|
from password_utils import get_password_hash
|
||||||
@ -9,7 +9,11 @@ def get_admin_by_username(db: Session, username: str) -> Optional[Admin]:
|
|||||||
|
|
||||||
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
|
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
|
||||||
hashed_password = get_password_hash(admin.password)
|
hashed_password = get_password_hash(admin.password)
|
||||||
db_admin = Admin(username=admin.username, hashed_password=hashed_password)
|
db_admin = Admin(
|
||||||
|
username=admin.username,
|
||||||
|
hashed_password=hashed_password,
|
||||||
|
is_super_admin=admin.is_super_admin
|
||||||
|
)
|
||||||
db.add(db_admin)
|
db.add(db_admin)
|
||||||
db.commit()
|
db.commit()
|
||||||
db.refresh(db_admin)
|
db.refresh(db_admin)
|
||||||
@ -84,3 +88,50 @@ def delete_car(db: Session, car_id: int) -> bool:
|
|||||||
db.delete(db_car)
|
db.delete(db_car)
|
||||||
db.commit()
|
db.commit()
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
# Персонал ---------
|
||||||
|
|
||||||
|
def get_personal(db: Session, personal_id: int) -> Optional[Personal]:
|
||||||
|
return db.query(Personal).filter(Personal.id == personal_id).first()
|
||||||
|
|
||||||
|
def get_all_personal(db: Session, skip: int = 0, limit: int = 100) -> List[Personal]:
|
||||||
|
return db.query(Personal).offset(skip).limit(limit).all()
|
||||||
|
|
||||||
|
def get_personal_count(db: Session) -> int:
|
||||||
|
return db.query(Personal).count()
|
||||||
|
|
||||||
|
def create_personal(db: Session, personal: schemas.PersonalCreate) -> Personal:
|
||||||
|
db_personal = Personal(
|
||||||
|
name=personal.name,
|
||||||
|
surname=personal.surname,
|
||||||
|
role=personal.role,
|
||||||
|
photo=personal.photo
|
||||||
|
)
|
||||||
|
db.add(db_personal)
|
||||||
|
db.commit()
|
||||||
|
db.refresh(db_personal)
|
||||||
|
return db_personal
|
||||||
|
|
||||||
|
def update_personal(db: Session, personal_id: int, personal_update: schemas.PersonalUpdate) -> Optional[Personal]:
|
||||||
|
db_personal = get_personal(db, personal_id)
|
||||||
|
if not db_personal:
|
||||||
|
return None
|
||||||
|
|
||||||
|
update_data = personal_update.model_dump(exclude_unset=True)
|
||||||
|
|
||||||
|
# Обновление полей
|
||||||
|
for key, value in update_data.items():
|
||||||
|
setattr(db_personal, key, value)
|
||||||
|
|
||||||
|
db.commit()
|
||||||
|
db.refresh(db_personal)
|
||||||
|
return db_personal
|
||||||
|
|
||||||
|
def delete_personal(db: Session, personal_id: int) -> bool:
|
||||||
|
db_personal = get_personal(db, personal_id)
|
||||||
|
if not db_personal:
|
||||||
|
return False
|
||||||
|
|
||||||
|
db.delete(db_personal)
|
||||||
|
db.commit()
|
||||||
|
return True
|
||||||
|
173
main.py
173
main.py
@ -12,15 +12,54 @@ from utils import save_image, delete_image
|
|||||||
import json
|
import json
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
import auth
|
import auth
|
||||||
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
|
|
||||||
# Создание таблиц в БД
|
# Создание таблиц в БД
|
||||||
models.Base.metadata.create_all(bind=engine)
|
models.Base.metadata.create_all(bind=engine)
|
||||||
|
|
||||||
app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей")
|
app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей")
|
||||||
|
|
||||||
|
app.add_middleware(
|
||||||
|
CORSMiddleware,
|
||||||
|
allow_origins=["*"],
|
||||||
|
allow_credentials=True,
|
||||||
|
allow_methods=["*"],
|
||||||
|
allow_headers=["*",]
|
||||||
|
)
|
||||||
|
|
||||||
# Добавляем обработку статических файлов
|
# Добавляем обработку статических файлов
|
||||||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||||||
|
|
||||||
|
# Эндпоинты для авторизации
|
||||||
|
@app.post("/token", response_model=schemas.Token)
|
||||||
|
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
|
||||||
|
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
|
||||||
|
if not admin:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||||
|
detail="Неверное имя пользователя или пароль",
|
||||||
|
headers={"WWW-Authenticate": "Bearer"},
|
||||||
|
)
|
||||||
|
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||||
|
access_token = auth.create_access_token(
|
||||||
|
data={"sub": admin.username}, expires_delta=access_token_expires
|
||||||
|
)
|
||||||
|
return {"access_token": access_token, "token_type": "bearer"}
|
||||||
|
|
||||||
|
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
|
||||||
|
def create_admin(
|
||||||
|
admin: schemas.AdminCreate,
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
current_admin: models.Admin = Depends(auth.get_current_super_admin)
|
||||||
|
):
|
||||||
|
db_admin = crud.get_admin_by_username(db, username=admin.username)
|
||||||
|
if db_admin:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail="Пользователь с таким именем уже существует"
|
||||||
|
)
|
||||||
|
return crud.create_admin(db=db, admin=admin)
|
||||||
|
|
||||||
# Эндпоинты для авторизации
|
# Эндпоинты для авторизации
|
||||||
@app.post("/token", response_model=schemas.Token)
|
@app.post("/token", response_model=schemas.Token)
|
||||||
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
|
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
|
||||||
@ -184,5 +223,139 @@ def delete_car(
|
|||||||
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
|
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
|
||||||
return current_admin
|
return current_admin
|
||||||
|
|
||||||
|
# Эндпоинт для проверки текущего пользователя (только для отладки)
|
||||||
|
@app.get("/users/me", response_model=schemas.Admin)
|
||||||
|
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
|
||||||
|
return current_admin
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
uvicorn.run(app, host="0.0.0.0", port=3000)
|
uvicorn.run(app, host="0.0.0.0", port=3000)
|
||||||
|
|
||||||
|
# Персонал ---------
|
||||||
|
|
||||||
|
@app.get("/personal", response_model=schemas.PersonalListResponse)
|
||||||
|
def get_all_personal(
|
||||||
|
skip: int = Query(0, description="Количество пропускаемых записей"),
|
||||||
|
limit: int = Query(100, description="Максимальное количество записей"),
|
||||||
|
db: Session = Depends(get_db)
|
||||||
|
):
|
||||||
|
staff = crud.get_all_personal(db, skip=skip, limit=limit)
|
||||||
|
total = crud.get_personal_count(db)
|
||||||
|
return {"staff": staff, "total": total}
|
||||||
|
|
||||||
|
@app.get("/personal/{personal_id}", response_model=schemas.PersonalResponse)
|
||||||
|
def get_personal(
|
||||||
|
personal_id: int = Path(..., description="ID сотрудника", gt=0),
|
||||||
|
db: Session = Depends(get_db)
|
||||||
|
):
|
||||||
|
personal = crud.get_personal(db, personal_id=personal_id)
|
||||||
|
if personal is None:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
detail="Сотрудник не найден"
|
||||||
|
)
|
||||||
|
return {"personal": personal}
|
||||||
|
|
||||||
|
@app.post("/personal", response_model=schemas.PersonalResponse, status_code=status.HTTP_201_CREATED)
|
||||||
|
async def create_personal(
|
||||||
|
personal_data: str = Form(..., description="Данные сотрудника в JSON формате"),
|
||||||
|
photo: UploadFile = File(None, description="Фотография сотрудника"),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
current_admin: models.Admin = Depends(auth.get_current_admin)
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
# Преобразуем строку JSON в словарь
|
||||||
|
personal_dict = json.loads(personal_data)
|
||||||
|
|
||||||
|
# Загружаем фото, если оно предоставлено
|
||||||
|
if photo:
|
||||||
|
photo_path = await save_image(photo)
|
||||||
|
personal_dict["photo"] = photo_path
|
||||||
|
|
||||||
|
# Создаем объект Pydantic для валидации данных
|
||||||
|
personal = schemas.PersonalCreate(**personal_dict)
|
||||||
|
|
||||||
|
# Создаем запись в БД
|
||||||
|
db_personal = crud.create_personal(db=db, personal=personal)
|
||||||
|
return {"personal": db_personal}
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=str(e)
|
||||||
|
)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail="Неверный формат JSON"
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.put("/personal/{personal_id}", response_model=schemas.PersonalResponse)
|
||||||
|
async def update_personal(
|
||||||
|
personal_id: int = Path(..., description="ID сотрудника", gt=0),
|
||||||
|
personal_data: str = Form(None, description="Данные сотрудника в JSON формате"),
|
||||||
|
photo: UploadFile = File(None, description="Фотография сотрудника"),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
current_admin: models.Admin = Depends(auth.get_current_admin)
|
||||||
|
):
|
||||||
|
# Проверяем существование сотрудника
|
||||||
|
existing_personal = crud.get_personal(db, personal_id=personal_id)
|
||||||
|
if existing_personal is None:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
detail="Сотрудник не найден"
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Преобразуем строку JSON в словарь, если она предоставлена
|
||||||
|
personal_dict = {}
|
||||||
|
if personal_data:
|
||||||
|
personal_dict = json.loads(personal_data)
|
||||||
|
|
||||||
|
# Загружаем новую фотографию, если она предоставлена
|
||||||
|
if photo:
|
||||||
|
# Удаляем старую фотографию, если есть
|
||||||
|
if existing_personal.photo:
|
||||||
|
delete_image(existing_personal.photo)
|
||||||
|
|
||||||
|
# Сохраняем новую фотографию
|
||||||
|
photo_path = await save_image(photo)
|
||||||
|
personal_dict["photo"] = photo_path
|
||||||
|
|
||||||
|
# Создаем объект Pydantic для валидации данных
|
||||||
|
personal_update = schemas.PersonalUpdate(**personal_dict)
|
||||||
|
|
||||||
|
# Обновляем запись в БД
|
||||||
|
updated_personal = crud.update_personal(db=db, personal_id=personal_id, personal_update=personal_update)
|
||||||
|
return {"personal": updated_personal}
|
||||||
|
except ValueError as e:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=str(e)
|
||||||
|
)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail="Неверный формат JSON"
|
||||||
|
)
|
||||||
|
|
||||||
|
@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT)
|
||||||
|
def delete_personal(
|
||||||
|
personal_id: int = Path(..., description="ID сотрудника", gt=0),
|
||||||
|
db: Session = Depends(get_db),
|
||||||
|
current_admin: models.Admin = Depends(auth.get_current_admin)
|
||||||
|
):
|
||||||
|
# Получаем сотрудника перед удалением
|
||||||
|
personal = crud.get_personal(db, personal_id=personal_id)
|
||||||
|
if personal is None:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_404_NOT_FOUND,
|
||||||
|
detail="Сотрудник не найден"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Удаляем фотографию, если есть
|
||||||
|
if personal.photo:
|
||||||
|
delete_image(personal.photo)
|
||||||
|
|
||||||
|
# Удаляем запись из БД
|
||||||
|
crud.delete_personal(db=db, personal_id=personal_id)
|
||||||
|
return None
|
||||||
|
10
models.py
10
models.py
@ -24,6 +24,7 @@ class Admin(Base):
|
|||||||
username = Column(String, unique=True, index=True)
|
username = Column(String, unique=True, index=True)
|
||||||
hashed_password = Column(String)
|
hashed_password = Column(String)
|
||||||
is_active = Column(Boolean, default=True)
|
is_active = Column(Boolean, default=True)
|
||||||
|
is_super_admin = Column(Boolean, default=False) # Новое поле для главного админа
|
||||||
|
|
||||||
class Car(Base):
|
class Car(Base):
|
||||||
__tablename__ = "cars"
|
__tablename__ = "cars"
|
||||||
@ -43,3 +44,12 @@ class Car(Base):
|
|||||||
electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя
|
electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя
|
||||||
hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида
|
hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида
|
||||||
power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности
|
power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности
|
||||||
|
|
||||||
|
class Personal(Base):
|
||||||
|
__tablename__ = "personal"
|
||||||
|
|
||||||
|
id = Column(Integer, primary_key=True, index=True)
|
||||||
|
name = Column(String, index=True) # ФИО сотрудника
|
||||||
|
surname = Column(String, index=True) # ФИО сотрудника
|
||||||
|
role = Column(String) # Должность (произвольная строка)
|
||||||
|
photo = Column(String, nullable=True) # путь к фотографии
|
||||||
|
32
schemas.py
32
schemas.py
@ -32,10 +32,12 @@ class AdminBase(BaseModel):
|
|||||||
|
|
||||||
class AdminCreate(AdminBase):
|
class AdminCreate(AdminBase):
|
||||||
password: str
|
password: str
|
||||||
|
is_super_admin: Optional[bool] = False
|
||||||
|
|
||||||
class Admin(AdminBase):
|
class Admin(AdminBase):
|
||||||
id: int
|
id: int
|
||||||
is_active: bool
|
is_active: bool
|
||||||
|
is_super_admin: bool
|
||||||
|
|
||||||
class Config:
|
class Config:
|
||||||
from_attributes = True
|
from_attributes = True
|
||||||
@ -87,3 +89,33 @@ class CarResponse(BaseModel):
|
|||||||
class CarsResponse(BaseModel):
|
class CarsResponse(BaseModel):
|
||||||
cars: List[Car]
|
cars: List[Car]
|
||||||
total: int
|
total: int
|
||||||
|
|
||||||
|
# Персонал ---------
|
||||||
|
|
||||||
|
class PersonalBase(BaseModel):
|
||||||
|
name: str
|
||||||
|
surname: str
|
||||||
|
role: str
|
||||||
|
photo: Optional[str] = None
|
||||||
|
|
||||||
|
class PersonalCreate(PersonalBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class PersonalUpdate(BaseModel):
|
||||||
|
name: Optional[str] = None
|
||||||
|
surname: Optional[str] = None
|
||||||
|
role: Optional[str] = None
|
||||||
|
photo: Optional[str] = None
|
||||||
|
|
||||||
|
class Personal(PersonalBase):
|
||||||
|
id: int
|
||||||
|
|
||||||
|
class Config:
|
||||||
|
from_attributes = True
|
||||||
|
|
||||||
|
class PersonalResponse(BaseModel):
|
||||||
|
personal: Personal
|
||||||
|
|
||||||
|
class PersonalListResponse(BaseModel):
|
||||||
|
staff: List[Personal]
|
||||||
|
total: int
|
||||||
|
Reference in New Issue
Block a user