This commit is contained in:
aurinex
2025-07-16 11:07:20 +05:00
9 changed files with 495 additions and 9 deletions

View File

@ -9,16 +9,24 @@ API сервер для хранения и управления данными
- **schemas.py** - схемы Pydantic для валидации данных - **schemas.py** - схемы Pydantic для валидации данных
- **database.py** - настройка подключения к базе данных - **database.py** - настройка подключения к базе данных
- **crud.py** - функции для CRUD операций - **crud.py** - функции для CRUD операций
- **auth.py** - функции для JWT авторизации
- **create_admin.py** - скрипт для создания первого админа
## Запуск проекта ## Запуск проекта
1. Установите зависимости: 1. Установите зависимости:
```bash ```bash
pip install fastapi uvicorn sqlalchemy pydantic pip install -r requirements.txt
``` ```
2. Запустите сервер: 2. Создайте первого администратора:
```bash
python create_admin.py --username admin --password your_password
```
3. Запустите сервер:
```bash ```bash
uvicorn main:app --reload uvicorn main:app --reload
@ -26,8 +34,27 @@ uvicorn main:app --reload
Сервер будет доступен по адресу http://localhost:8000 Сервер будет доступен по адресу http://localhost:8000
## JWT Авторизация
Для доступа к API необходима JWT авторизация. Вот как она работает:
1. Получите токен доступа, отправив POST-запрос к `/token` с вашими учетными данными:
```bash
curl -X POST http://localhost:8000/token -d "username=admin&password=your_password" -H "Content-Type: application/x-www-form-urlencoded"
```
2. Используйте полученный токен для доступа к защищенным эндпоинтам:
```bash
curl -X GET http://localhost:8000/cars -H "Authorization: Bearer your_token_here"
```
## API эндпоинты ## API эндпоинты
- **POST /token** - получить JWT токен (авторизация)
- **POST /admins** - создать нового администратора (только для админов)
- **GET /users/me** - получить информацию о текущем пользователе
- **GET /cars** - получить список всех автомобилей - **GET /cars** - получить список всех автомобилей
- **GET /cars/{car_id}** - получить информацию о конкретном автомобиле - **GET /cars/{car_id}** - получить информацию о конкретном автомобиле
- **POST /cars** - добавить новый автомобиль - **POST /cars** - добавить новый автомобиль

68
auth.py Normal file
View File

@ -0,0 +1,68 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
import schemas
import models
import crud
from database import get_db
from password_utils import verify_password
# Константы для JWT
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # В реальном приложении используйте os.environ.get
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Инициализация OAuth2PasswordBearer для получения токена из запроса
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def authenticate_admin(db: Session, username: str, password: str):
"""Аутентификация админа"""
admin = crud.get_admin_by_username(db, username)
if not admin:
return False
if not verify_password(password, admin.hashed_password):
return False
return admin
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создаёт JWT токен"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
"""Получает текущего админа из токена"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Некорректные учетные данные",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = schemas.TokenData(username=username)
except JWTError:
raise credentials_exception
admin = crud.get_admin_by_username(db, username=token_data.username)
if admin is None:
raise credentials_exception
return admin
async def get_current_super_admin(current_admin: models.Admin = Depends(get_current_admin)):
"""Проверяет, является ли текущий админ суперадмином"""
if not current_admin.is_super_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Недостаточно прав. Эта операция доступна только главному администратору."
)
return current_admin

41
create_admin.py Normal file
View File

@ -0,0 +1,41 @@
from password_utils import get_password_hash
from models import Admin, Base
from database import SessionLocal, engine
import argparse
def create_initial_admin(username: str, password: str, super_admin: bool = True):
"""
Создает первого администратора в системе (по умолчанию как главного админа)
"""
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# Проверяем, существует ли уже админ с таким именем
existing_admin = db.query(Admin).filter(Admin.username == username).first()
if existing_admin:
print(f"Администратор с именем {username} уже существует!")
return
# Создаем нового админа
hashed_password = get_password_hash(password)
db_admin = Admin(
username=username,
hashed_password=hashed_password,
is_super_admin=super_admin
)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
admin_type = "главный администратор" if super_admin else "администратор"
print(f"{admin_type.capitalize()} {username} успешно создан!")
db.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Создание администратора")
parser.add_argument("--username", required=True, help="Имя пользователя администратора")
parser.add_argument("--password", required=True, help="Пароль администратора")
parser.add_argument("--super", action="store_true", help="Создать как главного администратора")
args = parser.parse_args()
create_initial_admin(args.username, args.password, args.super)

65
crud.py
View File

@ -1,7 +1,23 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from models import Car, EngineType, HybridType, PowerRatio from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin
import schemas import schemas
from typing import List, Optional from typing import List, Optional
from password_utils import get_password_hash
def get_admin_by_username(db: Session, username: str) -> Optional[Admin]:
return db.query(Admin).filter(Admin.username == username).first()
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
hashed_password = get_password_hash(admin.password)
db_admin = Admin(
username=admin.username,
hashed_password=hashed_password,
is_super_admin=admin.is_super_admin
)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
return db_admin
def get_car(db: Session, car_id: int) -> Optional[Car]: def get_car(db: Session, car_id: int) -> Optional[Car]:
return db.query(Car).filter(Car.id == car_id).first() return db.query(Car).filter(Car.id == car_id).first()
@ -72,3 +88,50 @@ def delete_car(db: Session, car_id: int) -> bool:
db.delete(db_car) db.delete(db_car)
db.commit() db.commit()
return True return True
# Персонал ---------
def get_personal(db: Session, personal_id: int) -> Optional[Personal]:
return db.query(Personal).filter(Personal.id == personal_id).first()
def get_all_personal(db: Session, skip: int = 0, limit: int = 100) -> List[Personal]:
return db.query(Personal).offset(skip).limit(limit).all()
def get_personal_count(db: Session) -> int:
return db.query(Personal).count()
def create_personal(db: Session, personal: schemas.PersonalCreate) -> Personal:
db_personal = Personal(
name=personal.name,
surname=personal.surname,
role=personal.role,
photo=personal.photo
)
db.add(db_personal)
db.commit()
db.refresh(db_personal)
return db_personal
def update_personal(db: Session, personal_id: int, personal_update: schemas.PersonalUpdate) -> Optional[Personal]:
db_personal = get_personal(db, personal_id)
if not db_personal:
return None
update_data = personal_update.model_dump(exclude_unset=True)
# Обновление полей
for key, value in update_data.items():
setattr(db_personal, key, value)
db.commit()
db.refresh(db_personal)
return db_personal
def delete_personal(db: Session, personal_id: int) -> bool:
db_personal = get_personal(db, personal_id)
if not db_personal:
return False
db.delete(db_personal)
db.commit()
return True

213
main.py
View File

@ -1,5 +1,6 @@
from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
import crud import crud
import models import models
@ -9,6 +10,8 @@ from typing import List, Optional
import uvicorn import uvicorn
from utils import save_image, delete_image from utils import save_image, delete_image
import json import json
from datetime import timedelta
import auth
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
# Создание таблиц в БД # Создание таблиц в БД
@ -16,18 +19,74 @@ models.Base.metadata.create_all(bind=engine)
app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей") app = FastAPI(title="AutoBro API", description="API для управления базой данных автомобилей")
# Настройка CORS
app.add_middleware( app.add_middleware(
CORSMiddleware, CORSMiddleware,
allow_origins=["*"], # В продакшене замените на конкретные домены allow_origins=["*"],
allow_credentials=True, allow_credentials=True,
allow_methods=["*"], allow_methods=["*"],
allow_headers=["*"], allow_headers=["*",]
) )
# Добавляем обработку статических файлов # Добавляем обработку статических файлов
app.mount("/static", StaticFiles(directory="static"), name="static") app.mount("/static", StaticFiles(directory="static"), name="static")
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(
admin: schemas.AdminCreate,
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_super_admin)
):
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)):
# Проверка, что создать админа может только существующий админ
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
@app.get("/") @app.get("/")
def read_root(): def read_root():
return {"message": "AutoBro API"} return {"message": "AutoBro API"}
@ -59,7 +118,8 @@ def get_car(
async def create_car( async def create_car(
car_data: str = Form(..., description="Данные автомобиля в JSON формате"), car_data: str = Form(..., description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"), image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
try: try:
# Преобразуем строку JSON в словарь # Преобразуем строку JSON в словарь
@ -92,7 +152,8 @@ async def update_car(
car_id: int = Path(..., description="ID автомобиля", gt=0), car_id: int = Path(..., description="ID автомобиля", gt=0),
car_data: str = Form(None, description="Данные автомобиля в JSON формате"), car_data: str = Form(None, description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"), image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
# Проверяем существование автомобиля # Проверяем существование автомобиля
existing_car = crud.get_car(db, car_id=car_id) existing_car = crud.get_car(db, car_id=car_id)
@ -138,7 +199,8 @@ async def update_car(
@app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT) @app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_car( def delete_car(
car_id: int = Path(..., description="ID автомобиля", gt=0), car_id: int = Path(..., description="ID автомобиля", gt=0),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
# Получаем автомобиль перед удалением # Получаем автомобиль перед удалением
car = crud.get_car(db, car_id=car_id) car = crud.get_car(db, car_id=car_id)
@ -156,5 +218,144 @@ def delete_car(
crud.delete_car(db=db, car_id=car_id) crud.delete_car(db=db, car_id=car_id)
return None return None
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=3000) uvicorn.run(app, host="0.0.0.0", port=3000)
# Персонал ---------
@app.get("/personal", response_model=schemas.PersonalListResponse)
def get_all_personal(
skip: int = Query(0, description="Количество пропускаемых записей"),
limit: int = Query(100, description="Максимальное количество записей"),
db: Session = Depends(get_db)
):
staff = crud.get_all_personal(db, skip=skip, limit=limit)
total = crud.get_personal_count(db)
return {"staff": staff, "total": total}
@app.get("/personal/{personal_id}", response_model=schemas.PersonalResponse)
def get_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db)
):
personal = crud.get_personal(db, personal_id=personal_id)
if personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
return {"personal": personal}
@app.post("/personal", response_model=schemas.PersonalResponse, status_code=status.HTTP_201_CREATED)
async def create_personal(
personal_data: str = Form(..., description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
try:
# Преобразуем строку JSON в словарь
personal_dict = json.loads(personal_data)
# Загружаем фото, если оно предоставлено
if photo:
photo_path = await save_image(photo)
personal_dict["photo"] = photo_path
# Создаем объект Pydantic для валидации данных
personal = schemas.PersonalCreate(**personal_dict)
# Создаем запись в БД
db_personal = crud.create_personal(db=db, personal=personal)
return {"personal": db_personal}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный формат JSON"
)
@app.put("/personal/{personal_id}", response_model=schemas.PersonalResponse)
async def update_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
personal_data: str = Form(None, description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Проверяем существование сотрудника
existing_personal = crud.get_personal(db, personal_id=personal_id)
if existing_personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
try:
# Преобразуем строку JSON в словарь, если она предоставлена
personal_dict = {}
if personal_data:
personal_dict = json.loads(personal_data)
# Загружаем новую фотографию, если она предоставлена
if photo:
# Удаляем старую фотографию, если есть
if existing_personal.photo:
delete_image(existing_personal.photo)
# Сохраняем новую фотографию
photo_path = await save_image(photo)
personal_dict["photo"] = photo_path
# Создаем объект Pydantic для валидации данных
personal_update = schemas.PersonalUpdate(**personal_dict)
# Обновляем запись в БД
updated_personal = crud.update_personal(db=db, personal_id=personal_id, personal_update=personal_update)
return {"personal": updated_personal}
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
except json.JSONDecodeError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Неверный формат JSON"
)
@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Получаем сотрудника перед удалением
personal = crud.get_personal(db, personal_id=personal_id)
if personal is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Сотрудник не найден"
)
# Удаляем фотографию, если есть
if personal.photo:
delete_image(personal.photo)
# Удаляем запись из БД
crud.delete_personal(db=db, personal_id=personal_id)
return None

View File

@ -17,6 +17,15 @@ class PowerRatio(enum.Enum):
ELECTRIC_GREATER = "ЭД > ДВС" ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо" NA = "не применимо"
class Admin(Base):
__tablename__ = "admins"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
is_super_admin = Column(Boolean, default=False) # Новое поле для главного админа
class Car(Base): class Car(Base):
__tablename__ = "cars" __tablename__ = "cars"
@ -35,3 +44,12 @@ class Car(Base):
electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя electric_motor_power = Column(Integer, nullable=True) # мощность электродвигателя
hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида hybrid_type = Column(Enum(HybridType), default=HybridType.NONE) # тип гибрида
power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности power_ratio = Column(Enum(PowerRatio), default=PowerRatio.NA) # соотношение мощности
class Personal(Base):
__tablename__ = "personal"
id = Column(Integer, primary_key=True, index=True)
name = Column(String, index=True) # ФИО сотрудника
surname = Column(String, index=True) # ФИО сотрудника
role = Column(String) # Должность (произвольная строка)
photo = Column(String, nullable=True) # путь к фотографии

12
password_utils.py Normal file
View File

@ -0,0 +1,12 @@
from passlib.context import CryptContext
# Инициализация контекста для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
"""Проверяет, соответствует ли пароль хешу"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""Создаёт хеш пароля"""
return pwd_context.hash(password)

View File

@ -3,3 +3,5 @@ uvicorn==0.30.0
sqlalchemy==2.0.41 sqlalchemy==2.0.41
pydantic==2.11.7 pydantic==2.11.7
python-multipart==0.0.9 python-multipart==0.0.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4

View File

@ -18,6 +18,30 @@ class PowerRatioEnum(str, Enum):
ELECTRIC_GREATER = "ЭД > ДВС" ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо" NA = "не применимо"
# Схемы для авторизации
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# Схемы для пользователя-админа
class AdminBase(BaseModel):
username: str
class AdminCreate(AdminBase):
password: str
is_super_admin: Optional[bool] = False
class Admin(AdminBase):
id: int
is_active: bool
is_super_admin: bool
class Config:
from_attributes = True
class CarBase(BaseModel): class CarBase(BaseModel):
image: Optional[str] = None image: Optional[str] = None
name: str name: str
@ -65,3 +89,33 @@ class CarResponse(BaseModel):
class CarsResponse(BaseModel): class CarsResponse(BaseModel):
cars: List[Car] cars: List[Car]
total: int total: int
# Персонал ---------
class PersonalBase(BaseModel):
name: str
surname: str
role: str
photo: Optional[str] = None
class PersonalCreate(PersonalBase):
pass
class PersonalUpdate(BaseModel):
name: Optional[str] = None
surname: Optional[str] = None
role: Optional[str] = None
photo: Optional[str] = None
class Personal(PersonalBase):
id: int
class Config:
from_attributes = True
class PersonalResponse(BaseModel):
personal: Personal
class PersonalListResponse(BaseModel):
staff: List[Personal]
total: int