Refactor admin handlers to improve access control and state management. Added checks for admin rights in ban functions and streamlined router inclusion order in main bot file. Updated keyboard layouts for better user experience and removed unused state definitions.
This commit is contained in:
3
__init__.py
Normal file
3
__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
# This file makes the root directory a Python package
|
||||
|
||||
|
||||
@@ -41,10 +41,12 @@ async def admin_panel(message: types.Message, state: FSMContext):
|
||||
reply_markup=markup)
|
||||
else:
|
||||
await message.answer('Доступ запрещен, досвидания!')
|
||||
await state.set_state("START")
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при запуске админ панели: {e}")
|
||||
await message.bot.send_message(IMPORTANT_LOGS,
|
||||
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
|
||||
await state.set_state("START")
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
@@ -52,7 +54,12 @@ async def admin_panel(message: types.Message, state: FSMContext):
|
||||
StateFilter("ADMIN"),
|
||||
F.text == 'Бан (Список)'
|
||||
)
|
||||
async def get_last_users(message: types.Message):
|
||||
async def get_last_users(message: types.Message, state: FSMContext):
|
||||
# Дополнительная проверка на админские права
|
||||
if not check_access(message.from_user.id, BotDB):
|
||||
await message.answer('Доступ запрещен!')
|
||||
await state.set_state("START")
|
||||
return
|
||||
logger.info(
|
||||
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
@@ -67,6 +74,11 @@ async def get_last_users(message: types.Message):
|
||||
F.text == 'Бан по нику'
|
||||
)
|
||||
async def ban_by_nickname(message: types.Message, state: FSMContext):
|
||||
# Дополнительная проверка на админские права
|
||||
if not check_access(message.from_user.id, BotDB):
|
||||
await message.answer('Доступ запрещен!')
|
||||
await state.set_state("START")
|
||||
return
|
||||
await message.answer('Пришли мне username блокируемого пользователя')
|
||||
await state.set_state('PRE_BAN')
|
||||
|
||||
@@ -77,25 +89,29 @@ async def ban_by_nickname(message: types.Message, state: FSMContext):
|
||||
F.text == 'Бан по ID'
|
||||
)
|
||||
async def ban_by_id(message: types.Message, state: FSMContext):
|
||||
# Дополнительная проверка на админские права
|
||||
if not check_access(message.from_user.id, BotDB):
|
||||
await message.answer('Доступ запрещен!')
|
||||
await state.set_state("START")
|
||||
return
|
||||
await message.answer('Пришли мне ID блокируемого пользователя')
|
||||
await state.set_state('PRE_BAN_ID')
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("ADMIN"),
|
||||
F.text == 'Тестовый бан'
|
||||
)
|
||||
async def ban_by_forward(message: types.Message, state: FSMContext):
|
||||
await message.answer('Перешлите мне сообщение от пользователя, которого хотите заблокировать')
|
||||
await state.set_state('PRE_BAN_FORWARD')
|
||||
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("PRE_BAN", "PRE_BAN_ID", "BAN_2"),
|
||||
F.text == 'Отменить'
|
||||
)
|
||||
async def decline_ban(message: types.Message, state: FSMContext):
|
||||
# Дополнительная проверка на админские права
|
||||
if not check_access(message.from_user.id, BotDB):
|
||||
await message.answer('Доступ запрещен!')
|
||||
await state.set_state("START")
|
||||
return
|
||||
current_state = await state.get_state()
|
||||
await state.set_data({})
|
||||
await state.set_state("ADMIN")
|
||||
@@ -109,6 +125,11 @@ async def decline_ban(message: types.Message, state: FSMContext):
|
||||
StateFilter("PRE_BAN")
|
||||
)
|
||||
async def ban_by_nickname_step_2(message: types.Message, state: FSMContext):
|
||||
# Дополнительная проверка на админские права
|
||||
if not check_access(message.from_user.id, BotDB):
|
||||
await message.answer('Доступ запрещен!')
|
||||
await state.set_state("START")
|
||||
return
|
||||
logger.info(
|
||||
f"Функция ban_by_nickname_2. Получен никнейм пользователя: {message.text}")
|
||||
user_name = message.text
|
||||
@@ -132,6 +153,11 @@ async def ban_by_nickname_step_2(message: types.Message, state: FSMContext):
|
||||
StateFilter("PRE_BAN_ID")
|
||||
)
|
||||
async def ban_by_id_step_2(message: types.Message, state: FSMContext):
|
||||
# Дополнительная проверка на админские права
|
||||
if not check_access(message.from_user.id, BotDB):
|
||||
await message.answer('Доступ запрещен!')
|
||||
await state.set_state("START")
|
||||
return
|
||||
try:
|
||||
user_id = int(message.text)
|
||||
logger.info(f"Функция ban_by_id_step_2. Получен ID пользователя: {user_id}")
|
||||
@@ -168,73 +194,10 @@ async def ban_by_id_step_2(message: types.Message, state: FSMContext):
|
||||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("PRE_BAN_FORWARD"),
|
||||
F.forward_from
|
||||
)
|
||||
async def ban_by_forward_step_2(message: types.Message, state: FSMContext):
|
||||
"""Обработчик пересланных сообщений для бана пользователя"""
|
||||
try:
|
||||
# Получаем информацию о пользователе из пересланного сообщения
|
||||
forwarded_user = message.forward_from
|
||||
|
||||
if not forwarded_user:
|
||||
await message.answer("Не удалось получить информацию о пользователе из пересланного сообщения. Возможно, пользователь скрыл возможность пересылки своих сообщений.")
|
||||
await state.set_state('ADMIN')
|
||||
markup = get_reply_keyboard_admin()
|
||||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||||
return
|
||||
|
||||
user_id = forwarded_user.id
|
||||
user_name = forwarded_user.username or "private_username"
|
||||
full_name = forwarded_user.full_name or "Неизвестно"
|
||||
|
||||
logger.info(f"Функция ban_by_forward_step_2. Получен пользователь из пересланного сообщения: ID={user_id}, username={user_name}, full_name={full_name}")
|
||||
|
||||
# Проверяем, существует ли пользователь в базе
|
||||
user_info = BotDB.get_user_info_by_id(user_id)
|
||||
if not user_info:
|
||||
# Если пользователя нет в базе, используем информацию из пересланного сообщения
|
||||
logger.info(f"Пользователь с ID {user_id} не найден в базе данных, используем данные из пересланного сообщения")
|
||||
user_name = user_name
|
||||
full_name = full_name
|
||||
else:
|
||||
# Если пользователь есть в базе, используем данные из базы
|
||||
user_name = user_info.get('username', user_name)
|
||||
full_name = user_info.get('full_name', full_name)
|
||||
|
||||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||||
date_to_unban=None)
|
||||
|
||||
markup = create_keyboard_for_ban_reason()
|
||||
# Экранируем потенциально проблемные символы
|
||||
user_name_escaped = html.escape(str(user_name))
|
||||
full_name_escaped = html.escape(str(full_name))
|
||||
await message.answer(
|
||||
text=f"<b>Выбран пользователь из пересланного сообщения:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
|
||||
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
|
||||
reply_markup=markup)
|
||||
await state.set_state('BAN_2')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Ошибка при обработке пересланного сообщения: {e}")
|
||||
await message.answer("Произошла ошибка при обработке пересланного сообщения.")
|
||||
await state.set_state('ADMIN')
|
||||
markup = get_reply_keyboard_admin()
|
||||
await message.answer('Вернулись в меню', reply_markup=markup)
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
ChatTypeFilter(chat_type=["private"]),
|
||||
StateFilter("PRE_BAN_FORWARD")
|
||||
)
|
||||
async def ban_by_forward_invalid(message: types.Message, state: FSMContext):
|
||||
"""Обработчик для случаев, когда сообщение не является пересланным или не содержит информацию о пользователе"""
|
||||
if message.forward_from_chat:
|
||||
await message.answer("Пересланное сообщение из канала или группы не содержит информацию о конкретном пользователе. Пожалуйста, перешлите сообщение из приватного чата.")
|
||||
else:
|
||||
await message.answer("Пожалуйста, перешлите сообщение от пользователя, которого хотите заблокировать. Обычное сообщение не подходит.")
|
||||
|
||||
|
||||
|
||||
|
||||
@admin_router.message(
|
||||
|
||||
@@ -1,284 +1,322 @@
|
||||
import html
|
||||
import traceback
|
||||
|
||||
from aiogram import Router, F
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import CallbackQuery
|
||||
|
||||
from helper_bot.keyboards.keyboards import create_keyboard_with_pagination, get_reply_keyboard_admin, \
|
||||
create_keyboard_for_ban_reason
|
||||
from helper_bot.utils.base_dependency_factory import get_global_instance
|
||||
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
|
||||
get_banned_users_buttons, delete_user_blacklist, send_media_group_to_channel, \
|
||||
send_video_message, send_video_note_message, send_audio_message, send_voice_message
|
||||
from logs.custom_logger import logger
|
||||
|
||||
callback_router = Router()
|
||||
|
||||
bdf = get_global_instance()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "publish"
|
||||
)
|
||||
async def post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с действием: {call.data} от пользователя {call.from_user.full_name} (ID сообщения: {call.message.message_id})')
|
||||
text_post = html.escape(str(call.message.text))
|
||||
text_post_with_photo = html.escape(str(call.message.caption))
|
||||
if call.message.content_type == 'text' and call.message.text != "^":
|
||||
try:
|
||||
# Пересылаем сообщение в канал
|
||||
await send_text_message(MAIN_PUBLIC, call.message, text_post)
|
||||
|
||||
# Получаем из базы автора
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
# Очищаем предложку и удаляем оттуда пост
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
|
||||
# Отвечаем пользователю
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'photo':
|
||||
try:
|
||||
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, text_post_with_photo)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
# Удаляем пост из предложки
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'video':
|
||||
try:
|
||||
await send_video_message(MAIN_PUBLIC, call.message, call.message.video.file_id, text_post_with_photo)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с видео опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации видео в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'video_note':
|
||||
try:
|
||||
await send_video_note_message(MAIN_PUBLIC, call.message, call.message.video_note.file_id)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с кружком опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации кружка в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'audio':
|
||||
try:
|
||||
await send_audio_message(MAIN_PUBLIC, call.message, call.message.audio.file_id, text_post_with_photo)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с аудио опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации аудио в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'voice':
|
||||
try:
|
||||
await send_voice_message(MAIN_PUBLIC, call.message, call.message.voice.file_id)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с войсом опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации войса в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.text == "^":
|
||||
# Получаем контент медиагруппы и текст для публикации
|
||||
post_content = BotDB.get_post_content_from_telegram_by_last_id(call.message.message_id)
|
||||
pre_text = BotDB.get_post_text_from_telegram_by_last_id(call.message.message_id)
|
||||
post_text = html.escape(str(pre_text))
|
||||
|
||||
# Готовим список для удаления
|
||||
post_ids = BotDB.get_post_ids_from_telegram_by_last_id(call.message.message_id)
|
||||
message_ids = [row[0] for row in post_ids]
|
||||
message_ids.append(call.message.message_id)
|
||||
|
||||
# Выкладываем пост в канал
|
||||
await send_media_group_to_channel(bot=call.bot, chat_id=MAIN_PUBLIC, post_content=post_content,
|
||||
post_text=post_text)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_helper_message_id(call.message.message_id)
|
||||
|
||||
# TODO: Удалить фотки с локалки после выкладки?
|
||||
await call.bot.delete_messages(chat_id=GROUP_FOR_POST, message_ids=message_ids)
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "decline"
|
||||
)
|
||||
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
|
||||
try:
|
||||
if call.message.content_type == 'text' and call.message.text != "^" or call.message.content_type == 'photo' \
|
||||
or call.message.content_type == 'audio' or call.message.content_type == 'voice' \
|
||||
or call.message.content_type == 'video' or call.message.content_type == 'video_note':
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
logger.info(
|
||||
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
|
||||
await call.answer(text='Отклонено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был отклонен😔')
|
||||
if call.message.text == '^':
|
||||
post_ids = BotDB.get_post_ids_from_telegram_by_last_id(call.message.message_id)
|
||||
message_ids = [row[0] for row in post_ids]
|
||||
message_ids.append(call.message.message_id)
|
||||
|
||||
await call.bot.delete_messages(chat_id=GROUP_FOR_POST, message_ids=message_ids)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_helper_message_id(call.message.message_id)
|
||||
|
||||
await call.answer(text='Удалено!', cache_time=3)
|
||||
|
||||
await send_text_message(author_id, call.message, 'Твой пост был отклонен😔')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('ban')
|
||||
)
|
||||
async def process_ban_user(call: CallbackQuery, state: FSMContext):
|
||||
user_id = call.data[4:]
|
||||
logger.info(
|
||||
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
if user_name:
|
||||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||||
date_to_unban=None)
|
||||
markup = create_keyboard_for_ban_reason()
|
||||
# Экранируем потенциально проблемные символы
|
||||
user_name_escaped = html.escape(str(user_name))
|
||||
full_name_escaped = html.escape(str(call.message.from_user.full_name))
|
||||
await call.message.answer(
|
||||
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\nИмя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
|
||||
reply_markup=markup)
|
||||
await state.set_state('BAN_2')
|
||||
else:
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup)
|
||||
await state.set_state('ADMIN')
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('unlock')
|
||||
)
|
||||
async def process_unlock_user(call: CallbackQuery):
|
||||
user_id = call.data[7:]
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
delete_user_blacklist(user_id, BotDB)
|
||||
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}")
|
||||
username = BotDB.get_username(user_id)
|
||||
await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == 'return'
|
||||
)
|
||||
async def return_to_main_menu(call: CallbackQuery):
|
||||
await call.message.delete()
|
||||
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||||
reply_markup=markup)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('page')
|
||||
)
|
||||
async def change_page(call: CallbackQuery):
|
||||
page_number = int(call.data[5:])
|
||||
logger.info(f"Переход на страницу {page_number}")
|
||||
if call.message.text == 'Список пользователей которые последними обращались к боту':
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
# TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
|
||||
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
|
||||
'ban')
|
||||
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
else:
|
||||
# Готовим сообщения
|
||||
message_user = get_banned_users_list(int(page_number) * 7 - 7, BotDB)
|
||||
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
text=message_user)
|
||||
|
||||
# Готовим клавиатуру
|
||||
buttons = get_banned_users_buttons(BotDB)
|
||||
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
import html
|
||||
import traceback
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
from aiogram import Router, F
|
||||
from aiogram.fsm.context import FSMContext
|
||||
from aiogram.types import CallbackQuery
|
||||
|
||||
from helper_bot.keyboards.keyboards import create_keyboard_with_pagination, get_reply_keyboard_admin, \
|
||||
create_keyboard_for_ban_reason
|
||||
from helper_bot.utils.base_dependency_factory import get_global_instance
|
||||
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
|
||||
get_banned_users_buttons, delete_user_blacklist, send_media_group_to_channel, \
|
||||
send_video_message, send_video_note_message, send_audio_message, send_voice_message
|
||||
from logs.custom_logger import logger
|
||||
|
||||
callback_router = Router()
|
||||
|
||||
bdf = get_global_instance()
|
||||
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
|
||||
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
|
||||
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
|
||||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||||
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
|
||||
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
|
||||
LOGS = bdf.settings['Settings']['logs']
|
||||
TEST = bdf.settings['Settings']['test']
|
||||
|
||||
BotDB = bdf.get_db()
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "publish"
|
||||
)
|
||||
async def post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с действием: {call.data} от пользователя {call.from_user.full_name} (ID сообщения: {call.message.message_id})')
|
||||
text_post = html.escape(str(call.message.text))
|
||||
text_post_with_photo = html.escape(str(call.message.caption))
|
||||
if call.message.content_type == 'text' and call.message.text != "^":
|
||||
try:
|
||||
# Пересылаем сообщение в канал
|
||||
await send_text_message(MAIN_PUBLIC, call.message, text_post)
|
||||
|
||||
# Получаем из базы автора
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
# Очищаем предложку и удаляем оттуда пост
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
|
||||
# Отвечаем пользователю
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'photo':
|
||||
try:
|
||||
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, text_post_with_photo)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
# Удаляем пост из предложки
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'video':
|
||||
try:
|
||||
await send_video_message(MAIN_PUBLIC, call.message, call.message.video.file_id, text_post_with_photo)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с видео опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации видео в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'video_note':
|
||||
try:
|
||||
await send_video_note_message(MAIN_PUBLIC, call.message, call.message.video_note.file_id)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с кружком опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации кружка в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'audio':
|
||||
try:
|
||||
await send_audio_message(MAIN_PUBLIC, call.message, call.message.audio.file_id, text_post_with_photo)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с аудио опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации аудио в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.content_type == 'voice':
|
||||
try:
|
||||
await send_voice_message(MAIN_PUBLIC, call.message, call.message.voice.file_id)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
logger.info(f'Пост с войсом опубликован в канале {MAIN_PUBLIC}.')
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
|
||||
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при публикации войса в канал {MAIN_PUBLIC}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
elif call.message.text == "^":
|
||||
# Получаем контент медиагруппы и текст для публикации
|
||||
post_content = BotDB.get_post_content_from_telegram_by_last_id(call.message.message_id)
|
||||
pre_text = BotDB.get_post_text_from_telegram_by_last_id(call.message.message_id)
|
||||
post_text = html.escape(str(pre_text))
|
||||
|
||||
# Готовим список для удаления
|
||||
post_ids = BotDB.get_post_ids_from_telegram_by_last_id(call.message.message_id)
|
||||
message_ids = [row[0] for row in post_ids]
|
||||
message_ids.append(call.message.message_id)
|
||||
|
||||
# Выкладываем пост в канал
|
||||
await send_media_group_to_channel(bot=call.bot, chat_id=MAIN_PUBLIC, post_content=post_content,
|
||||
post_text=post_text)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_helper_message_id(call.message.message_id)
|
||||
|
||||
# TODO: Удалить фотки с локалки после выкладки?
|
||||
await call.bot.delete_messages(chat_id=GROUP_FOR_POST, message_ids=message_ids)
|
||||
await call.answer(text='Выложено!', cache_time=3)
|
||||
|
||||
await send_text_message(author_id, call.message, 'Твой пост был выложен🥰')
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "decline"
|
||||
)
|
||||
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
|
||||
logger.info(
|
||||
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
|
||||
try:
|
||||
if call.message.content_type == 'text' and call.message.text != "^" or call.message.content_type == 'photo' \
|
||||
or call.message.content_type == 'audio' or call.message.content_type == 'voice' \
|
||||
or call.message.content_type == 'video' or call.message.content_type == 'video_note':
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
|
||||
logger.info(
|
||||
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
|
||||
await call.answer(text='Отклонено!', cache_time=3)
|
||||
await send_text_message(author_id, call.message, 'Твой пост был отклонен😔')
|
||||
if call.message.text == '^':
|
||||
post_ids = BotDB.get_post_ids_from_telegram_by_last_id(call.message.message_id)
|
||||
message_ids = [row[0] for row in post_ids]
|
||||
message_ids.append(call.message.message_id)
|
||||
|
||||
await call.bot.delete_messages(chat_id=GROUP_FOR_POST, message_ids=message_ids)
|
||||
|
||||
# Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
|
||||
author_id = BotDB.get_author_id_by_helper_message_id(call.message.message_id)
|
||||
|
||||
await call.answer(text='Удалено!', cache_time=3)
|
||||
|
||||
await send_text_message(author_id, call.message, 'Твой пост был отклонен😔')
|
||||
except Exception as e:
|
||||
if e.message != 'Forbidden: bot was blocked by the user':
|
||||
await call.bot.send_message(IMPORTANT_LOGS,
|
||||
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
|
||||
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
|
||||
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == "ban"
|
||||
)
|
||||
async def ban_user_from_post(call: CallbackQuery):
|
||||
try:
|
||||
# Получаем информацию о пользователе из сообщения
|
||||
author_id = BotDB.get_author_id_by_message_id(call.message.message_id)
|
||||
user_name = BotDB.get_username(user_id=author_id)
|
||||
full_name = call.message.from_user.full_name if call.message.from_user else "Неизвестно"
|
||||
|
||||
# Устанавливаем причину бана и дату разблокировки (+7 дней)
|
||||
current_date = datetime.now()
|
||||
date_to_unban = current_date + timedelta(days=7)
|
||||
|
||||
# Записываем в базу данных
|
||||
BotDB.set_user_blacklist(
|
||||
user_id=author_id,
|
||||
user_name=user_name,
|
||||
message_for_user="Спам",
|
||||
date_to_unban=date_to_unban
|
||||
)
|
||||
|
||||
# Удаляем пост из предложки
|
||||
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
|
||||
|
||||
# Отправляем сообщение пользователю о блокировке
|
||||
date_str = date_to_unban.strftime("%d.%m.%Y %H:%M")
|
||||
await send_text_message(author_id, call.message, f"Ты заблокирован за спам. Дата разблокировки: {date_str}")
|
||||
|
||||
logger.info(f"Пользователь {author_id} заблокирован за спам до {date_str}")
|
||||
await call.answer(text='Пользователь заблокирован!', cache_time=3)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Ошибка при блокировке пользователя: {str(e)}')
|
||||
await call.answer(text='Ошибка при блокировке!', show_alert=True, cache_time=3)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('ban')
|
||||
)
|
||||
async def process_ban_user(call: CallbackQuery, state: FSMContext):
|
||||
user_id = call.data[4:]
|
||||
logger.info(
|
||||
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
if user_name:
|
||||
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
|
||||
date_to_unban=None)
|
||||
markup = create_keyboard_for_ban_reason()
|
||||
# Экранируем потенциально проблемные символы
|
||||
user_name_escaped = html.escape(str(user_name))
|
||||
full_name_escaped = html.escape(str(call.message.from_user.full_name))
|
||||
await call.message.answer(
|
||||
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\nИмя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
|
||||
reply_markup=markup)
|
||||
await state.set_state('BAN_2')
|
||||
else:
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer(text='Пользователь с таким ID не найден в базе', reply_markup=markup)
|
||||
await state.set_state('ADMIN')
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('unlock')
|
||||
)
|
||||
async def process_unlock_user(call: CallbackQuery):
|
||||
user_id = call.data[7:]
|
||||
user_name = BotDB.get_username(user_id=user_id)
|
||||
delete_user_blacklist(user_id, BotDB)
|
||||
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}")
|
||||
username = BotDB.get_username(user_id)
|
||||
await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data == 'return'
|
||||
)
|
||||
async def return_to_main_menu(call: CallbackQuery):
|
||||
await call.message.delete()
|
||||
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
|
||||
markup = get_reply_keyboard_admin()
|
||||
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
|
||||
reply_markup=markup)
|
||||
|
||||
|
||||
@callback_router.callback_query(
|
||||
F.data.contains('page')
|
||||
)
|
||||
async def change_page(call: CallbackQuery):
|
||||
page_number = int(call.data[5:])
|
||||
logger.info(f"Переход на страницу {page_number}")
|
||||
if call.message.text == 'Список пользователей которые последними обращались к боту':
|
||||
list_users = BotDB.get_last_users_from_db()
|
||||
# TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
|
||||
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
|
||||
'ban')
|
||||
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
else:
|
||||
# Готовим сообщения
|
||||
message_user = get_banned_users_list(int(page_number) * 7 - 7, BotDB)
|
||||
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
text=message_user)
|
||||
|
||||
# Готовим клавиатуру
|
||||
buttons = get_banned_users_buttons(BotDB)
|
||||
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
|
||||
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
|
||||
reply_markup=keyboard)
|
||||
|
||||
@@ -5,10 +5,12 @@ from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder
|
||||
def get_reply_keyboard_for_post():
|
||||
builder = InlineKeyboardBuilder()
|
||||
builder.row(types.InlineKeyboardButton(
|
||||
text="Опубликовать", callback_data="publish")
|
||||
text="Опубликовать", callback_data="publish"),
|
||||
types.InlineKeyboardButton(
|
||||
text="Отклонить", callback_data="decline")
|
||||
)
|
||||
builder.row(types.InlineKeyboardButton(
|
||||
text="Отклонить", callback_data="decline")
|
||||
text="👮♂️ Забанить", callback_data="ban")
|
||||
)
|
||||
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
|
||||
return markup
|
||||
@@ -37,7 +39,7 @@ def get_reply_keyboard_admin():
|
||||
builder.add(types.KeyboardButton(text="Бан (Список)"))
|
||||
builder.add(types.KeyboardButton(text="Бан по нику"))
|
||||
builder.add(types.KeyboardButton(text="Бан по ID"))
|
||||
builder.add(types.KeyboardButton(text="Тестовый бан"))
|
||||
builder.row()
|
||||
builder.add(types.KeyboardButton(text="Разбан (список)"))
|
||||
builder.add(types.KeyboardButton(text="Вернуться в бота"))
|
||||
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
|
||||
@@ -49,7 +51,7 @@ def create_keyboard_with_pagination(page: int, total_items: int, array_items: li
|
||||
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
|
||||
|
||||
Args:
|
||||
page: Номер текущей страницы.
|
||||
page: Номер текущей страницы (начинается с 1).
|
||||
total_items: Общее количество элементов.
|
||||
array_items: Лист кортежей. Содержит в себе user_name: user_id
|
||||
callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
|
||||
@@ -57,34 +59,75 @@ def create_keyboard_with_pagination(page: int, total_items: int, array_items: li
|
||||
Returns:
|
||||
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
|
||||
"""
|
||||
|
||||
# Проверяем валидность входных данных
|
||||
if page < 1:
|
||||
page = 1
|
||||
if not array_items:
|
||||
# Если нет элементов, возвращаем только кнопку "Назад"
|
||||
keyboard = InlineKeyboardBuilder()
|
||||
home_button = types.InlineKeyboardButton(text="🏠 Назад", callback_data="return")
|
||||
keyboard.row(home_button)
|
||||
return keyboard.as_markup()
|
||||
|
||||
# Определяем общее количество страниц
|
||||
total_pages = (total_items + 9 - 1) // 9
|
||||
items_per_page = 9
|
||||
total_pages = (total_items + items_per_page - 1) // items_per_page
|
||||
|
||||
# Ограничиваем страницу максимальным значением
|
||||
if page > total_pages:
|
||||
page = total_pages
|
||||
|
||||
# Создаем билдер для клавиатуры
|
||||
keyboard = InlineKeyboardBuilder()
|
||||
|
||||
# Вычисляем стартовый номер для текущей страницы
|
||||
start_index = (page - 1) * 9
|
||||
|
||||
# Кнопки с номерами страниц
|
||||
for i in range(start_index, min(start_index + 9, len(array_items))):
|
||||
keyboard.add(types.InlineKeyboardButton(
|
||||
start_index = (page - 1) * items_per_page
|
||||
|
||||
# Кнопки с элементами текущей страницы
|
||||
end_index = min(start_index + items_per_page, len(array_items))
|
||||
current_row = []
|
||||
|
||||
for i in range(start_index, end_index):
|
||||
current_row.append(types.InlineKeyboardButton(
|
||||
text=f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"
|
||||
))
|
||||
keyboard.adjust(3)
|
||||
|
||||
next_button = types.InlineKeyboardButton(
|
||||
text="➡️ Следующая", callback_data=f"page_{page + 1}"
|
||||
)
|
||||
prev_button = types.InlineKeyboardButton(
|
||||
text="⬅️ Предыдущая", callback_data=f"page_{page - 1}"
|
||||
)
|
||||
keyboard.row(prev_button, next_button)
|
||||
home_button = types.InlineKeyboardButton(
|
||||
text="🏠 Назад", callback_data="return")
|
||||
|
||||
# Когда набирается 3 кнопки, добавляем ряд
|
||||
if len(current_row) == 3:
|
||||
keyboard.row(*current_row)
|
||||
current_row = []
|
||||
|
||||
# Добавляем оставшиеся кнопки, если они есть
|
||||
if current_row:
|
||||
keyboard.row(*current_row)
|
||||
|
||||
# Создаем кнопки навигации только если нужно
|
||||
navigation_buttons = []
|
||||
|
||||
# Кнопка "Предыдущая" - показываем только если не первая страница
|
||||
if page > 1:
|
||||
prev_button = types.InlineKeyboardButton(
|
||||
text="⬅️ Предыдущая", callback_data=f"page_{page - 1}"
|
||||
)
|
||||
navigation_buttons.append(prev_button)
|
||||
|
||||
# Кнопка "Следующая" - показываем только если не последняя страница
|
||||
if page < total_pages:
|
||||
next_button = types.InlineKeyboardButton(
|
||||
text="➡️ Следующая", callback_data=f"page_{page + 1}"
|
||||
)
|
||||
navigation_buttons.append(next_button)
|
||||
|
||||
# Добавляем кнопки навигации, если они есть
|
||||
if navigation_buttons:
|
||||
keyboard.row(*navigation_buttons)
|
||||
|
||||
# Кнопка "Назад"
|
||||
home_button = types.InlineKeyboardButton(text="🏠 Назад", callback_data="return")
|
||||
keyboard.row(home_button)
|
||||
k = keyboard.as_markup()
|
||||
return k
|
||||
|
||||
return keyboard.as_markup()
|
||||
|
||||
|
||||
def create_keyboard_for_ban_reason():
|
||||
|
||||
@@ -16,6 +16,6 @@ async def start_bot(bdf):
|
||||
link_preview_is_disabled=bdf.settings['Telegram']['preview_link']
|
||||
), timeout=30.0) # Добавляем таймаут для предотвращения зависаний
|
||||
dp = Dispatcher(storage=MemoryStorage(), fsm_strategy=FSMStrategy.GLOBAL_USER)
|
||||
dp.include_routers(private_router, callback_router, group_router, admin_router)
|
||||
dp.include_routers(admin_router, private_router, callback_router, group_router)
|
||||
await bot.delete_webhook(drop_pending_updates=True)
|
||||
await dp.start_polling(bot, skip_updates=True)
|
||||
|
||||
@@ -9,7 +9,6 @@ class StateUser(StatesGroup):
|
||||
PRE_CHAT = State()
|
||||
PRE_BAN = State()
|
||||
PRE_BAN_ID = State()
|
||||
PRE_BAN_FORWARD = State()
|
||||
BAN_2 = State()
|
||||
BAN_3 = State()
|
||||
BAN_4 = State()
|
||||
|
||||
@@ -5,7 +5,8 @@ from aiogram.types import ReplyKeyboardMarkup, KeyboardButton, InlineKeyboardMar
|
||||
from helper_bot.keyboards.keyboards import (
|
||||
get_reply_keyboard,
|
||||
get_reply_keyboard_for_post,
|
||||
get_reply_keyboard_leave_chat
|
||||
get_reply_keyboard_leave_chat,
|
||||
create_keyboard_with_pagination
|
||||
)
|
||||
from helper_bot.filters.main import ChatTypeFilter
|
||||
from database.db import BotDB
|
||||
@@ -326,5 +327,125 @@ class TestKeyboardIntegration:
|
||||
assert 'Выйти из чата' in leave_buttons
|
||||
|
||||
|
||||
class TestPagination:
|
||||
"""Тесты для функции create_keyboard_with_pagination"""
|
||||
|
||||
def test_pagination_empty_list(self):
|
||||
"""Тест с пустым списком элементов"""
|
||||
keyboard = create_keyboard_with_pagination(1, 0, [], 'test')
|
||||
assert keyboard is not None
|
||||
# Проверяем, что есть только кнопка "Назад"
|
||||
assert len(keyboard.inline_keyboard) == 1
|
||||
assert keyboard.inline_keyboard[0][0].text == "🏠 Назад"
|
||||
|
||||
def test_pagination_single_page(self):
|
||||
"""Тест с одной страницей"""
|
||||
items = [("User1", 1), ("User2", 2), ("User3", 3)]
|
||||
keyboard = create_keyboard_with_pagination(1, 3, items, 'test')
|
||||
|
||||
# Проверяем количество кнопок (3 пользователя + кнопка "Назад")
|
||||
assert len(keyboard.inline_keyboard) == 2 # 1 ряд с пользователями + 1 ряд с "Назад"
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # 3 пользователя в первом ряду
|
||||
assert keyboard.inline_keyboard[1][0].text == "🏠 Назад"
|
||||
|
||||
# Проверяем, что нет кнопок навигации
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # только пользователи
|
||||
|
||||
def test_pagination_multiple_pages(self):
|
||||
"""Тест с несколькими страницами"""
|
||||
items = [("User" + str(i), i) for i in range(1, 15)] # 14 пользователей
|
||||
keyboard = create_keyboard_with_pagination(1, 14, items, 'test')
|
||||
|
||||
# На первой странице должно быть 9 пользователей (3 ряда по 3) + кнопка "Следующая" + "Назад"
|
||||
assert len(keyboard.inline_keyboard) == 5 # 3 ряда пользователей + навигация + назад
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # первый ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[1]) == 3 # второй ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[2]) == 3 # третий ряд: 3 пользователя
|
||||
assert keyboard.inline_keyboard[3][0].text == "➡️ Следующая" # кнопка навигации
|
||||
assert keyboard.inline_keyboard[4][0].text == "🏠 Назад" # кнопка назад
|
||||
|
||||
def test_pagination_second_page(self):
|
||||
"""Тест второй страницы"""
|
||||
items = [("User" + str(i), i) for i in range(1, 15)] # 14 пользователей
|
||||
keyboard = create_keyboard_with_pagination(2, 14, items, 'test')
|
||||
|
||||
# На второй странице должно быть 5 пользователей (2 ряда: 3+2) + кнопки "Предыдущая" и "Назад"
|
||||
assert len(keyboard.inline_keyboard) == 4 # 2 ряда пользователей + навигация + назад
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # первый ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[1]) == 2 # второй ряд: 2 пользователя
|
||||
assert keyboard.inline_keyboard[2][0].text == "⬅️ Предыдущая"
|
||||
assert keyboard.inline_keyboard[3][0].text == "🏠 Назад"
|
||||
|
||||
def test_pagination_middle_page(self):
|
||||
"""Тест средней страницы"""
|
||||
items = [("User" + str(i), i) for i in range(1, 25)] # 24 пользователя
|
||||
keyboard = create_keyboard_with_pagination(2, 24, items, 'test')
|
||||
|
||||
# На второй странице должно быть 9 пользователей (3 ряда по 3) + кнопки "Предыдущая" и "Следующая"
|
||||
assert len(keyboard.inline_keyboard) == 5 # 3 ряда пользователей + навигация + назад
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # первый ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[1]) == 3 # второй ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[2]) == 3 # третий ряд: 3 пользователя
|
||||
assert keyboard.inline_keyboard[3][0].text == "⬅️ Предыдущая"
|
||||
assert keyboard.inline_keyboard[3][1].text == "➡️ Следующая"
|
||||
|
||||
def test_pagination_invalid_page_number(self):
|
||||
"""Тест с некорректным номером страницы"""
|
||||
items = [("User" + str(i), i) for i in range(1, 10)] # 9 пользователей
|
||||
keyboard = create_keyboard_with_pagination(0, 9, items, 'test') # страница 0
|
||||
|
||||
# Должна вернуться первая страница
|
||||
assert len(keyboard.inline_keyboard) == 4 # 3 ряда пользователей + назад
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # первый ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[1]) == 3 # второй ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[2]) == 3 # третий ряд: 3 пользователя
|
||||
|
||||
def test_pagination_page_out_of_range(self):
|
||||
"""Тест с номером страницы больше максимального"""
|
||||
items = [("User" + str(i), i) for i in range(1, 10)] # 9 пользователей
|
||||
keyboard = create_keyboard_with_pagination(5, 9, items, 'test') # страница 5 при 1 странице
|
||||
|
||||
# Должна вернуться первая страница
|
||||
assert len(keyboard.inline_keyboard) == 4 # 3 ряда пользователей + назад
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # первый ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[1]) == 3 # второй ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[2]) == 3 # третий ряд: 3 пользователя
|
||||
|
||||
def test_pagination_callback_data_format(self):
|
||||
"""Тест формата callback_data"""
|
||||
items = [("User1", 123), ("User2", 456)]
|
||||
keyboard = create_keyboard_with_pagination(1, 2, items, 'ban')
|
||||
|
||||
# Проверяем формат callback_data для пользователей
|
||||
assert keyboard.inline_keyboard[0][0].callback_data == "ban_123"
|
||||
assert keyboard.inline_keyboard[0][1].callback_data == "ban_456"
|
||||
|
||||
# Проверяем формат callback_data для кнопки "Назад"
|
||||
assert keyboard.inline_keyboard[1][0].callback_data == "return"
|
||||
|
||||
def test_pagination_navigation_callback_data(self):
|
||||
"""Тест callback_data для кнопок навигации"""
|
||||
items = [("User" + str(i), i) for i in range(1, 15)] # 14 пользователей
|
||||
keyboard = create_keyboard_with_pagination(2, 14, items, 'test')
|
||||
|
||||
# Проверяем callback_data для кнопки "Предыдущая"
|
||||
assert keyboard.inline_keyboard[2][0].callback_data == "page_1"
|
||||
|
||||
# Проверяем callback_data для кнопки "Назад"
|
||||
assert keyboard.inline_keyboard[3][0].callback_data == "return"
|
||||
|
||||
def test_pagination_exactly_items_per_page(self):
|
||||
"""Тест когда количество элементов точно равно items_per_page"""
|
||||
items = [("User" + str(i), i) for i in range(1, 10)] # ровно 9 пользователей
|
||||
keyboard = create_keyboard_with_pagination(1, 9, items, 'test')
|
||||
|
||||
# Должна быть только одна страница без кнопок навигации
|
||||
assert len(keyboard.inline_keyboard) == 4 # 3 ряда пользователей + назад
|
||||
assert len(keyboard.inline_keyboard[0]) == 3 # первый ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[1]) == 3 # второй ряд: 3 пользователя
|
||||
assert len(keyboard.inline_keyboard[2]) == 3 # третий ряд: 3 пользователя
|
||||
assert keyboard.inline_keyboard[3][0].text == "🏠 Назад"
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
|
||||
@@ -1,11 +1,33 @@
|
||||
import pytest
|
||||
from unittest.mock import Mock, patch
|
||||
from unittest.mock import Mock, patch, AsyncMock
|
||||
from datetime import datetime
|
||||
import os
|
||||
|
||||
from helper_bot.utils.helper_func import (
|
||||
get_first_name,
|
||||
get_text_message,
|
||||
check_username_and_full_name
|
||||
check_username_and_full_name,
|
||||
safe_html_escape,
|
||||
download_file,
|
||||
prepare_media_group_from_middlewares,
|
||||
add_in_db_media_mediagroup,
|
||||
add_in_db_media,
|
||||
send_media_group_message_to_private_chat,
|
||||
send_media_group_to_channel,
|
||||
send_text_message,
|
||||
send_photo_message,
|
||||
send_video_message,
|
||||
send_video_note_message,
|
||||
send_audio_message,
|
||||
send_voice_message,
|
||||
check_access,
|
||||
add_days_to_date,
|
||||
get_banned_users_list,
|
||||
get_banned_users_buttons,
|
||||
delete_user_blacklist,
|
||||
update_user_info,
|
||||
check_user_emoji,
|
||||
get_random_emoji
|
||||
)
|
||||
from helper_bot.utils.messages import get_message
|
||||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance
|
||||
@@ -83,6 +105,40 @@ class TestHelperFunctions:
|
||||
assert result is True
|
||||
|
||||
|
||||
class TestSafeHtmlEscape:
|
||||
"""Тесты для функции безопасного экранирования HTML"""
|
||||
|
||||
def test_safe_html_escape_normal_text(self):
|
||||
"""Тест экранирования обычного текста"""
|
||||
result = safe_html_escape("Hello World")
|
||||
assert result == "Hello World"
|
||||
|
||||
def test_safe_html_escape_html_tags(self):
|
||||
"""Тест экранирования HTML тегов"""
|
||||
result = safe_html_escape("<script>alert('xss')</script>")
|
||||
assert result == "<script>alert('xss')</script>"
|
||||
|
||||
def test_safe_html_escape_special_chars(self):
|
||||
"""Тест экранирования специальных символов"""
|
||||
result = safe_html_escape("& < > \" '")
|
||||
assert result == "& < > " '"
|
||||
|
||||
def test_safe_html_escape_none_input(self):
|
||||
"""Тест экранирования None значения"""
|
||||
result = safe_html_escape(None)
|
||||
assert result == ""
|
||||
|
||||
def test_safe_html_escape_empty_string(self):
|
||||
"""Тест экранирования пустой строки"""
|
||||
result = safe_html_escape("")
|
||||
assert result == ""
|
||||
|
||||
def test_safe_html_escape_non_string_input(self):
|
||||
"""Тест экранирования нестрокового ввода"""
|
||||
result = safe_html_escape(123)
|
||||
assert result == "123"
|
||||
|
||||
|
||||
class TestMessages:
|
||||
"""Тесты для системы сообщений"""
|
||||
|
||||
@@ -204,5 +260,422 @@ class TestConfigurationHandling:
|
||||
pass
|
||||
|
||||
|
||||
class TestDownloadFile:
|
||||
"""Тесты для функции скачивания файлов"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_file_success(self):
|
||||
"""Тест успешного скачивания файла"""
|
||||
mock_message = Mock()
|
||||
mock_message.bot = AsyncMock()
|
||||
|
||||
# Мокаем get_file
|
||||
mock_file = Mock()
|
||||
mock_file.file_path = "photos/file_123.jpg"
|
||||
mock_message.bot.get_file.return_value = mock_file
|
||||
|
||||
# Мокаем download_file
|
||||
mock_message.bot.download_file = AsyncMock()
|
||||
|
||||
# Мокаем os.makedirs
|
||||
with patch('os.makedirs') as mock_makedirs:
|
||||
with patch('os.path.join', return_value="files/photos/file_123.jpg"):
|
||||
result = await download_file(mock_message, "file_id_123")
|
||||
|
||||
assert result == "files/photos/file_123.jpg"
|
||||
mock_makedirs.assert_called()
|
||||
mock_message.bot.get_file.assert_called_once_with("file_id_123")
|
||||
mock_message.bot.download_file.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_download_file_exception(self):
|
||||
"""Тест обработки ошибки при скачивании"""
|
||||
mock_message = Mock()
|
||||
mock_message.bot = AsyncMock()
|
||||
mock_message.bot.get_file.side_effect = Exception("Network error")
|
||||
|
||||
with patch('os.makedirs'):
|
||||
with patch('helper_bot.utils.helper_func.logger') as mock_logger:
|
||||
result = await download_file(mock_message, "file_id_123")
|
||||
|
||||
assert result is None
|
||||
mock_logger.error.assert_called_once()
|
||||
|
||||
|
||||
class TestPrepareMediaGroup:
|
||||
"""Тесты для подготовки медиагрупп"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepare_media_group_photos(self):
|
||||
"""Тест подготовки медиагруппы с фотографиями"""
|
||||
album = []
|
||||
for i in range(3):
|
||||
message = Mock()
|
||||
message.photo = [Mock()]
|
||||
message.photo[-1].file_id = f"photo_{i}"
|
||||
album.append(message)
|
||||
|
||||
result = await prepare_media_group_from_middlewares(album, "Тестовая подпись")
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].media == "photo_0"
|
||||
assert result[1].media == "photo_1"
|
||||
assert result[2].media == "photo_2"
|
||||
assert result[2].caption == "Тестовая подпись"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepare_media_group_mixed_types(self):
|
||||
"""Тест подготовки медиагруппы с разными типами медиа"""
|
||||
album = []
|
||||
|
||||
# Фото
|
||||
photo_message = Mock()
|
||||
photo_message.photo = [Mock()]
|
||||
photo_message.photo[-1].file_id = "photo_1"
|
||||
album.append(photo_message)
|
||||
|
||||
# Видео
|
||||
video_message = Mock()
|
||||
video_message.photo = None
|
||||
video_message.video = Mock()
|
||||
video_message.video.file_id = "video_1"
|
||||
album.append(video_message)
|
||||
|
||||
# Аудио
|
||||
audio_message = Mock()
|
||||
audio_message.photo = None
|
||||
audio_message.video = None
|
||||
audio_message.audio = Mock()
|
||||
audio_message.audio.file_id = "audio_1"
|
||||
album.append(audio_message)
|
||||
|
||||
result = await prepare_media_group_from_middlewares(album, "Смешанная группа")
|
||||
|
||||
assert len(result) == 3
|
||||
assert result[0].media == "photo_1"
|
||||
assert result[1].media == "video_1"
|
||||
assert result[2].media == "audio_1"
|
||||
assert result[2].caption == "Смешанная группа"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepare_media_group_empty_album(self):
|
||||
"""Тест подготовки пустой медиагруппы"""
|
||||
album = []
|
||||
result = await prepare_media_group_from_middlewares(album, "Пустая группа")
|
||||
assert result == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_prepare_media_group_unsupported_type(self):
|
||||
"""Тест подготовки медиагруппы с неподдерживаемым типом"""
|
||||
album = []
|
||||
message = Mock()
|
||||
message.photo = None
|
||||
message.video = None
|
||||
message.audio = None
|
||||
album.append(message)
|
||||
|
||||
result = await prepare_media_group_from_middlewares(album, "Тест")
|
||||
assert result == []
|
||||
|
||||
|
||||
class TestMediaDatabaseOperations:
|
||||
"""Тесты для операций с медиа в базе данных"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_in_db_media_mediagroup(self):
|
||||
"""Тест добавления медиагруппы в базу данных"""
|
||||
sent_message = []
|
||||
for i in range(2):
|
||||
message = Mock()
|
||||
message.message_id = i + 1
|
||||
message.photo = [Mock()]
|
||||
message.photo[-1].file_id = f"photo_{i}"
|
||||
sent_message.append(message)
|
||||
|
||||
mock_db = Mock()
|
||||
|
||||
with patch('helper_bot.utils.helper_func.download_file', return_value=f"files/photo_{i}.jpg"):
|
||||
await add_in_db_media_mediagroup(sent_message, mock_db)
|
||||
|
||||
assert mock_db.add_post_content_in_db.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_in_db_media_photo(self):
|
||||
"""Тест добавления фото в базу данных"""
|
||||
mock_message = Mock()
|
||||
mock_message.message_id = 123
|
||||
mock_message.photo = [Mock()]
|
||||
mock_message.photo[-1].file_id = "photo_123"
|
||||
|
||||
mock_db = Mock()
|
||||
|
||||
with patch('helper_bot.utils.helper_func.download_file', return_value="files/photo_123.jpg"):
|
||||
await add_in_db_media(mock_message, mock_db)
|
||||
|
||||
mock_db.add_post_content_in_db.assert_called_once_with(
|
||||
123, 123, "files/photo_123.jpg", 'photo'
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_in_db_media_video(self):
|
||||
"""Тест добавления видео в базу данных"""
|
||||
mock_message = Mock()
|
||||
mock_message.message_id = 123
|
||||
mock_message.photo = None # У видео нет фото
|
||||
mock_message.video = Mock()
|
||||
mock_message.video.file_id = "video_123"
|
||||
|
||||
mock_db = Mock()
|
||||
|
||||
with patch('helper_bot.utils.helper_func.download_file', return_value="files/video_123.mp4"):
|
||||
await add_in_db_media(mock_message, mock_db)
|
||||
|
||||
mock_db.add_post_content_in_db.assert_called_once_with(
|
||||
123, 123, "files/video_123.mp4", 'video'
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_add_in_db_media_voice(self):
|
||||
"""Тест добавления голосового сообщения в базу данных"""
|
||||
mock_message = Mock()
|
||||
mock_message.message_id = 123
|
||||
mock_message.photo = None # У голосового сообщения нет фото
|
||||
mock_message.video = None # У голосового сообщения нет видео
|
||||
mock_message.voice = Mock()
|
||||
mock_message.voice.file_id = "voice_123"
|
||||
|
||||
mock_db = Mock()
|
||||
|
||||
with patch('helper_bot.utils.helper_func.download_file', return_value="files/voice_123.ogg"):
|
||||
await add_in_db_media(mock_message, mock_db)
|
||||
|
||||
mock_db.add_post_content_in_db.assert_called_once_with(
|
||||
123, 123, "files/voice_123.ogg", 'voice'
|
||||
)
|
||||
|
||||
|
||||
class TestSendMessageFunctions:
|
||||
"""Тесты для функций отправки сообщений"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_text_message_without_markup(self):
|
||||
"""Тест отправки текстового сообщения без разметки"""
|
||||
mock_message = Mock()
|
||||
mock_message.bot = AsyncMock()
|
||||
mock_message.bot.send_message = AsyncMock()
|
||||
|
||||
mock_sent_message = Mock()
|
||||
mock_sent_message.message_id = 456
|
||||
mock_message.bot.send_message.return_value = mock_sent_message
|
||||
|
||||
result = await send_text_message(123, mock_message, "Тестовое сообщение")
|
||||
|
||||
assert result == 456
|
||||
mock_message.bot.send_message.assert_called_once_with(
|
||||
chat_id=123,
|
||||
text="Тестовое сообщение"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_text_message_with_markup(self):
|
||||
"""Тест отправки текстового сообщения с разметкой"""
|
||||
mock_message = Mock()
|
||||
mock_message.bot = AsyncMock()
|
||||
mock_message.bot.send_message = AsyncMock()
|
||||
|
||||
mock_markup = Mock()
|
||||
mock_sent_message = Mock()
|
||||
mock_sent_message.message_id = 456
|
||||
mock_message.bot.send_message.return_value = mock_sent_message
|
||||
|
||||
result = await send_text_message(123, mock_message, "Тестовое сообщение", mock_markup)
|
||||
|
||||
assert result == 456
|
||||
mock_message.bot.send_message.assert_called_once_with(
|
||||
chat_id=123,
|
||||
text="Тестовое сообщение",
|
||||
reply_markup=mock_markup
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_photo_message(self):
|
||||
"""Тест отправки фото"""
|
||||
mock_message = Mock()
|
||||
mock_message.bot = AsyncMock()
|
||||
mock_message.bot.send_photo = AsyncMock()
|
||||
|
||||
mock_sent_message = Mock()
|
||||
mock_message.bot.send_photo.return_value = mock_sent_message
|
||||
|
||||
result = await send_photo_message(123, mock_message, "photo.jpg", "Подпись к фото")
|
||||
|
||||
assert result == mock_sent_message
|
||||
mock_message.bot.send_photo.assert_called_once_with(
|
||||
chat_id=123,
|
||||
caption="Подпись к фото",
|
||||
photo="photo.jpg"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_send_video_message(self):
|
||||
"""Тест отправки видео"""
|
||||
mock_message = Mock()
|
||||
mock_message.bot = AsyncMock()
|
||||
mock_message.bot.send_video = AsyncMock()
|
||||
|
||||
mock_sent_message = Mock()
|
||||
mock_message.bot.send_video.return_value = mock_sent_message
|
||||
|
||||
result = await send_video_message(123, mock_message, "video.mp4", "Подпись к видео")
|
||||
|
||||
assert result == mock_sent_message
|
||||
mock_message.bot.send_video.assert_called_once_with(
|
||||
chat_id=123,
|
||||
caption="Подпись к видео",
|
||||
video="video.mp4"
|
||||
)
|
||||
|
||||
|
||||
class TestUtilityFunctions:
|
||||
"""Тесты для утилитарных функций"""
|
||||
|
||||
def test_check_access(self):
|
||||
"""Тест проверки доступа"""
|
||||
mock_db = Mock()
|
||||
mock_db.is_admin.return_value = True
|
||||
|
||||
result = check_access(123, mock_db)
|
||||
assert result is True
|
||||
|
||||
mock_db.is_admin.return_value = False
|
||||
result = check_access(123, mock_db)
|
||||
assert result is False
|
||||
|
||||
def test_add_days_to_date(self):
|
||||
"""Тест добавления дней к дате"""
|
||||
with patch('helper_bot.utils.helper_func.datetime') as mock_datetime:
|
||||
from datetime import timedelta
|
||||
mock_now = datetime(2024, 1, 1)
|
||||
mock_datetime.now.return_value = mock_now
|
||||
mock_datetime.timedelta = timedelta
|
||||
|
||||
result = add_days_to_date(5)
|
||||
expected_date = (mock_now + timedelta(days=5)).strftime("%d-%m-%Y")
|
||||
assert result == expected_date
|
||||
|
||||
def test_get_banned_users_list(self):
|
||||
"""Тест получения списка заблокированных пользователей"""
|
||||
mock_db = Mock()
|
||||
mock_db.get_banned_users_from_db_with_limits.return_value = [
|
||||
("User1", 123, "Spam", "01-01-2025"),
|
||||
("User2", 456, "Violation", "02-01-2025")
|
||||
]
|
||||
|
||||
result = get_banned_users_list(0, mock_db)
|
||||
|
||||
assert "Список заблокированных пользователей:" in result
|
||||
assert "User1" in result
|
||||
assert "User2" in result
|
||||
assert "Spam" in result
|
||||
assert "Violation" in result
|
||||
|
||||
def test_get_banned_users_buttons(self):
|
||||
"""Тест получения кнопок заблокированных пользователей"""
|
||||
mock_db = Mock()
|
||||
mock_db.get_banned_users_from_db.return_value = [
|
||||
("User1", 123),
|
||||
("User2", 456)
|
||||
]
|
||||
|
||||
result = get_banned_users_buttons(mock_db)
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0] == ("User1", 123)
|
||||
assert result[1] == ("User2", 456)
|
||||
|
||||
def test_delete_user_blacklist(self):
|
||||
"""Тест удаления пользователя из черного списка"""
|
||||
mock_db = Mock()
|
||||
mock_db.delete_user_blacklist.return_value = True
|
||||
|
||||
result = delete_user_blacklist(123, mock_db)
|
||||
assert result is True
|
||||
|
||||
mock_db.delete_user_blacklist.assert_called_once_with(user_id=123)
|
||||
|
||||
|
||||
class TestUserManagement:
|
||||
"""Тесты для управления пользователями"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_user_info_new_user(self):
|
||||
"""Тест обновления информации о новом пользователе"""
|
||||
mock_message = Mock()
|
||||
mock_message.from_user.id = 123
|
||||
mock_message.from_user.full_name = "Test User"
|
||||
mock_message.from_user.username = "testuser"
|
||||
mock_message.from_user.is_bot = False
|
||||
mock_message.from_user.language_code = "ru"
|
||||
mock_message.answer = AsyncMock()
|
||||
mock_message.bot.send_message = AsyncMock()
|
||||
|
||||
with patch('helper_bot.utils.helper_func.get_first_name', return_value="Test"):
|
||||
with patch('helper_bot.utils.helper_func.get_random_emoji', return_value="😀"):
|
||||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||||
mock_bot_db.user_exists.return_value = False
|
||||
mock_bot_db.add_new_user_in_db = Mock()
|
||||
mock_bot_db.update_date_for_user = Mock()
|
||||
|
||||
await update_user_info("test", mock_message)
|
||||
|
||||
mock_bot_db.add_new_user_in_db.assert_called_once()
|
||||
mock_bot_db.update_date_for_user.assert_called_once()
|
||||
|
||||
def test_check_user_emoji_existing(self):
|
||||
"""Тест проверки эмодзи пользователя (существующий)"""
|
||||
mock_message = Mock()
|
||||
mock_message.from_user.id = 123
|
||||
|
||||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||||
mock_bot_db.check_emoji_for_user.return_value = "😀"
|
||||
|
||||
result = check_user_emoji(mock_message)
|
||||
assert result == "😀"
|
||||
|
||||
def test_check_user_emoji_new(self):
|
||||
"""Тест проверки эмодзи пользователя (новый)"""
|
||||
mock_message = Mock()
|
||||
mock_message.from_user.id = 123
|
||||
|
||||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||||
mock_bot_db.check_emoji_for_user.return_value = None
|
||||
mock_bot_db.update_emoji_for_user = Mock()
|
||||
|
||||
with patch('helper_bot.utils.helper_func.get_random_emoji', return_value="😀"):
|
||||
result = check_user_emoji(mock_message)
|
||||
assert result == "😀"
|
||||
mock_bot_db.update_emoji_for_user.assert_called_once_with(user_id=123, emoji="😀")
|
||||
|
||||
def test_get_random_emoji_success(self):
|
||||
"""Тест получения случайного эмодзи (успех)"""
|
||||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||||
mock_bot_db.check_emoji.return_value = False
|
||||
|
||||
with patch('helper_bot.utils.helper_func.random.choice', return_value="😀"):
|
||||
result = get_random_emoji()
|
||||
assert result == "😀"
|
||||
|
||||
def test_get_random_emoji_fallback(self):
|
||||
"""Тест получения случайного эмодзи (fallback)"""
|
||||
with patch('helper_bot.utils.helper_func.BotDB') as mock_bot_db:
|
||||
mock_bot_db.check_emoji.return_value = True # Все эмодзи заняты
|
||||
|
||||
with patch('helper_bot.utils.helper_func.random.choice', return_value="😀"):
|
||||
with patch('helper_bot.utils.helper_func.logger') as mock_logger:
|
||||
result = get_random_emoji()
|
||||
assert result == "Эмоджи не определен"
|
||||
mock_logger.error.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
|
||||
Reference in New Issue
Block a user