Compare commits

..

10 Commits

Author SHA1 Message Date
b83bd87c68 1 2025-07-16 11:07:20 +05:00
eb7227bdc2 add: super admin role 2025-07-15 22:51:16 +05:00
9ab99aa7e4 fix main 2025-07-13 17:32:08 +05:00
296f2a91b2 fix conflict 2025-07-13 17:31:48 +05:00
dd623cd1b8 add: admin required to personal endpoints 2025-07-13 17:27:13 +05:00
481612925c add: personal 2025-07-13 17:24:50 +05:00
b6901ddf7b add authorization and admin role 2025-07-13 17:19:52 +05:00
821741a9f8 add personal 2025-07-12 01:47:29 +05:00
81d71e60f5 123 2025-07-10 19:54:07 +05:00
f7b3081893 edit to run in local 2025-07-09 17:30:07 +05:00
9 changed files with 502 additions and 6 deletions

View File

@ -9,16 +9,24 @@ API сервер для хранения и управления данными
- **schemas.py** - схемы Pydantic для валидации данных
- **database.py** - настройка подключения к базе данных
- **crud.py** - функции для CRUD операций
- **auth.py** - функции для JWT авторизации
- **create_admin.py** - скрипт для создания первого админа
## Запуск проекта
1. Установите зависимости:
```bash
pip install fastapi uvicorn sqlalchemy pydantic
pip install -r requirements.txt
```
2. Запустите сервер:
2. Создайте первого администратора:
```bash
python create_admin.py --username admin --password your_password
```
3. Запустите сервер:
```bash
uvicorn main:app --reload
@ -26,8 +34,27 @@ uvicorn main:app --reload
Сервер будет доступен по адресу http://localhost:8000
## JWT Авторизация
Для доступа к API необходима JWT авторизация. Вот как она работает:
1. Получите токен доступа, отправив POST-запрос к `/token` с вашими учетными данными:
```bash
curl -X POST http://localhost:8000/token -d "username=admin&password=your_password" -H "Content-Type: application/x-www-form-urlencoded"
```
2. Используйте полученный токен для доступа к защищенным эндпоинтам:
```bash
curl -X GET http://localhost:8000/cars -H "Authorization: Bearer your_token_here"
```
## API эндпоинты
- **POST /token** - получить JWT токен (авторизация)
- **POST /admins** - создать нового администратора (только для админов)
- **GET /users/me** - получить информацию о текущем пользователе
- **GET /cars** - получить список всех автомобилей
- **GET /cars/{car_id}** - получить информацию о конкретном автомобиле
- **POST /cars** - добавить новый автомобиль

68
auth.py Normal file
View File

@ -0,0 +1,68 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
import schemas
import models
import crud
from database import get_db
from password_utils import verify_password
# Константы для JWT
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # В реальном приложении используйте os.environ.get
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Инициализация OAuth2PasswordBearer для получения токена из запроса
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def authenticate_admin(db: Session, username: str, password: str):
"""Аутентификация админа"""
admin = crud.get_admin_by_username(db, username)
if not admin:
return False
if not verify_password(password, admin.hashed_password):
return False
return admin
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создаёт JWT токен"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
"""Получает текущего админа из токена"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Некорректные учетные данные",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = schemas.TokenData(username=username)
except JWTError:
raise credentials_exception
admin = crud.get_admin_by_username(db, username=token_data.username)
if admin is None:
raise credentials_exception
return admin
async def get_current_super_admin(current_admin: models.Admin = Depends(get_current_admin)):
"""Проверяет, является ли текущий админ суперадмином"""
if not current_admin.is_super_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав. Эта операция доступна только главному администратору."
)
return current_admin

41
create_admin.py Normal file
View File

@ -0,0 +1,41 @@
from password_utils import get_password_hash
from models import Admin, Base
from database import SessionLocal, engine
import argparse
def create_initial_admin(username: str, password: str, super_admin: bool = True):
"""
Создает первого администратора в системе (по умолчанию как главного админа)
"""
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# Проверяем, существует ли уже админ с таким именем
existing_admin = db.query(Admin).filter(Admin.username == username).first()
if existing_admin:
print(f"Администратор с именем {username} уже существует!")
return
# Создаем нового админа
hashed_password = get_password_hash(password)
db_admin = Admin(
username=username,
hashed_password=hashed_password,
is_super_admin=super_admin
)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
admin_type = "главный администратор" if super_admin else "администратор"
print(f"{admin_type.capitalize()} {username} успешно создан!")
db.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Создание администратора")
parser.add_argument("--username", required=True, help="Имя пользователя администратора")
parser.add_argument("--password", required=True, help="Пароль администратора")
parser.add_argument("--super", action="store_true", help="Создать как главного администратора")
args = parser.parse_args()
create_initial_admin(args.username, args.password, args.super)

65
crud.py
View File

@ -1,7 +1,23 @@
from sqlalchemy.orm import Session
from models import Car, EngineType, HybridType, PowerRatio
from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin
import schemas
from typing import List, Optional
from password_utils import get_password_hash
def get_admin_by_username(db: Session, username: str) -> Optional[Admin]:
return db.query(Admin).filter(Admin.username == username).first()
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
hashed_password = get_password_hash(admin.password)
db_admin = Admin(
username=admin.username,
hashed_password=hashed_password,
is_super_admin=admin.is_super_admin
)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
return db_admin
def get_car(db: Session, car_id: int) -> Optional[Car]:
return db.query(Car).filter(Car.id == car_id).first()
@ -72,3 +88,50 @@ def delete_car(db: Session, car_id: int) -> bool:
db.delete(db_car)
db.commit()
return True
# Персонал ---------
def get_personal(db: Session, personal_id: int) -> Optional[Personal]:
return db.query(Personal).filter(Personal.id == personal_id).first()
def get_all_personal(db: Session, skip: int = 0, limit: int = 100) -> List[Personal]:
return db.query(Personal).offset(skip).limit(limit).all()
def get_personal_count(db: Session) -> int:
return db.query(Personal).count()
def create_personal(db: Session, personal: schemas.PersonalCreate) -> Personal:
db_personal = Personal(
name=personal.name,
surname=personal.surname,
role=personal.role,
photo=personal.photo
)
db.add(db_personal)
db.commit()
db.refresh(db_personal)
return db_personal
def update_personal(db: Session, personal_id: int, personal_update: schemas.PersonalUpdate) -> Optional[Personal]:
db_personal = get_personal(db, personal_id)
if not db_personal:
return None
update_data = personal_update.model_dump(exclude_unset=True)
# Обновление полей
for key, value in update_data.items():
setattr(db_personal, key, value)
db.commit()
db.refresh(db_personal)
return db_personal
def delete_personal(db: Session, personal_id: int) -> bool:
db_personal = get_personal(db, personal_id)
if not db_personal:
return False
db.delete(db_personal)
db.commit()
return True

217
main.py
View File

@ -1,5 +1,6 @@
from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import crud
import models
@ -9,15 +10,83 @@ from typing import List, Optional
import uvicorn
from utils import save_image, delete_image
import json
from datetime import timedelta
import auth
from fastapi.middleware.cors import CORSMiddleware
# Создание таблиц в БД
models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*",]
)
# Добавляем обработку статических файлов
app.mount("/static", StaticFiles(directory="static"), name="static")
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(
admin: schemas.AdminCreate,
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_super_admin)
):
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)):
# Проверка, что создать админа может только существующий админ
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
@app.get("/")
def read_root():
return {"message": "AutoBro API"}
@ -49,7 +118,8 @@ def get_car(
async def create_car(
car_data: str = Form(..., description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
try:
# Преобразуем строку JSON в словарь
@ -82,7 +152,8 @@ async def update_car(
car_id: int = Path(..., description="ID автомобиля", gt=0),
car_data: str = Form(None, description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Проверяем существование автомобиля
existing_car = crud.get_car(db, car_id=car_id)
@ -128,7 +199,8 @@ async def update_car(
@app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_car(
car_id: int = Path(..., description="ID автомобиля", gt=0),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Получаем автомобиль перед удалением
car = crud.get_car(db, car_id=car_id)
@ -146,5 +218,144 @@ def delete_car(
crud.delete_car(db=db, car_id=car_id)
return None
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=3000)
# Персонал ---------
@app.get("/personal", response_model=schemas.PersonalListResponse)
def get_all_personal(
skip: int = Query(0, description="Количество пропускаемых записей"),
limit: int = Query(100, description="Максимальное количество записей"),
db: Session = Depends(get_db)
):
staff = crud.get_all_personal(db, skip=skip, limit=limit)
total = crud.get_personal_count(db)
return {"staff": staff, "total": total}
@app.get("/personal/{personal_id}", response_model=schemas.PersonalResponse)
def get_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db)
):
personal = crud.get_personal(db, personal_id=personal_id)
if personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
return {"personal": personal}
@app.post("/personal", response_model=schemas.PersonalResponse, status_code=status.HTTP_201_CREATED)
async def create_personal(
personal_data: str = Form(..., description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
try:
# Преобразуем строку JSON в словарь
personal_dict = json.loads(personal_data)
# Загружаем фото, если оно предоставлено
if photo:
photo_path = await save_image(photo)
personal_dict["photo"] = photo_path
# Создаем объект Pydantic для валидации данных
personal = schemas.PersonalCreate(**personal_dict)
# Создаем запись в БД
db_personal = crud.create_personal(db=db, personal=personal)
return {"personal": db_personal}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный формат JSON"
)
@app.put("/personal/{personal_id}", response_model=schemas.PersonalResponse)
async def update_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
personal_data: str = Form(None, description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Проверяем существование сотрудника
existing_personal = crud.get_personal(db, personal_id=personal_id)
if existing_personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
try:
# Преобразуем строку JSON в словарь, если она предоставлена
personal_dict = {}
if personal_data:
personal_dict = json.loads(personal_data)
# Загружаем новую фотографию, если она предоставлена
if photo:
# Удаляем старую фотографию, если есть
if existing_personal.photo:
delete_image(existing_personal.photo)
# Сохраняем новую фотографию
photo_path = await save_image(photo)
personal_dict["photo"] = photo_path
# Создаем объект Pydantic для валидации данных
personal_update = schemas.PersonalUpdate(**personal_dict)
# Обновляем запись в БД
updated_personal = crud.update_personal(db=db, personal_id=personal_id, personal_update=personal_update)
return {"personal": updated_personal}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный формат JSON"
)
@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Получаем сотрудника перед удалением
personal = crud.get_personal(db, personal_id=personal_id)
if personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
# Удаляем фотографию, если есть
if personal.photo:
delete_image(personal.photo)
# Удаляем запись из БД
crud.delete_personal(db=db, personal_id=personal_id)
return None

View File

@ -17,6 +17,15 @@ class PowerRatio(enum.Enum):
ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо"
class Admin(Base):
__tablename__ = "admins"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
is_super_admin = Column(Boolean, default=False) # Новое поле для главного админа
class Car(Base):
__tablename__ = "cars"
@ -35,3 +44,12 @@ class Car(Base):
electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя
hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида
power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности
class Personal(Base):
__tablename__ = "personal"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True) # ФИО сотрудника
surname = Column(String, index=True) # ФИО сотрудника
role = Column(String) # Должность (произвольная строка)
photo = Column(String, nullable=True) # путь к фотографии

12
password_utils.py Normal file
View File

@ -0,0 +1,12 @@
from passlib.context import CryptContext
# Инициализация контекста для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
"""Проверяет, соответствует ли пароль хешу"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""Создаёт хеш пароля"""
return pwd_context.hash(password)

View File

@ -3,3 +3,5 @@ uvicorn==0.30.0
sqlalchemy==2.0.41
pydantic==2.11.7
python-multipart==0.0.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4

View File

@ -18,6 +18,30 @@ class PowerRatioEnum(str, Enum):
ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо"
# Схемы для авторизации
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# Схемы для пользователя-админа
class AdminBase(BaseModel):
username: str
class AdminCreate(AdminBase):
password: str
is_super_admin: Optional[bool] = False
class Admin(AdminBase):
id: int
is_active: bool
is_super_admin: bool
class Config:
from_attributes = True
class CarBase(BaseModel):
image: Optional[str] = None
name: str
@ -65,3 +89,33 @@ class CarResponse(BaseModel):
class CarsResponse(BaseModel):
cars: List[Car]
total: int
# Персонал ---------
class PersonalBase(BaseModel):
name: str
surname: str
role: str
photo: Optional[str] = None
class PersonalCreate(PersonalBase):
pass
class PersonalUpdate(BaseModel):
name: Optional[str] = None
surname: Optional[str] = None
role: Optional[str] = None
photo: Optional[str] = None
class Personal(PersonalBase):
id: int
class Config:
from_attributes = True
class PersonalResponse(BaseModel):
personal: Personal
class PersonalListResponse(BaseModel):
staff: List[Personal]
total: int