Add middleware and refactor admin handlers for improved functionality

- Introduced `DependenciesMiddleware` and `BlacklistMiddleware` for enhanced request handling across all routers.
- Refactored admin handlers to utilize new middleware, improving access control and error handling.
- Updated the `admin_router` to include middleware for access checks and streamlined the process of banning users.
- Enhanced the structure of admin handler imports for better organization and maintainability.
- Improved error handling in various admin functions to ensure robust user interactions.
This commit is contained in:
2025-08-28 23:54:17 +03:00
parent f75e7f82c9
commit 8cee629e28
32 changed files with 1922 additions and 1574 deletions

View File

@@ -1 +1,37 @@
from .admin_handlers import admin_router
from .admin_handlers import admin_router
from .dependencies import AdminAccessMiddleware, BotDB, Settings
from .services import AdminService, User, BannedUser
from .exceptions import (
AdminError,
AdminAccessDeniedError,
UserNotFoundError,
InvalidInputError,
UserAlreadyBannedError
)
from .utils import (
return_to_admin_menu,
handle_admin_error,
format_user_info,
format_ban_confirmation,
escape_html
)
__all__ = [
'admin_router',
'AdminAccessMiddleware',
'BotDB',
'Settings',
'AdminService',
'User',
'BannedUser',
'AdminError',
'AdminAccessDeniedError',
'UserNotFoundError',
'InvalidInputError',
'UserAlreadyBannedError',
'return_to_admin_menu',
'handle_admin_error',
'format_user_info',
'format_ban_confirmation',
'escape_html'
]

View File

@@ -1,52 +1,55 @@
import traceback
import html
from aiogram import Router, types, F
from aiogram.filters import Command, StateFilter
from aiogram.filters import Command, StateFilter, MagicData
from aiogram.fsm.context import FSMContext
from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.keyboards import get_reply_keyboard_admin, create_keyboard_with_pagination, \
create_keyboard_for_ban_days, create_keyboard_for_approve_ban, create_keyboard_for_ban_reason
from helper_bot.utils.base_dependency_factory import get_global_instance
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
from helper_bot.keyboards.keyboards import (
get_reply_keyboard_admin,
create_keyboard_with_pagination,
create_keyboard_for_ban_days,
create_keyboard_for_approve_ban,
create_keyboard_for_ban_reason
)
from helper_bot.handlers.admin.dependencies import AdminAccessMiddleware
from helper_bot.handlers.admin.services import AdminService
from helper_bot.handlers.admin.exceptions import (
UserAlreadyBannedError,
InvalidInputError
)
from helper_bot.handlers.admin.utils import (
return_to_admin_menu,
handle_admin_error,
format_user_info,
format_ban_confirmation,
escape_html
)
from logs.custom_logger import logger
# Создаем роутер с middleware для проверки доступа
admin_router = Router()
admin_router.message.middleware(AdminAccessMiddleware())
bdf = get_global_instance()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
# ============================================================================
# ХЕНДЛЕРЫ МЕНЮ
# ============================================================================
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
Command('admin')
)
async def admin_panel(message: types.Message, state: FSMContext):
async def admin_panel(
message: types.Message,
state: FSMContext
):
"""Главное меню администратора"""
try:
if check_access(message.from_user.id, BotDB):
await state.set_state("ADMIN")
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
markup = get_reply_keyboard_admin()
await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
else:
await message.answer('Доступ запрещен, досвидания!')
await state.set_state("START")
await state.set_state("ADMIN")
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
markup = get_reply_keyboard_admin()
await message.answer("Добро пожаловать в админку. Выбери что хочешь:", reply_markup=markup)
except Exception as e:
logger.error(f"Ошибка при запуске админ панели: {e}")
await message.bot.send_message(IMPORTANT_LOGS,
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
await state.set_state("START")
await handle_admin_error(message, e, state, "admin_panel")
@admin_router.message(
@@ -54,150 +57,30 @@ async def admin_panel(message: types.Message, state: FSMContext):
StateFilter("ADMIN"),
F.text == 'Бан (Список)'
)
async def get_last_users(message: types.Message, state: FSMContext):
# Дополнительная проверка на админские права
if not check_access(message.from_user.id, BotDB):
await message.answer('Доступ запрещен!')
await state.set_state("START")
return
logger.info(
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
list_users = BotDB.get_last_users_from_db()
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
await message.answer(text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard)
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("ADMIN"),
F.text == 'Бан по нику'
)
async def ban_by_nickname(message: types.Message, state: FSMContext):
# Дополнительная проверка на админские права
if not check_access(message.from_user.id, BotDB):
await message.answer('Доступ запрещен!')
await state.set_state("START")
return
await message.answer('Пришли мне username блокируемого пользователя')
await state.set_state('PRE_BAN')
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("ADMIN"),
F.text == 'Бан по ID'
)
async def ban_by_id(message: types.Message, state: FSMContext):
# Дополнительная проверка на админские права
if not check_access(message.from_user.id, BotDB):
await message.answer('Доступ запрещен!')
await state.set_state("START")
return
await message.answer('Пришли мне ID блокируемого пользователя')
await state.set_state('PRE_BAN_ID')
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("PRE_BAN", "PRE_BAN_ID", "BAN_2"),
F.text == 'Отменить'
)
async def decline_ban(message: types.Message, state: FSMContext):
# Дополнительная проверка на админские права
if not check_access(message.from_user.id, BotDB):
await message.answer('Доступ запрещен!')
await state.set_state("START")
return
current_state = await state.get_state()
await state.set_data({})
await state.set_state("ADMIN")
logger.info(f"Отмена процедуры блокировки из состояния: {current_state}")
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("PRE_BAN")
)
async def ban_by_nickname_step_2(message: types.Message, state: FSMContext):
# Дополнительная проверка на админские права
if not check_access(message.from_user.id, BotDB):
await message.answer('Доступ запрещен!')
await state.set_state("START")
return
logger.info(
f"Функция ban_by_nickname_2. Получен никнейм пользователя: {message.text}")
user_name = message.text
user_id = BotDB.get_user_id_by_username(user_name)
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
date_to_unban=None)
full_name = BotDB.get_full_name_by_id(user_id)
markup = create_keyboard_for_ban_reason()
# Экранируем потенциально проблемные символы
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(full_name))
await message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("PRE_BAN_ID")
)
async def ban_by_id_step_2(message: types.Message, state: FSMContext):
# Дополнительная проверка на админские права
if not check_access(message.from_user.id, BotDB):
await message.answer('Доступ запрещен!')
await state.set_state("START")
return
async def get_last_users(
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db")
):
"""Получение списка последних пользователей"""
try:
user_id = int(message.text)
logger.info(f"Функция ban_by_id_step_2. Получен ID пользователя: {user_id}")
logger.info(f"Получение списка последних пользователей. Пользователь: {message.from_user.full_name}")
admin_service = AdminService(bot_db)
users = admin_service.get_last_users()
# Проверяем, существует ли пользователь в базе
user_info = BotDB.get_user_info_by_id(user_id)
if not user_info:
await message.answer(f"Пользователь с ID {user_id} не найден в базе данных.")
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)
return
# Преобразуем в формат для клавиатуры (кортежи как ожидает create_keyboard_with_pagination)
users_data = [
(user.full_name, user.username) # (full_name, username) - формат кортежей
for user in users
]
user_name = user_info.get('username', 'Неизвестно')
full_name = user_info.get('full_name', 'Неизвестно')
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
date_to_unban=None)
markup = create_keyboard_for_ban_reason()
# Экранируем потенциально проблемные символы
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(full_name))
keyboard = create_keyboard_with_pagination(1, len(users_data), users_data, 'ban')
await message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
except ValueError:
await message.answer("Пожалуйста, введите корректный числовой ID пользователя.")
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)
text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard
)
except Exception as e:
await handle_admin_error(message, e, state, "get_last_users")
@admin_router.message(
@@ -205,80 +88,222 @@ async def ban_by_id_step_2(message: types.Message, state: FSMContext):
StateFilter("ADMIN"),
F.text == 'Разбан (список)'
)
async def get_banned_users(message):
logger.info(
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
message_text = get_banned_users_list(0, BotDB)
buttons_list = get_banned_users_buttons(BotDB)
if buttons_list:
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
await message.answer(text=message_text, reply_markup=k)
else:
await message.answer(text="В списке забанненых пользователей никого нет")
async def get_banned_users(
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db")
):
"""Получение списка заблокированных пользователей"""
try:
logger.info(f"Получение списка заблокированных пользователей. Пользователь: {message.from_user.full_name}")
admin_service = AdminService(bot_db)
message_text, buttons_list = admin_service.get_banned_users_for_display(0)
if buttons_list:
keyboard = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
await message.answer(text=message_text, reply_markup=keyboard)
else:
await message.answer(text="В списке заблокированных пользователей никого нет")
except Exception as e:
await handle_admin_error(message, e, state, "get_banned_users")
# ============================================================================
# ХЕНДЛЕРЫ ПРОЦЕССА БАНА
# ============================================================================
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_2")
StateFilter("ADMIN"),
F.text.in_(['Бан по нику', 'Бан по ID'])
)
async def ban_user_step_2(message: types.Message, state: FSMContext):
user_data = await state.get_data()
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
await state.update_data(message_for_user=message.text)
markup = create_keyboard_for_ban_days()
# Экранируем message.text для безопасного использования
safe_message_text = html.escape(str(message.text)) if message.text else ""
await message.answer(f"Выбрана причина: {safe_message_text}. Выбери срок бана в днях или напиши "
f"его в чат", reply_markup=markup)
await state.set_state("BAN_3")
async def start_ban_process(
message: types.Message,
state: FSMContext,
):
"""Начало процесса блокировки пользователя"""
try:
ban_type = "username" if message.text == 'Бан по нику' else "id"
await state.update_data(ban_type=ban_type)
prompt_text = "Пришли мне username блокируемого пользователя" if ban_type == "username" else "Пришли мне ID блокируемого пользователя"
await message.answer(prompt_text)
await state.set_state('AWAIT_BAN_TARGET')
except Exception as e:
await handle_admin_error(message, e, state, "start_ban_process")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_3")
StateFilter("AWAIT_BAN_TARGET")
)
async def ban_user_step_3(message: types.Message, state: FSMContext):
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
if message.text != 'Навсегда':
count_days = int(message.text)
date_to_unban = add_days_to_date(count_days)
else:
date_to_unban = None
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
await state.update_data(date_to_unban=date_to_unban)
user_data = await state.get_data()
markup = create_keyboard_for_approve_ban()
# Экранируем user_data для безопасного использования
safe_message_for_user = html.escape(str(user_data['message_for_user'])) if user_data.get('message_for_user') else ""
safe_date_to_unban = html.escape(str(user_data['date_to_unban'])) if user_data.get('date_to_unban') else ""
await message.answer(
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{safe_message_for_user}\nСрок бана:{safe_date_to_unban}",
reply_markup=markup)
await state.set_state("BAN_FINAL")
async def process_ban_target(
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db")
):
"""Обработка введенного username/ID для блокировки"""
try:
user_data = await state.get_data()
ban_type = user_data.get('ban_type')
admin_service = AdminService(bot_db)
# Определяем пользователя
if ban_type == "username":
user = admin_service.get_user_by_username(message.text)
if not user:
await message.answer(f"Пользователь с username '{escape_html(message.text)}' не найден.")
await return_to_admin_menu(message, state)
return
else: # ban_type == "id"
try:
user_id = admin_service.validate_user_input(message.text)
user = admin_service.get_user_by_id(user_id)
if not user:
await message.answer(f"Пользователь с ID {user_id} не найден в базе данных.")
await return_to_admin_menu(message, state)
return
except InvalidInputError as e:
await message.answer(str(e))
await return_to_admin_menu(message, state)
return
# Сохраняем данные пользователя
await state.update_data(
target_user_id=user.user_id,
target_username=user.username,
target_full_name=user.full_name
)
# Показываем информацию о пользователе и запрашиваем причину
user_info = format_user_info(user.user_id, user.username, user.full_name)
markup = create_keyboard_for_ban_reason()
await message.answer(
text=f"{user_info}\n\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup
)
await state.set_state('AWAIT_BAN_DETAILS')
except Exception as e:
await handle_admin_error(message, e, state, "process_ban_target")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_FINAL"),
StateFilter("AWAIT_BAN_DETAILS")
)
async def process_ban_reason(
message: types.Message,
state: FSMContext
):
"""Обработка причины блокировки"""
try:
await state.update_data(ban_reason=message.text)
markup = create_keyboard_for_ban_days()
safe_reason = escape_html(message.text)
await message.answer(
f"Выбрана причина: {safe_reason}. Выбери срок бана в днях или напиши его в чат",
reply_markup=markup
)
await state.set_state('AWAIT_BAN_DURATION')
except Exception as e:
await handle_admin_error(message, e, state, "process_ban_reason")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("AWAIT_BAN_DURATION")
)
async def process_ban_duration(
message: types.Message,
state: FSMContext,
):
"""Обработка срока блокировки"""
try:
user_data = await state.get_data()
# Определяем срок блокировки
if message.text == 'Навсегда':
ban_days = None
else:
try:
ban_days = int(message.text)
if ban_days <= 0:
await message.answer("Срок блокировки должен быть положительным числом.")
return
except ValueError:
await message.answer("Пожалуйста, введите корректное число дней или выберите 'Навсегда'.")
return
await state.update_data(ban_days=ban_days)
# Показываем подтверждение
confirmation_text = format_ban_confirmation(
user_data['target_user_id'],
user_data['ban_reason'],
ban_days
)
markup = create_keyboard_for_approve_ban()
await message.answer(confirmation_text, reply_markup=markup)
await state.set_state('BAN_CONFIRMATION')
except Exception as e:
await handle_admin_error(message, e, state, "process_ban_duration")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_CONFIRMATION"),
F.text == 'Подтвердить'
)
async def approve_ban(message: types.Message, state: FSMContext):
user_data = await state.get_data()
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
exists = BotDB.check_user_in_blacklist(user_data['user_id'])
if exists:
await message.reply(f"Пользователь уже был заблокирован ранее.")
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
await state.set_state('ADMIN')
else:
BotDB.set_user_blacklist(user_data['user_id'],
user_data['user_name'],
user_data['message_for_user'],
user_data['date_to_unban'])
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user_data['user_name'])) if user_data.get('user_name') else "Неизвестный пользователь"
await message.reply(f"Пользователь {safe_user_name} успешно заблокирован.")
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)
async def confirm_ban(
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db")
):
"""Подтверждение блокировки пользователя"""
try:
user_data = await state.get_data()
admin_service = AdminService(bot_db)
# Выполняем блокировку
admin_service.ban_user(
user_id=user_data['target_user_id'],
username=user_data['target_username'],
reason=user_data['ban_reason'],
ban_days=user_data['ban_days']
)
safe_username = escape_html(user_data['target_username'])
await message.reply(f"Пользователь {safe_username} успешно заблокирован.")
await return_to_admin_menu(message, state)
except UserAlreadyBannedError as e:
await message.reply(str(e))
await return_to_admin_menu(message, state)
except Exception as e:
await handle_admin_error(message, e, state, "confirm_ban")
# ============================================================================
# ХЕНДЛЕРЫ ОТМЕНЫ И НАВИГАЦИИ
# ============================================================================
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("AWAIT_BAN_TARGET", "AWAIT_BAN_DETAILS", "AWAIT_BAN_DURATION", "BAN_CONFIRMATION"),
F.text == 'Отменить'
)
async def cancel_ban_process(
message: types.Message,
state: FSMContext
):
"""Отмена процесса блокировки"""
try:
current_state = await state.get_state()
logger.info(f"Отмена процедуры блокировки из состояния: {current_state}")
await return_to_admin_menu(message, state)
except Exception as e:
await handle_admin_error(message, e, state, "cancel_ban_process")

View File

@@ -0,0 +1,60 @@
from typing import Annotated, Dict, Any
from aiogram import BaseMiddleware
from aiogram.types import TelegramObject
from helper_bot.utils.base_dependency_factory import get_global_instance
from helper_bot.utils.helper_func import check_access
from logs.custom_logger import logger
class AdminAccessMiddleware(BaseMiddleware):
"""Middleware для проверки административного доступа"""
async def __call__(self, handler, event: TelegramObject, data: Dict[str, Any]) -> Any:
if hasattr(event, 'from_user'):
user_id = event.from_user.id
# Получаем bot_db из data (внедренного DependenciesMiddleware)
bot_db = data.get('bot_db')
if not bot_db:
# Fallback: получаем напрямую если middleware не сработала
bdf = get_global_instance()
bot_db = bdf.get_db()
if not check_access(user_id, bot_db):
if hasattr(event, 'answer'):
await event.answer('Доступ запрещен!')
return
try:
# Вызываем хендлер с data
return await handler(event, data)
except TypeError as e:
if "missing 1 required positional argument: 'data'" in str(e):
logger.error(f"Ошибка в AdminAccessMiddleware: {e}. Хендлер не принимает параметр 'data'")
# Пытаемся вызвать хендлер без data (для совместимости с MagicData)
return await handler(event)
else:
logger.error(f"TypeError в AdminAccessMiddleware: {e}")
raise
except Exception as e:
logger.error(f"Неожиданная ошибка в AdminAccessMiddleware: {e}")
raise
# Dependency providers
def get_bot_db():
"""Провайдер для получения экземпляра БД"""
bdf = get_global_instance()
return bdf.get_db()
def get_settings():
"""Провайдер для получения настроек"""
bdf = get_global_instance()
return bdf.settings
# Type aliases for dependency injection
BotDB = Annotated[object, get_bot_db()]
Settings = Annotated[dict, get_settings()]

View File

@@ -0,0 +1,23 @@
class AdminError(Exception):
"""Базовое исключение для административных операций"""
pass
class AdminAccessDeniedError(AdminError):
"""Исключение при отказе в административном доступе"""
pass
class UserNotFoundError(AdminError):
"""Исключение при отсутствии пользователя"""
pass
class InvalidInputError(AdminError):
"""Исключение при некорректном вводе данных"""
pass
class UserAlreadyBannedError(AdminError):
"""Исключение при попытке забанить уже заблокированного пользователя"""
pass

View File

@@ -0,0 +1,146 @@
from typing import List, Optional
from datetime import datetime
from helper_bot.utils.helper_func import add_days_to_date, get_banned_users_buttons, get_banned_users_list
from helper_bot.handlers.admin.exceptions import UserAlreadyBannedError, InvalidInputError
from logs.custom_logger import logger
class User:
"""Модель пользователя"""
def __init__(self, user_id: int, username: str, full_name: str):
self.user_id = user_id
self.username = username
self.full_name = full_name
class BannedUser:
"""Модель заблокированного пользователя"""
def __init__(self, user_id: int, username: str, reason: str, unban_date: Optional[datetime]):
self.user_id = user_id
self.username = username
self.reason = reason
self.unban_date = unban_date
class AdminService:
"""Сервис для административных операций"""
def __init__(self, bot_db):
self.bot_db = bot_db
def get_last_users(self) -> List[User]:
"""Получить список последних пользователей"""
try:
users_data = self.bot_db.get_last_users_from_db()
return [
User(
user_id=user[1],
username='Неизвестно',
full_name=user[0]
)
for user in users_data
]
except Exception as e:
logger.error(f"Ошибка при получении списка последних пользователей: {e}")
raise
def get_banned_users(self) -> List[BannedUser]:
"""Получить список заблокированных пользователей"""
try:
banned_users_data = self.bot_db.get_banned_users_from_db()
return [
BannedUser(
user_id=user[1], # user_id
username=user[0], # user_name
reason=user[2], # message_for_user
unban_date=user[3] # date_to_unban
)
for user in banned_users_data
]
except Exception as e:
logger.error(f"Ошибка при получении списка заблокированных пользователей: {e}")
raise
def get_user_by_username(self, username: str) -> Optional[User]:
"""Получить пользователя по username"""
try:
user_id = self.bot_db.get_user_id_by_username(username)
if not user_id:
return None
full_name = self.bot_db.get_full_name_by_id(user_id)
return User(
user_id=user_id,
username=username,
full_name=full_name or 'Неизвестно'
)
except Exception as e:
logger.error(f"Ошибка при поиске пользователя по username {username}: {e}")
raise
def get_user_by_id(self, user_id: int) -> Optional[User]:
"""Получить пользователя по ID"""
try:
user_info = self.bot_db.get_user_info_by_id(user_id)
if not user_info:
return None
return User(
user_id=user_id,
username=user_info.get('username', 'Неизвестно'),
full_name=user_info.get('full_name', 'Неизвестно')
)
except Exception as e:
logger.error(f"Ошибка при поиске пользователя по ID {user_id}: {e}")
raise
def ban_user(self, user_id: int, username: str, reason: str, ban_days: Optional[int]) -> None:
"""Заблокировать пользователя"""
try:
# Проверяем, не заблокирован ли уже пользователь
if self.bot_db.check_user_in_blacklist(user_id):
raise UserAlreadyBannedError(f"Пользователь {user_id} уже заблокирован")
# Рассчитываем дату разблокировки
date_to_unban = None
if ban_days is not None:
date_to_unban = add_days_to_date(ban_days)
# Сохраняем в БД
self.bot_db.set_user_blacklist(user_id, username, reason, date_to_unban)
logger.info(f"Пользователь {user_id} ({username}) заблокирован. Причина: {reason}, срок: {ban_days} дней")
except Exception as e:
logger.error(f"Ошибка при блокировке пользователя {user_id}: {e}")
raise
def unban_user(self, user_id: int) -> None:
"""Разблокировать пользователя"""
try:
self.bot_db.delete_user_blacklist(user_id)
logger.info(f"Пользователь {user_id} разблокирован")
except Exception as e:
logger.error(f"Ошибка при разблокировке пользователя {user_id}: {e}")
raise
def validate_user_input(self, input_text: str) -> int:
"""Валидация введенного ID пользователя"""
try:
user_id = int(input_text.strip())
if user_id <= 0:
raise InvalidInputError("ID пользователя должен быть положительным числом")
return user_id
except ValueError:
raise InvalidInputError("ID пользователя должен быть числом")
def get_banned_users_for_display(self, page: int = 0) -> tuple[str, list]:
"""Получить данные заблокированных пользователей для отображения"""
try:
message_text = get_banned_users_list(page, self.bot_db)
buttons_list = get_banned_users_buttons(self.bot_db)
return message_text, buttons_list
except Exception as e:
logger.error(f"Ошибка при получении данных заблокированных пользователей: {e}")
raise

View File

@@ -0,0 +1,61 @@
import html
from typing import Optional
from aiogram import types
from aiogram.fsm.context import FSMContext
from helper_bot.keyboards.keyboards import get_reply_keyboard_admin
from helper_bot.handlers.admin.exceptions import AdminError
from logs.custom_logger import logger
def escape_html(text: str) -> str:
"""Экранирование HTML для безопасного использования в сообщениях"""
return html.escape(str(text)) if text else ""
async def return_to_admin_menu(message: types.Message, state: FSMContext,
additional_message: Optional[str] = None) -> None:
"""Универсальная функция для возврата в админ-меню"""
await state.set_data({})
await state.set_state("ADMIN")
markup = get_reply_keyboard_admin()
if additional_message:
await message.answer(additional_message)
await message.answer('Вернулись в меню', reply_markup=markup)
async def handle_admin_error(message: types.Message, error: Exception,
state: FSMContext, error_context: str = "") -> None:
"""Централизованная обработка ошибок административных операций"""
logger.error(f"Ошибка в {error_context}: {error}")
if isinstance(error, AdminError):
await message.answer(f"Ошибка: {str(error)}")
else:
await message.answer("Произошла внутренняя ошибка. Попробуйте позже.")
await return_to_admin_menu(message, state)
def format_user_info(user_id: int, username: str, full_name: str) -> str:
"""Форматирование информации о пользователе для отображения"""
safe_username = escape_html(username)
safe_full_name = escape_html(full_name)
return (f"<b>Выбран пользователь:</b>\n"
f"<b>ID:</b> {user_id}\n"
f"<b>Username:</b> {safe_username}\n"
f"<b>Имя:</b> {safe_full_name}")
def format_ban_confirmation(user_id: int, reason: str, ban_days: Optional[int]) -> str:
"""Форматирование подтверждения бана"""
safe_reason = escape_html(reason)
ban_text = "Навсегда" if ban_days is None else f"{ban_days} дней"
return (f"<b>Необходимо подтверждение:</b>\n"
f"<b>Пользователь:</b> {user_id}\n"
f"<b>Причина бана:</b> {safe_reason}\n"
f"<b>Срок бана:</b> {ban_text}")