322 lines
17 KiB
Python
322 lines
17 KiB
Python
import traceback
|
||
import html
|
||
|
||
from aiogram import Router, types, F
|
||
from aiogram.filters import Command, StateFilter
|
||
from aiogram.fsm.context import FSMContext
|
||
|
||
from helper_bot.filters.main import ChatTypeFilter
|
||
from helper_bot.keyboards.keyboards import get_reply_keyboard_admin, create_keyboard_with_pagination, \
|
||
create_keyboard_for_ban_days, create_keyboard_for_approve_ban, create_keyboard_for_ban_reason
|
||
from helper_bot.utils.base_dependency_factory import get_global_instance
|
||
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
|
||
from logs.custom_logger import logger
|
||
|
||
admin_router = Router()
|
||
|
||
bdf = get_global_instance()
|
||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||
LOGS = bdf.settings['Settings']['logs']
|
||
TEST = bdf.settings['Settings']['test']
|
||
|
||
BotDB = bdf.get_db()
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
Command('admin')
|
||
)
|
||
async def admin_panel(message: types.Message, state: FSMContext):
|
||
try:
|
||
if check_access(message.from_user.id, BotDB):
|
||
await state.set_state("ADMIN")
|
||
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||
reply_markup=markup)
|
||
else:
|
||
await message.answer('Доступ запрещен, досвидания!')
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при запуске админ панели: {e}")
|
||
await message.bot.send_message(IMPORTANT_LOGS,
|
||
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("ADMIN"),
|
||
F.text == 'Бан (Список)'
|
||
)
|
||
async def get_last_users(message: types.Message):
|
||
logger.info(
|
||
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||
list_users = BotDB.get_last_users_from_db()
|
||
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
|
||
await message.answer(text="Список пользователей которые последними обращались к боту",
|
||
reply_markup=keyboard)
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("ADMIN"),
|
||
F.text == 'Бан по нику'
|
||
)
|
||
async def ban_by_nickname(message: types.Message, state: FSMContext):
|
||
await message.answer('Пришли мне username блокируемого пользователя')
|
||
await state.set_state('PRE_BAN')
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("ADMIN"),
|
||
F.text == 'Бан по ID'
|
||
)
|
||
async def ban_by_id(message: types.Message, state: FSMContext):
|
||
await message.answer('Пришли мне ID блокируемого пользователя')
|
||
await state.set_state('PRE_BAN_ID')
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("ADMIN"),
|
||
F.text == 'Тестовый бан'
|
||
)
|
||
async def ban_by_forward(message: types.Message, state: FSMContext):
|
||
await message.answer('Перешлите мне сообщение от пользователя, которого хотите заблокировать')
|
||
await state.set_state('PRE_BAN_FORWARD')
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
F.text == 'Отменить'
|
||
)
|
||
async def decline_ban(message: types.Message, state: FSMContext):
|
||
current_state = await state.get_state()
|
||
await state.set_data({})
|
||
await state.set_state("ADMIN")
|
||
logger.info(f"Отмена процедуры блокировки из состояния: {current_state}")
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("PRE_BAN")
|
||
)
|
||
async def ban_by_nickname_step_2(message: types.Message, state: FSMContext):
|
||
logger.info(
|
||
f"Функция ban_by_nickname_2. Получен никнейм пользователя: {message.text}")
|
||
user_name = message.text
|
||
user_id = BotDB.get_user_id_by_username(user_name)
|
||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||
date_to_unban=None)
|
||
full_name = BotDB.get_full_name_by_id(user_id)
|
||
markup = create_keyboard_for_ban_reason()
|
||
# Экранируем потенциально проблемные символы
|
||
user_name_escaped = html.escape(str(user_name))
|
||
full_name_escaped = html.escape(str(full_name))
|
||
await message.answer(
|
||
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
|
||
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
|
||
reply_markup=markup)
|
||
await state.set_state('BAN_2')
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("PRE_BAN_ID")
|
||
)
|
||
async def ban_by_id_step_2(message: types.Message, state: FSMContext):
|
||
try:
|
||
user_id = int(message.text)
|
||
logger.info(f"Функция ban_by_id_step_2. Получен ID пользователя: {user_id}")
|
||
|
||
# Проверяем, существует ли пользователь в базе
|
||
user_info = BotDB.get_user_info_by_id(user_id)
|
||
if not user_info:
|
||
await message.answer(f"Пользователь с ID {user_id} не найден в базе данных.")
|
||
await state.set_state('ADMIN')
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||
return
|
||
|
||
user_name = user_info.get('username', 'Неизвестно')
|
||
full_name = user_info.get('full_name', 'Неизвестно')
|
||
|
||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||
date_to_unban=None)
|
||
|
||
markup = create_keyboard_for_ban_reason()
|
||
# Экранируем потенциально проблемные символы
|
||
user_name_escaped = html.escape(str(user_name))
|
||
full_name_escaped = html.escape(str(full_name))
|
||
await message.answer(
|
||
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
|
||
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
|
||
reply_markup=markup)
|
||
await state.set_state('BAN_2')
|
||
|
||
except ValueError:
|
||
await message.answer("Пожалуйста, введите корректный числовой ID пользователя.")
|
||
await state.set_state('ADMIN')
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("PRE_BAN_FORWARD"),
|
||
F.forward_from
|
||
)
|
||
async def ban_by_forward_step_2(message: types.Message, state: FSMContext):
|
||
"""Обработчик пересланных сообщений для бана пользователя"""
|
||
try:
|
||
# Получаем информацию о пользователе из пересланного сообщения
|
||
forwarded_user = message.forward_from
|
||
|
||
if not forwarded_user:
|
||
await message.answer("Не удалось получить информацию о пользователе из пересланного сообщения. Возможно, пользователь скрыл возможность пересылки своих сообщений.")
|
||
await state.set_state('ADMIN')
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||
return
|
||
|
||
user_id = forwarded_user.id
|
||
user_name = forwarded_user.username or "private_username"
|
||
full_name = forwarded_user.full_name or "Неизвестно"
|
||
|
||
logger.info(f"Функция ban_by_forward_step_2. Получен пользователь из пересланного сообщения: ID={user_id}, username={user_name}, full_name={full_name}")
|
||
|
||
# Проверяем, существует ли пользователь в базе
|
||
user_info = BotDB.get_user_info_by_id(user_id)
|
||
if not user_info:
|
||
# Если пользователя нет в базе, используем информацию из пересланного сообщения
|
||
logger.info(f"Пользователь с ID {user_id} не найден в базе данных, используем данные из пересланного сообщения")
|
||
user_name = user_name
|
||
full_name = full_name
|
||
else:
|
||
# Если пользователь есть в базе, используем данные из базы
|
||
user_name = user_info.get('username', user_name)
|
||
full_name = user_info.get('full_name', full_name)
|
||
|
||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||
date_to_unban=None)
|
||
|
||
markup = create_keyboard_for_ban_reason()
|
||
# Экранируем потенциально проблемные символы
|
||
user_name_escaped = html.escape(str(user_name))
|
||
full_name_escaped = html.escape(str(full_name))
|
||
await message.answer(
|
||
text=f"<b>Выбран пользователь из пересланного сообщения:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
|
||
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
|
||
reply_markup=markup)
|
||
await state.set_state('BAN_2')
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при обработке пересланного сообщения: {e}")
|
||
await message.answer("Произошла ошибка при обработке пересланного сообщения.")
|
||
await state.set_state('ADMIN')
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("PRE_BAN_FORWARD")
|
||
)
|
||
async def ban_by_forward_invalid(message: types.Message, state: FSMContext):
|
||
"""Обработчик для случаев, когда сообщение не является пересланным или не содержит информацию о пользователе"""
|
||
if message.forward_from_chat:
|
||
await message.answer("Пересланное сообщение из канала или группы не содержит информацию о конкретном пользователе. Пожалуйста, перешлите сообщение из приватного чата.")
|
||
else:
|
||
await message.answer("Пожалуйста, перешлите сообщение от пользователя, которого хотите заблокировать. Обычное сообщение не подходит.")
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("ADMIN"),
|
||
F.text == 'Разбан (список)'
|
||
)
|
||
async def get_banned_users(message):
|
||
logger.info(
|
||
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||
message_text = get_banned_users_list(0, BotDB)
|
||
buttons_list = get_banned_users_buttons(BotDB)
|
||
if buttons_list:
|
||
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
|
||
await message.answer(text=message_text, reply_markup=k)
|
||
else:
|
||
await message.answer(text="В списке забанненых пользователей никого нет")
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("BAN_2")
|
||
)
|
||
async def ban_user_step_2(message: types.Message, state: FSMContext):
|
||
user_data = await state.get_data()
|
||
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
|
||
await state.update_data(message_for_user=message.text)
|
||
markup = create_keyboard_for_ban_days()
|
||
# Экранируем message.text для безопасного использования
|
||
safe_message_text = html.escape(str(message.text)) if message.text else ""
|
||
await message.answer(f"Выбрана причина: {safe_message_text}. Выбери срок бана в днях или напиши "
|
||
f"его в чат", reply_markup=markup)
|
||
await state.set_state("BAN_3")
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("BAN_3")
|
||
)
|
||
async def ban_user_step_3(message: types.Message, state: FSMContext):
|
||
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
|
||
if message.text != 'Навсегда':
|
||
count_days = int(message.text)
|
||
date_to_unban = add_days_to_date(count_days)
|
||
else:
|
||
date_to_unban = None
|
||
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
|
||
await state.update_data(date_to_unban=date_to_unban)
|
||
user_data = await state.get_data()
|
||
markup = create_keyboard_for_approve_ban()
|
||
# Экранируем user_data для безопасного использования
|
||
safe_message_for_user = html.escape(str(user_data['message_for_user'])) if user_data.get('message_for_user') else ""
|
||
safe_date_to_unban = html.escape(str(user_data['date_to_unban'])) if user_data.get('date_to_unban') else ""
|
||
await message.answer(
|
||
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{safe_message_for_user}\nСрок бана:{safe_date_to_unban}",
|
||
reply_markup=markup)
|
||
await state.set_state("BAN_FINAL")
|
||
|
||
|
||
@admin_router.message(
|
||
ChatTypeFilter(chat_type=["private"]),
|
||
StateFilter("BAN_FINAL"),
|
||
F.text == 'Подтвердить'
|
||
)
|
||
async def approve_ban(message: types.Message, state: FSMContext):
|
||
user_data = await state.get_data()
|
||
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
|
||
exists = BotDB.check_user_in_blacklist(user_data['user_id'])
|
||
if exists:
|
||
await message.reply(f"Пользователь уже был заблокирован ранее.")
|
||
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
|
||
await state.set_state('ADMIN')
|
||
else:
|
||
BotDB.set_user_blacklist(user_data['user_id'],
|
||
user_data['user_name'],
|
||
user_data['message_for_user'],
|
||
user_data['date_to_unban'])
|
||
# Экранируем user_name для безопасного использования
|
||
safe_user_name = html.escape(str(user_data['user_name'])) if user_data.get('user_name') else "Неизвестный пользователь"
|
||
await message.reply(f"Пользователь {safe_user_name} успешно заблокирован.")
|
||
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
|
||
await state.set_state('ADMIN')
|
||
markup = get_reply_keyboard_admin()
|
||
await message.answer('Вернулись в меню', reply_markup=markup)
|