- Introduced `DependenciesMiddleware` and `BlacklistMiddleware` for enhanced request handling across all routers. - Refactored admin handlers to utilize new middleware, improving access control and error handling. - Updated the `admin_router` to include middleware for access checks and streamlined the process of banning users. - Enhanced the structure of admin handler imports for better organization and maintainability. - Improved error handling in various admin functions to ensure robust user interactions.
147 lines
6.4 KiB
Python
147 lines
6.4 KiB
Python
from typing import List, Optional
|
|
from datetime import datetime
|
|
|
|
from helper_bot.utils.helper_func import add_days_to_date, get_banned_users_buttons, get_banned_users_list
|
|
from helper_bot.handlers.admin.exceptions import UserAlreadyBannedError, InvalidInputError
|
|
from logs.custom_logger import logger
|
|
|
|
|
|
class User:
|
|
"""Модель пользователя"""
|
|
def __init__(self, user_id: int, username: str, full_name: str):
|
|
self.user_id = user_id
|
|
self.username = username
|
|
self.full_name = full_name
|
|
|
|
|
|
class BannedUser:
|
|
"""Модель заблокированного пользователя"""
|
|
def __init__(self, user_id: int, username: str, reason: str, unban_date: Optional[datetime]):
|
|
self.user_id = user_id
|
|
self.username = username
|
|
self.reason = reason
|
|
self.unban_date = unban_date
|
|
|
|
|
|
class AdminService:
|
|
"""Сервис для административных операций"""
|
|
|
|
def __init__(self, bot_db):
|
|
self.bot_db = bot_db
|
|
|
|
def get_last_users(self) -> List[User]:
|
|
"""Получить список последних пользователей"""
|
|
try:
|
|
users_data = self.bot_db.get_last_users_from_db()
|
|
return [
|
|
User(
|
|
user_id=user[1],
|
|
username='Неизвестно',
|
|
full_name=user[0]
|
|
)
|
|
for user in users_data
|
|
]
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при получении списка последних пользователей: {e}")
|
|
raise
|
|
|
|
def get_banned_users(self) -> List[BannedUser]:
|
|
"""Получить список заблокированных пользователей"""
|
|
try:
|
|
banned_users_data = self.bot_db.get_banned_users_from_db()
|
|
return [
|
|
BannedUser(
|
|
user_id=user[1], # user_id
|
|
username=user[0], # user_name
|
|
reason=user[2], # message_for_user
|
|
unban_date=user[3] # date_to_unban
|
|
)
|
|
for user in banned_users_data
|
|
]
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при получении списка заблокированных пользователей: {e}")
|
|
raise
|
|
|
|
def get_user_by_username(self, username: str) -> Optional[User]:
|
|
"""Получить пользователя по username"""
|
|
try:
|
|
user_id = self.bot_db.get_user_id_by_username(username)
|
|
if not user_id:
|
|
return None
|
|
|
|
full_name = self.bot_db.get_full_name_by_id(user_id)
|
|
return User(
|
|
user_id=user_id,
|
|
username=username,
|
|
full_name=full_name or 'Неизвестно'
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при поиске пользователя по username {username}: {e}")
|
|
raise
|
|
|
|
def get_user_by_id(self, user_id: int) -> Optional[User]:
|
|
"""Получить пользователя по ID"""
|
|
try:
|
|
user_info = self.bot_db.get_user_info_by_id(user_id)
|
|
if not user_info:
|
|
return None
|
|
|
|
return User(
|
|
user_id=user_id,
|
|
username=user_info.get('username', 'Неизвестно'),
|
|
full_name=user_info.get('full_name', 'Неизвестно')
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при поиске пользователя по ID {user_id}: {e}")
|
|
raise
|
|
|
|
def ban_user(self, user_id: int, username: str, reason: str, ban_days: Optional[int]) -> None:
|
|
"""Заблокировать пользователя"""
|
|
try:
|
|
# Проверяем, не заблокирован ли уже пользователь
|
|
if self.bot_db.check_user_in_blacklist(user_id):
|
|
raise UserAlreadyBannedError(f"Пользователь {user_id} уже заблокирован")
|
|
|
|
# Рассчитываем дату разблокировки
|
|
date_to_unban = None
|
|
if ban_days is not None:
|
|
date_to_unban = add_days_to_date(ban_days)
|
|
|
|
# Сохраняем в БД
|
|
self.bot_db.set_user_blacklist(user_id, username, reason, date_to_unban)
|
|
|
|
logger.info(f"Пользователь {user_id} ({username}) заблокирован. Причина: {reason}, срок: {ban_days} дней")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при блокировке пользователя {user_id}: {e}")
|
|
raise
|
|
|
|
def unban_user(self, user_id: int) -> None:
|
|
"""Разблокировать пользователя"""
|
|
try:
|
|
self.bot_db.delete_user_blacklist(user_id)
|
|
logger.info(f"Пользователь {user_id} разблокирован")
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при разблокировке пользователя {user_id}: {e}")
|
|
raise
|
|
|
|
def validate_user_input(self, input_text: str) -> int:
|
|
"""Валидация введенного ID пользователя"""
|
|
try:
|
|
user_id = int(input_text.strip())
|
|
if user_id <= 0:
|
|
raise InvalidInputError("ID пользователя должен быть положительным числом")
|
|
return user_id
|
|
except ValueError:
|
|
raise InvalidInputError("ID пользователя должен быть числом")
|
|
|
|
def get_banned_users_for_display(self, page: int = 0) -> tuple[str, list]:
|
|
"""Получить данные заблокированных пользователей для отображения"""
|
|
try:
|
|
message_text = get_banned_users_list(page, self.bot_db)
|
|
buttons_list = get_banned_users_buttons(self.bot_db)
|
|
return message_text, buttons_list
|
|
except Exception as e:
|
|
logger.error(f"Ошибка при получении данных заблокированных пользователей: {e}")
|
|
raise
|