Files
popa_minecraft_launcher_api/app/services/bonus.py
2025-12-07 20:08:49 +05:00

300 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
from datetime import datetime, timedelta
from fastapi import HTTPException
from app.db.database import db
from app.services.coins import CoinsService
from app.models.bonus import BonusType
# Коллекции для бонусов
bonus_types_collection = db.bonus_types
user_bonuses_collection = db.user_bonuses
class BonusService:
async def create_bonus_type(self, bonus_data):
"""Создание нового типа бонуса"""
bonus_id = str(uuid.uuid4())
bonus = {
"id": bonus_id,
"name": bonus_data.name,
"description": bonus_data.description,
"effect_type": bonus_data.effect_type,
"base_effect_value": bonus_data.base_effect_value,
"effect_increment": bonus_data.effect_increment,
"price": bonus_data.price,
"upgrade_price": bonus_data.upgrade_price,
"duration": bonus_data.duration,
"max_level": bonus_data.max_level,
"image_url": bonus_data.image_url,
}
# Проверка на дубликат имени
existing = await bonus_types_collection.find_one({"name": bonus_data.name})
if existing:
raise HTTPException(status_code=400, detail="Бонус с таким именем уже существует")
await bonus_types_collection.insert_one(bonus)
return {
"status": "success",
"message": "Тип бонуса успешно создан",
"bonus_id": bonus_id
}
async def get_user_active_effects(self, username: str):
"""Получить активные эффекты пользователя для плагина"""
from app.db.database import users_collection
user = await users_collection.find_one({"username": username})
if not user:
return {"effects": []}
# Находим активные бонусы с учетом бесконечных (expires_at = null) или действующих
active_bonuses = await user_bonuses_collection.find({
"user_id": str(user["_id"]),
"is_active": True,
}).to_list(50)
effects = []
for bonus in active_bonuses:
bonus_type = await bonus_types_collection.find_one({"id": bonus["bonus_type_id"]})
if bonus_type:
# Рассчитываем итоговое значение эффекта с учетом уровня
level = bonus.get("level", 1)
effect_value = bonus_type["base_effect_value"] + (level - 1) * bonus_type["effect_increment"]
effect = {
"effect_type": bonus_type["effect_type"],
"effect_value": effect_value
}
# Для временных бонусов добавляем срок
if bonus.get("expires_at"):
effect["expires_at"] = bonus["expires_at"].isoformat()
effects.append(effect)
return {"effects": effects}
async def list_available_bonuses(self):
"""Получить список доступных типов бонусов"""
bonuses = await bonus_types_collection.find().to_list(50)
return {"bonuses": [BonusType(**bonus) for bonus in bonuses]}
async def get_user_bonuses(self, username: str):
"""Получить бонусы пользователя (активные и неактивные)"""
from app.db.database import users_collection
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
user_bonuses = await user_bonuses_collection.find({
"user_id": str(user["_id"]),
}).to_list(50)
result = []
for bonus in user_bonuses:
bonus_type = await bonus_types_collection.find_one({"id": bonus["bonus_type_id"]})
if bonus_type:
level = bonus.get("level", 1)
effect_value = bonus_type["base_effect_value"] + (level - 1) * bonus_type["effect_increment"]
bonus_data = {
"id": bonus["id"],
"bonus_type_id": bonus["bonus_type_id"],
"name": bonus_type["name"],
"description": bonus_type["description"],
"effect_type": bonus_type["effect_type"],
"effect_value": effect_value,
"level": level,
"purchased_at": bonus["purchased_at"].isoformat(),
"can_upgrade": bonus_type["max_level"] == 0 or level < bonus_type["max_level"],
"upgrade_price": bonus_type["upgrade_price"],
"image_url": bonus_type.get("image_url"),
"is_active": bonus.get("is_active", True),
}
if bonus.get("expires_at"):
bonus_data["expires_at"] = bonus["expires_at"].isoformat()
bonus_data["time_left"] = (bonus["expires_at"] - datetime.utcnow()).total_seconds()
bonus_data["is_permanent"] = False
else:
bonus_data["is_permanent"] = True
result.append(bonus_data)
return {"bonuses": result}
async def toggle_bonus_activation(self, username: str, bonus_id: str):
"""Переключить активность бонуса у пользователя"""
from app.db.database import users_collection
# Находим пользователя
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Находим бонус пользователя
user_bonus = await user_bonuses_collection.find_one({
"id": bonus_id,
"user_id": str(user["_id"]),
})
if not user_bonus:
raise HTTPException(status_code=404, detail="Бонус не найден или не принадлежит вам")
# Проверяем, не истек ли бонус при попытке включить
if user_bonus.get("expires_at") and user_bonus["expires_at"] < datetime.utcnow():
# На всякий случай зафиксируем в БД, что он не активен
await user_bonuses_collection.update_one(
{"id": bonus_id},
{"$set": {"is_active": False}}
)
raise HTTPException(status_code=400, detail="Срок действия бонуса истёк и он не может быть активирован")
new_status = not user_bonus.get("is_active", False)
await user_bonuses_collection.update_one(
{"id": bonus_id},
{"$set": {"is_active": new_status}}
)
return {
"status": "success",
"message": "Активность бонуса переключена",
"bonus_id": bonus_id,
"is_active": new_status,
}
async def purchase_bonus(self, username: str, bonus_type_id: str):
"""Покупка базового бонуса пользователем"""
from app.db.database import users_collection
# Находим пользователя
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Находим тип бонуса
bonus_type = await bonus_types_collection.find_one({"id": bonus_type_id})
if not bonus_type:
raise HTTPException(status_code=404, detail="Бонус не найден")
# Проверяем, есть ли уже такой бонус у пользователя
existing_bonus = await user_bonuses_collection.find_one({
"user_id": str(user["_id"]),
"bonus_type_id": bonus_type_id,
"is_active": True
})
if existing_bonus:
raise HTTPException(status_code=400, detail="Этот бонус уже приобретен. Вы можете улучшить его.")
# Проверяем достаточно ли монет
coins_service = CoinsService()
user_coins = await coins_service.get_balance(username)
if user_coins < bonus_type["price"]:
raise HTTPException(status_code=400,
detail=f"Недостаточно монет. Требуется: {bonus_type['price']}, имеется: {user_coins}")
# Создаем запись о бонусе для пользователя
bonus_id = str(uuid.uuid4())
now = datetime.utcnow()
# Если бонус имеет длительность
expires_at = None
if bonus_type["duration"] > 0:
expires_at = now + timedelta(seconds=bonus_type["duration"])
user_bonus = {
"id": bonus_id,
"user_id": str(user["_id"]),
"username": username,
"bonus_type_id": bonus_type_id,
"level": 1, # Начальный уровень
"purchased_at": now,
"expires_at": expires_at,
"is_active": True
}
# Сохраняем бонус в БД
await user_bonuses_collection.insert_one(user_bonus)
# Списываем монеты
await coins_service.decrease_balance(username, bonus_type["price"])
# Формируем текст сообщения
duration_text = "навсегда" if bonus_type["duration"] == 0 else f"на {bonus_type['duration'] // 60} мин."
message = f"Бонус '{bonus_type['name']}' успешно приобретен {duration_text}"
return {
"status": "success",
"message": message,
"remaining_coins": user_coins - bonus_type["price"]
}
async def upgrade_bonus(self, username: str, bonus_id: str):
"""Улучшение уже купленного бонуса"""
from app.db.database import users_collection
# Находим пользователя
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Находим бонус пользователя
user_bonus = await user_bonuses_collection.find_one({
"id": bonus_id,
"user_id": str(user["_id"]),
"is_active": True
})
if not user_bonus:
raise HTTPException(status_code=404, detail="Бонус не найден или не принадлежит вам")
# Находим тип бонуса
bonus_type = await bonus_types_collection.find_one({"id": user_bonus["bonus_type_id"]})
if not bonus_type:
raise HTTPException(status_code=404, detail="Тип бонуса не найден")
# Проверяем ограничение на максимальный уровень
current_level = user_bonus["level"]
if bonus_type["max_level"] > 0 and current_level >= bonus_type["max_level"]:
raise HTTPException(status_code=400, detail="Достигнут максимальный уровень бонуса")
# Рассчитываем стоимость улучшения
upgrade_price = bonus_type["upgrade_price"]
# Проверяем достаточно ли монет
coins_service = CoinsService()
user_coins = await coins_service.get_balance(username)
if user_coins < upgrade_price:
raise HTTPException(status_code=400,
detail=f"Недостаточно монет. Требуется: {upgrade_price}, имеется: {user_coins}")
# Обновляем уровень бонуса
new_level = current_level + 1
await user_bonuses_collection.update_one(
{"id": bonus_id},
{"$set": {"level": new_level}}
)
# Списываем монеты
await coins_service.decrease_balance(username, upgrade_price)
# Рассчитываем новое значение эффекта
new_effect_value = bonus_type["base_effect_value"] + (new_level - 1) * bonus_type["effect_increment"]
return {
"status": "success",
"message": f"Бонус '{bonus_type['name']}' улучшен до уровня {new_level}",
"new_level": new_level,
"effect_value": new_effect_value,
"remaining_coins": user_coins - upgrade_price
}