Compare commits

..

5 Commits

Author SHA1 Message Date
9ab99aa7e4 fix main 2025-07-13 17:32:08 +05:00
296f2a91b2 fix conflict 2025-07-13 17:31:48 +05:00
dd623cd1b8 add: admin required to personal endpoints 2025-07-13 17:27:13 +05:00
481612925c add: personal 2025-07-13 17:24:50 +05:00
b6901ddf7b add authorization and admin role 2025-07-13 17:19:52 +05:00
9 changed files with 260 additions and 10 deletions

View File

@ -9,16 +9,24 @@ API сервер для хранения и управления данными
- **schemas.py** - схемы Pydantic для валидации данных - **schemas.py** - схемы Pydantic для валидации данных
- **database.py** - настройка подключения к базе данных - **database.py** - настройка подключения к базе данных
- **crud.py** - функции для CRUD операций - **crud.py** - функции для CRUD операций
- **auth.py** - функции для JWT авторизации
- **create_admin.py** - скрипт для создания первого админа
## Запуск проекта ## Запуск проекта
1. Установите зависимости: 1. Установите зависимости:
```bash ```bash
pip install fastapi uvicorn sqlalchemy pydantic pip install -r requirements.txt
``` ```
2. Запустите сервер: 2. Создайте первого администратора:
```bash
python create_admin.py --username admin --password your_password
```
3. Запустите сервер:
```bash ```bash
uvicorn main:app --reload uvicorn main:app --reload
@ -26,8 +34,27 @@ uvicorn main:app --reload
Сервер будет доступен по адресу http://localhost:8000 Сервер будет доступен по адресу http://localhost:8000
## JWT Авторизация
Для доступа к API необходима JWT авторизация. Вот как она работает:
1. Получите токен доступа, отправив POST-запрос к `/token` с вашими учетными данными:
```bash
curl -X POST http://localhost:8000/token -d "username=admin&password=your_password" -H "Content-Type: application/x-www-form-urlencoded"
```
2. Используйте полученный токен для доступа к защищенным эндпоинтам:
```bash
curl -X GET http://localhost:8000/cars -H "Authorization: Bearer your_token_here"
```
## API эндпоинты ## API эндпоинты
- **POST /token** - получить JWT токен (авторизация)
- **POST /admins** - создать нового администратора (только для админов)
- **GET /users/me** - получить информацию о текущем пользователе
- **GET /cars** - получить список всех автомобилей - **GET /cars** - получить список всех автомобилей
- **GET /cars/{car_id}** - получить информацию о конкретном автомобиле - **GET /cars/{car_id}** - получить информацию о конкретном автомобиле
- **POST /cars** - добавить новый автомобиль - **POST /cars** - добавить новый автомобиль

59
auth.py Normal file
View File

@ -0,0 +1,59 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
import schemas
import models
import crud
from database import get_db
from password_utils import verify_password
# Константы для JWT
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7" # В реальном приложении используйте os.environ.get
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Инициализация OAuth2PasswordBearer для получения токена из запроса
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def authenticate_admin(db: Session, username: str, password: str):
"""Аутентификация админа"""
admin = crud.get_admin_by_username(db, username)
if not admin:
return False
if not verify_password(password, admin.hashed_password):
return False
return admin
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Создаёт JWT токен"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
async def get_current_admin(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
"""Получает текущего админа из токена"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Некорректные учетные данные",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = schemas.TokenData(username=username)
except JWTError:
raise credentials_exception
admin = crud.get_admin_by_username(db, username=token_data.username)
if admin is None:
raise credentials_exception
return admin

35
create_admin.py Normal file
View File

@ -0,0 +1,35 @@
from password_utils import get_password_hash
from models import Admin, Base
from database import SessionLocal, engine
import argparse
def create_initial_admin(username: str, password: str):
"""
Создает первого администратора в системе
"""
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# Проверяем, существует ли уже админ с таким именем
existing_admin = db.query(Admin).filter(Admin.username == username).first()
if existing_admin:
print(f"Администратор с именем {username} уже существует!")
return
# Создаем нового админа
hashed_password = get_password_hash(password)
db_admin = Admin(username=username, hashed_password=hashed_password)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
print(f"Администратор {username} успешно создан!")
db.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Создание первого администратора")
parser.add_argument("--username", required=True, help="Имя пользователя администратора")
parser.add_argument("--password", required=True, help="Пароль администратора")
args = parser.parse_args()
create_initial_admin(args.username, args.password)

16
crud.py
View File

@ -1,7 +1,19 @@
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from models import Car, EngineType, HybridType, PowerRatio, Personal from models import Car, EngineType, HybridType, PowerRatio, Personal, Admin
import schemas import schemas
from typing import List, Optional from typing import List, Optional
from password_utils import get_password_hash
def get_admin_by_username(db: Session, username: str) -> Optional[Admin]:
return db.query(Admin).filter(Admin.username == username).first()
def create_admin(db: Session, admin: schemas.AdminCreate) -> Admin:
hashed_password = get_password_hash(admin.password)
db_admin = Admin(username=admin.username, hashed_password=hashed_password)
db.add(db_admin)
db.commit()
db.refresh(db_admin)
return db_admin
def get_car(db: Session, car_id: int) -> Optional[Car]: def get_car(db: Session, car_id: int) -> Optional[Car]:
return db.query(Car).filter(Car.id == car_id).first() return db.query(Car).filter(Car.id == car_id).first()
@ -118,4 +130,4 @@ def delete_personal(db: Session, personal_id: int) -> bool:
db.delete(db_personal) db.delete(db_personal)
db.commit() db.commit()
return True return True

85
main.py
View File

@ -1,5 +1,6 @@
from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form from fastapi import FastAPI, Depends, HTTPException, status, Query, Path, UploadFile, File, Form
from fastapi.staticfiles import StaticFiles from fastapi.staticfiles import StaticFiles
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
import crud import crud
import models import models
@ -9,6 +10,8 @@ from typing import List, Optional
import uvicorn import uvicorn
from utils import save_image, delete_image from utils import save_image, delete_image
import json import json
from datetime import timedelta
import auth
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
# Создание таблиц в БД # Создание таблиц в БД
@ -27,6 +30,60 @@ app.add_middleware(
# Добавляем обработку статических файлов # Добавляем обработку статических файлов
app.mount("/static", StaticFiles(directory="static"), name="static") app.mount("/static", StaticFiles(directory="static"), name="static")
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)):
# Проверка, что создать админа может только существующий админ
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
# Эндпоинты для авторизации
@app.post("/token", response_model=schemas.Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
admin = auth.authenticate_admin(db, form_data.username, form_data.password)
if not admin:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Неверное имя пользователя или пароль",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=auth.ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = auth.create_access_token(
data={"sub": admin.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@app.post("/admins", response_model=schemas.Admin, status_code=status.HTTP_201_CREATED)
def create_admin(admin: schemas.AdminCreate, db: Session = Depends(get_db), current_admin: models.Admin = Depends(auth.get_current_admin)):
# Проверка, что создать админа может только существующий админ
db_admin = crud.get_admin_by_username(db, username=admin.username)
if db_admin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Пользователь с таким именем уже существует"
)
return crud.create_admin(db=db, admin=admin)
@app.get("/") @app.get("/")
def read_root(): def read_root():
return {"message": "AutoBro API"} return {"message": "AutoBro API"}
@ -58,7 +115,8 @@ def get_car(
async def create_car( async def create_car(
car_data: str = Form(..., description="Данные автомобиля в JSON формате"), car_data: str = Form(..., description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"), image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
try: try:
# Преобразуем строку JSON в словарь # Преобразуем строку JSON в словарь
@ -91,7 +149,8 @@ async def update_car(
car_id: int = Path(..., description="ID автомобиля", gt=0), car_id: int = Path(..., description="ID автомобиля", gt=0),
car_data: str = Form(None, description="Данные автомобиля в JSON формате"), car_data: str = Form(None, description="Данные автомобиля в JSON формате"),
image: UploadFile = File(None, description="Изображение автомобиля"), image: UploadFile = File(None, description="Изображение автомобиля"),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
# Проверяем существование автомобиля # Проверяем существование автомобиля
existing_car = crud.get_car(db, car_id=car_id) existing_car = crud.get_car(db, car_id=car_id)
@ -137,7 +196,8 @@ async def update_car(
@app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT) @app.delete("/cars/{car_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_car( def delete_car(
car_id: int = Path(..., description="ID автомобиля", gt=0), car_id: int = Path(..., description="ID автомобиля", gt=0),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
# Получаем автомобиль перед удалением # Получаем автомобиль перед удалением
car = crud.get_car(db, car_id=car_id) car = crud.get_car(db, car_id=car_id)
@ -155,6 +215,16 @@ def delete_car(
crud.delete_car(db=db, car_id=car_id) crud.delete_car(db=db, car_id=car_id)
return None return None
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
# Эндпоинт для проверки текущего пользователя (только для отладки)
@app.get("/users/me", response_model=schemas.Admin)
def read_users_me(current_admin: models.Admin = Depends(auth.get_current_admin)):
return current_admin
if __name__ == "__main__": if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=3000) uvicorn.run(app, host="0.0.0.0", port=3000)
@ -187,7 +257,8 @@ def get_personal(
async def create_personal( async def create_personal(
personal_data: str = Form(..., description="Данные сотрудника в JSON формате"), personal_data: str = Form(..., description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"), photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
try: try:
# Преобразуем строку JSON в словарь # Преобразуем строку JSON в словарь
@ -220,7 +291,8 @@ async def update_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0), personal_id: int = Path(..., description="ID сотрудника", gt=0),
personal_data: str = Form(None, description="Данные сотрудника в JSON формате"), personal_data: str = Form(None, description="Данные сотрудника в JSON формате"),
photo: UploadFile = File(None, description="Фотография сотрудника"), photo: UploadFile = File(None, description="Фотография сотрудника"),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
# Проверяем существование сотрудника # Проверяем существование сотрудника
existing_personal = crud.get_personal(db, personal_id=personal_id) existing_personal = crud.get_personal(db, personal_id=personal_id)
@ -266,7 +338,8 @@ async def update_personal(
@app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT) @app.delete("/personal/{personal_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_personal( def delete_personal(
personal_id: int = Path(..., description="ID сотрудника", gt=0), personal_id: int = Path(..., description="ID сотрудника", gt=0),
db: Session = Depends(get_db) db: Session = Depends(get_db),
current_admin: models.Admin = Depends(auth.get_current_admin)
): ):
# Получаем сотрудника перед удалением # Получаем сотрудника перед удалением
personal = crud.get_personal(db, personal_id=personal_id) personal = crud.get_personal(db, personal_id=personal_id)

View File

@ -17,6 +17,14 @@ class PowerRatio(enum.Enum):
ELECTRIC_GREATER = "ЭД > ДВС" ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо" NA = "не применимо"
class Admin(Base):
__tablename__ = "admins"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True)
hashed_password = Column(String)
is_active = Column(Boolean, default=True)
class Car(Base): class Car(Base):
__tablename__ = "cars" __tablename__ = "cars"

12
password_utils.py Normal file
View File

@ -0,0 +1,12 @@
from passlib.context import CryptContext
# Инициализация контекста для хеширования паролей
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
"""Проверяет, соответствует ли пароль хешу"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password):
"""Создаёт хеш пароля"""
return pwd_context.hash(password)

View File

@ -3,3 +3,5 @@ uvicorn==0.30.0
sqlalchemy==2.0.41 sqlalchemy==2.0.41
pydantic==2.11.7 pydantic==2.11.7
python-multipart==0.0.9 python-multipart==0.0.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4

View File

@ -18,6 +18,28 @@ class PowerRatioEnum(str, Enum):
ELECTRIC_GREATER = "ЭД > ДВС" ELECTRIC_GREATER = "ЭД > ДВС"
NA = "не применимо" NA = "не применимо"
# Схемы для авторизации
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
# Схемы для пользователя-админа
class AdminBase(BaseModel):
username: str
class AdminCreate(AdminBase):
password: str
class Admin(AdminBase):
id: int
is_active: bool
class Config:
from_attributes = True
class CarBase(BaseModel): class CarBase(BaseModel):
image: Optional[str] = None image: Optional[str] = None
name: str name: str