Compare commits

...

5 Commits

Author SHA1 Message Date
9ab99aa7e4 fix main 2025-07-13 17:32:08 +05:00
296f2a91b2 fix conflict 2025-07-13 17:31:48 +05:00
dd623cd1b8 add: admin required to personal endpoints 2025-07-13 17:27:13 +05:00
481612925c add: personal 2025-07-13 17:24:50 +05:00
b6901ddf7b add authorization and admin role 2025-07-13 17:19:52 +05:00
9 changed files with 260 additions and 10 deletions

View File

@ -9,16 +9,24 @@ API сервер для хранения и управления данными
- **schemas.py** - схемы Pydantic для валидации данных
- **database.py** - настройка подключения к базе данных
- **crud.py** - функции для CRUD операций
- **auth.py** - функции для JWT авторизации
- **create_admin.py** - скрипт для создания первого админа
## Запуск проекта
1. Установите зависимости:
```bash
pip install fastapi uvicorn sqlalchemy pydantic
pip install -r requirements.txt
```
2. Запустите сервер:
2. Создайте первого администратора:
```bash
python create_admin.py --username admin --password your_password
```
3. Запустите сервер:
```bash
uvicorn main:app --reload
@ -26,8 +34,27 @@ uvicorn main:app --reload
Сервер будет доступен по адресу http://localhost:8000
## JWT Авторизация
Для доступа к API необходима JWT авторизация. Вот как она работает:
1. Получите токен доступа, отправив POST-запрос к `/token` с вашими учетными данными:
```bash
curl -X POST http://localhost:8000/token -d "username=admin&password=your_password" -H "Content-Type: application/x-www-form-urlencoded"
```
2. Используйте полученный токен для доступа к защищенным эндпоинтам:
```bash
curl -X GET http://localhost:8000/cars -H "Authorization: Bearer your_token_here"
```
## API эндпоинты
- **POST /token** - получить JWT токен (авторизация)
- **POST /admins** - создать нового администратора (только для админов)
- **GET /users/me** - получить информацию о текущем пользователе
- **GET /cars** - получить список всех автомобилей
- **GET /cars/{car_id}** - получить информацию о конкретном автомобиле
- **POST /cars** - добавить новый автомобиль

59
auth.py Normal file
View File

@ -0,0 +1,59 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
import schemas
import models
import crud
from database import get_db
from password_utils import verify_password
# Константы для JWT
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # В реальном приложении используйте os.environ.get
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Инициализация OAuth2PasswordBearer для получения токена из запроса
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def authenticate_admin(db: Session, username: str, password: str):
"""Аутентификация админа"""
admin = crud.get_admin_by_username(db, username)
if not admin:
return False
if not verify_password(password, admin.hashed_password):
return False
return admin
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создаёт JWT токен"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
"""Получает текущего админа из токена"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Некорректные учетные данные",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = schemas.TokenData(username=username)
except JWTError:
raise credentials_exception
admin = crud.get_admin_by_username(db, username=token_data.username)
if admin is None:
raise credentials_exception
return admin

35
create_admin.py Normal file
View File

@ -0,0 +1,35 @@
from password_utils import get_password_hash
from models import Admin, Base
from database import SessionLocal, engine
import argparse
def create_initial_admin(username: str, password: str):
"""
Создает первого администратора в системе
"""
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# Проверяем, существует ли уже админ с таким именем
existing_admin = db.query(Admin).filter(Admin.username == username).first()
if existing_admin:
print(f"Администратор с именем {username} уже существует!")
return
# Создаем нового админа
hashed_password = get_password_hash(password)
db_admin = Admin(username=username, hashed_password=hashed_password)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
print(f"Администратор {username} успешно создан!")
db.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Создание первого администратора")
parser.add_argument("--username", required=True, help="Имя пользователя администратора")
parser.add_argument("--password", required=True, help="Пароль администратора")
args = parser.parse_args()
create_initial_admin(args.username, args.password)

14
crud.py
View File

@ -1,7 +1,19 @@
from sqlalchemy.orm import Session
from models import Car, EngineType, HybridType, PowerRatio, Personal
from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin
import schemas
from typing import List, Optional
from password_utils import get_password_hash
def get_admin_by_username(db: Session, username: str) -> Optional[Admin]:
return db.query(Admin).filter(Admin.username == username).first()
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
hashed_password = get_password_hash(admin.password)
db_admin = Admin(username=admin.username, hashed_password=hashed_password)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
return db_admin
def get_car(db: Session, car_id: int) -> Optional[Car]:
return db.query(Car).filter(Car.id == car_id).first()

85
main.py
View File

@ -1,5 +1,6 @@
from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form
from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
import crud
import models
@ -9,6 +10,8 @@ from typing import List, Optional
import uvicorn
from utils import save_image, delete_image
import json
from datetime import timedelta
import auth
from fastapi.middleware.cors import CORSMiddleware
# Создание таблиц в БД
@ -27,6 +30,60 @@ app.add_middleware(
# Добавляем обработку статических файлов
app.mount("/static", StaticFiles(directory="static"), name="static")
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)):
# Проверка, что создать админа может только существующий админ
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)):
# Проверка, что создать админа может только существующий админ
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
@app.get("/")
def read_root():
return {"message": "AutoBro API"}
@ -58,7 +115,8 @@ def get_car(
async def create_car(
car_data: str = Form(..., description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
try:
# Преобразуем строку JSON в словарь
@ -91,7 +149,8 @@ async def update_car(
car_id: int = Path(..., description="ID автомобиля", gt=0),
car_data: str = Form(None, description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Проверяем существование автомобиля
existing_car = crud.get_car(db, car_id=car_id)
@ -137,7 +196,8 @@ async def update_car(
@app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_car(
car_id: int = Path(..., description="ID автомобиля", gt=0),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Получаем автомобиль перед удалением
car = crud.get_car(db, car_id=car_id)
@ -155,6 +215,16 @@ def delete_car(
crud.delete_car(db=db, car_id=car_id)
return None
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=3000)
@ -187,7 +257,8 @@ def get_personal(
async def create_personal(
personal_data: str = Form(..., description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
try:
# Преобразуем строку JSON в словарь
@ -220,7 +291,8 @@ async def update_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
personal_data: str = Form(None, description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Проверяем существование сотрудника
existing_personal = crud.get_personal(db, personal_id=personal_id)
@ -266,7 +338,8 @@ async def update_personal(
@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db)
db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
):
# Получаем сотрудника перед удалением
personal = crud.get_personal(db, personal_id=personal_id)

View File

@ -17,6 +17,14 @@ class PowerRatio(enum.Enum):
ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо"
class Admin(Base):
__tablename__ = "admins"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
class Car(Base):
__tablename__ = "cars"

12
password_utils.py Normal file
View File

@ -0,0 +1,12 @@
from passlib.context import CryptContext
# Инициализация контекста для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
"""Проверяет, соответствует ли пароль хешу"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""Создаёт хеш пароля"""
return pwd_context.hash(password)

View File

@ -3,3 +3,5 @@ uvicorn==0.30.0
sqlalchemy==2.0.41
pydantic==2.11.7
python-multipart==0.0.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4

View File

@ -18,6 +18,28 @@ class PowerRatioEnum(str, Enum):
ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо"
# Схемы для авторизации
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# Схемы для пользователя-админа
class AdminBase(BaseModel):
username: str
class AdminCreate(AdminBase):
password: str
class Admin(AdminBase):
id: int
is_active: bool
class Config:
from_attributes = True
class CarBase(BaseModel):
image: Optional[str] = None
name: str