Files
popa_minecraft_launcher_api/app/services/bonus.py
2025-07-31 07:00:07 +05:00

227 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
from datetime import datetime, timedelta
from fastapi import HTTPException
from app.db.database import db
from app.services.coins import CoinsService
from app.models.bonus import BonusType
# Коллекции для бонусов
bonus_types_collection = db.bonus_types
user_bonuses_collection = db.user_bonuses
class BonusService:
async def get_user_active_effects(self, username: str):
"""Получить активные эффекты пользователя для плагина"""
from app.db.database import users_collection
user = await users_collection.find_one({"username": username})
if not user:
return {"effects": []}
# Находим активные бонусы с учетом бесконечных (expires_at = null) или действующих
active_bonuses = await user_bonuses_collection.find({
"user_id": str(user["_id"]),
"is_active": True,
}).to_list(50)
effects = []
for bonus in active_bonuses:
bonus_type = await bonus_types_collection.find_one({"id": bonus["bonus_type_id"]})
if bonus_type:
# Рассчитываем итоговое значение эффекта с учетом уровня
level = bonus.get("level", 1)
effect_value = bonus_type["base_effect_value"] + (level - 1) * bonus_type["effect_increment"]
effect = {
"effect_type": bonus_type["effect_type"],
"effect_value": effect_value
}
# Для временных бонусов добавляем срок
if bonus.get("expires_at"):
effect["expires_at"] = bonus["expires_at"].isoformat()
effects.append(effect)
return {"effects": effects}
async def list_available_bonuses(self):
"""Получить список доступных типов бонусов"""
bonuses = await bonus_types_collection.find().to_list(50)
return {"bonuses": [BonusType(**bonus) for bonus in bonuses]}
async def get_user_bonuses(self, username: str):
"""Получить активные бонусы пользователя"""
from app.db.database import users_collection
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Находим активные бонусы с учетом бесконечных (expires_at = null) или действующих
active_bonuses = await user_bonuses_collection.find({
"user_id": str(user["_id"]),
"is_active": True,
}).to_list(50)
result = []
for bonus in active_bonuses:
bonus_type = await bonus_types_collection.find_one({"id": bonus["bonus_type_id"]})
if bonus_type:
# Рассчитываем итоговое значение эффекта с учетом уровня
level = bonus.get("level", 1)
effect_value = bonus_type["base_effect_value"] + (level - 1) * bonus_type["effect_increment"]
bonus_data = {
"id": bonus["id"],
"name": bonus_type["name"],
"description": bonus_type["description"],
"effect_type": bonus_type["effect_type"],
"effect_value": effect_value,
"level": level,
"purchased_at": bonus["purchased_at"].isoformat(),
"can_upgrade": bonus_type["max_level"] == 0 or level < bonus_type["max_level"],
"upgrade_price": bonus_type["upgrade_price"]
}
# Для временных бонусов добавляем срок
if bonus.get("expires_at"):
bonus_data["expires_at"] = bonus["expires_at"].isoformat()
bonus_data["time_left"] = (bonus["expires_at"] - datetime.utcnow()).total_seconds()
else:
bonus_data["is_permanent"] = True
result.append(bonus_data)
return {"bonuses": result}
async def purchase_bonus(self, username: str, bonus_type_id: str):
"""Покупка базового бонуса пользователем"""
from app.db.database import users_collection
# Находим пользователя
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Находим тип бонуса
bonus_type = await bonus_types_collection.find_one({"id": bonus_type_id})
if not bonus_type:
raise HTTPException(status_code=404, detail="Бонус не найден")
# Проверяем, есть ли уже такой бонус у пользователя
existing_bonus = await user_bonuses_collection.find_one({
"user_id": str(user["_id"]),
"bonus_type_id": bonus_type_id,
"is_active": True
})
if existing_bonus:
raise HTTPException(status_code=400, detail="Этот бонус уже приобретен. Вы можете улучшить его.")
# Проверяем достаточно ли монет
coins_service = CoinsService()
user_coins = await coins_service.get_balance(username)
if user_coins < bonus_type["price"]:
raise HTTPException(status_code=400,
detail=f"Недостаточно монет. Требуется: {bonus_type['price']}, имеется: {user_coins}")
# Создаем запись о бонусе для пользователя
bonus_id = str(uuid.uuid4())
now = datetime.utcnow()
# Если бонус имеет длительность
expires_at = None
if bonus_type["duration"] > 0:
expires_at = now + timedelta(seconds=bonus_type["duration"])
user_bonus = {
"id": bonus_id,
"user_id": str(user["_id"]),
"username": username,
"bonus_type_id": bonus_type_id,
"level": 1, # Начальный уровень
"purchased_at": now,
"expires_at": expires_at,
"is_active": True
}
# Сохраняем бонус в БД
await user_bonuses_collection.insert_one(user_bonus)
# Списываем монеты
await coins_service.decrease_balance(username, bonus_type["price"])
# Формируем текст сообщения
duration_text = "навсегда" if bonus_type["duration"] == 0 else f"на {bonus_type['duration'] // 60} мин."
message = f"Бонус '{bonus_type['name']}' успешно приобретен {duration_text}"
return {
"status": "success",
"message": message,
"remaining_coins": user_coins - bonus_type["price"]
}
async def upgrade_bonus(self, username: str, bonus_id: str):
"""Улучшение уже купленного бонуса"""
from app.db.database import users_collection
# Находим пользователя
user = await users_collection.find_one({"username": username})
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Находим бонус пользователя
user_bonus = await user_bonuses_collection.find_one({
"id": bonus_id,
"user_id": str(user["_id"]),
"is_active": True
})
if not user_bonus:
raise HTTPException(status_code=404, detail="Бонус не найден или не принадлежит вам")
# Находим тип бонуса
bonus_type = await bonus_types_collection.find_one({"id": user_bonus["bonus_type_id"]})
if not bonus_type:
raise HTTPException(status_code=404, detail="Тип бонуса не найден")
# Проверяем ограничение на максимальный уровень
current_level = user_bonus["level"]
if bonus_type["max_level"] > 0 and current_level >= bonus_type["max_level"]:
raise HTTPException(status_code=400, detail="Достигнут максимальный уровень бонуса")
# Рассчитываем стоимость улучшения
upgrade_price = bonus_type["upgrade_price"]
# Проверяем достаточно ли монет
coins_service = CoinsService()
user_coins = await coins_service.get_balance(username)
if user_coins < upgrade_price:
raise HTTPException(status_code=400,
detail=f"Недостаточно монет. Требуется: {upgrade_price}, имеется: {user_coins}")
# Обновляем уровень бонуса
new_level = current_level + 1
await user_bonuses_collection.update_one(
{"id": bonus_id},
{"$set": {"level": new_level}}
)
# Списываем монеты
await coins_service.decrease_balance(username, upgrade_price)
# Рассчитываем новое значение эффекта
new_effect_value = bonus_type["base_effect_value"] + (new_level - 1) * bonus_type["effect_increment"]
return {
"status": "success",
"message": f"Бонус '{bonus_type['name']}' улучшен до уровня {new_level}",
"new_level": new_level,
"effect_value": new_effect_value,
"remaining_coins": user_coins - upgrade_price
}