import random import traceback import asyncio import html from datetime import datetime from pathlib import Path from aiogram import types, Router, F from aiogram.filters import Command, StateFilter from aiogram.fsm.context import FSMContext from aiogram.types import FSInputFile from helper_bot.filters.main import ChatTypeFilter from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post from helper_bot.keyboards.keyboards import get_reply_keyboard_leave_chat from helper_bot.middlewares.album_middleware import AlbumMiddleware from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware from helper_bot.utils import messages from helper_bot.utils.base_dependency_factory import get_global_instance from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \ send_media_group_message_to_private_chat, prepare_media_group_from_middlewares, send_video_message, \ send_video_note_message, send_audio_message, send_voice_message, add_in_db_media, \ check_user_emoji, check_username_and_full_name from logs.custom_logger import logger private_router = Router() private_router.message.middleware(AlbumMiddleware()) private_router.message.middleware(BlacklistMiddleware()) bdf = get_global_instance() GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] LOGS = bdf.settings['Settings']['logs'] TEST = bdf.settings['Settings']['test'] BotDB = bdf.get_db() # Expose sleep for tests (tests patch helper_bot.handlers.private.private_handlers.sleep) sleep = asyncio.sleep @private_router.message( ChatTypeFilter(chat_type=["private"]), Command("emoji") ) async def handle_emoji_message(message: types.Message, state: FSMContext): await message.forward(chat_id=GROUP_FOR_LOGS) user_emoji = check_user_emoji(message) await state.set_state("START") if user_emoji is not None: await message.answer(f'Твоя эмодзя - {user_emoji}', parse_mode='HTML') @private_router.message( ChatTypeFilter(chat_type=["private"]), Command("restart") ) async def handle_restart_message(message: types.Message, state: FSMContext): try: markup = get_reply_keyboard(BotDB, message.from_user.id) await message.forward(chat_id=GROUP_FOR_LOGS) await state.set_state("START") await update_user_info('love', message) check_user_emoji(message) await message.answer('Я перезапущен!', reply_markup=markup, parse_mode='HTML') except Exception as e: logger.error(f"Произошла ошибка handle_restart_message. Ошибка:{str(e)}") await message.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка handle_restart_message: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message( ChatTypeFilter(chat_type=["private"]), Command("start") ) @private_router.message( ChatTypeFilter(chat_type=["private"]), F.text == 'Вернуться в бота' ) async def handle_start_message(message: types.Message, state: FSMContext): try: await message.forward(chat_id=GROUP_FOR_LOGS) full_name = message.from_user.full_name username = message.from_user.username first_name = get_first_name(message) is_bot = message.from_user.is_bot language_code = message.from_user.language_code user_id = message.from_user.id # Проверяем наличие username для логирования if not username: # Экранируем full_name для безопасного использования safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь" await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Пользователь {user_id} ({safe_full_name}) обратился к боту без username') logger.warning(f"Пользователь {user_id} ({safe_full_name}) обратился к боту без username") # Устанавливаем значение по умолчанию для username username = "private_username" current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") if not BotDB.user_exists(user_id): # Для первоначального добавления эмодзи пока не назначаем (совместимость) BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, "", date, date) else: is_need_update = check_username_and_full_name(user_id, username, full_name, BotDB) if is_need_update: BotDB.update_username_and_full_name(user_id, username, full_name) # Экранируем пользовательские данные для безопасного использования safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь" safe_username = html.escape(username) if username else "Без никнейма" await message.answer( f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}") await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}') await asyncio.sleep(1) BotDB.update_date_for_user(date, user_id) await state.set_state("START") logger.info( f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} " f"Имя автора сообщения: {message.from_user.full_name})") name_stick_hello = list(Path('Stick').rglob('Hello_*')) random_stick_hello = random.choice(name_stick_hello) random_stick_hello = FSInputFile(path=random_stick_hello) logger.info(f"Стикер успешно получен из БД") await message.answer_sticker(random_stick_hello) await asyncio.sleep(0.3) except Exception as e: logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}") await message.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка при получении стикеров: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") try: markup = get_reply_keyboard(BotDB, message.from_user.id) hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE') await message.answer(hello_message, reply_markup=markup, parse_mode='HTML') except Exception as e: logger.error( f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}") await message.bot.send_message(IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message( ChatTypeFilter(chat_type=["private"]), Command("restart") ) async def restart_function(message: types.Message, state: FSMContext): await message.forward(chat_id=GROUP_FOR_LOGS) full_name = message.from_user.full_name username = message.from_user.username user_id = message.from_user.id # Проверяем наличие username для логирования if not username: # Экранируем full_name для безопасного использования safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь" await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Пользователь {user_id} ({safe_full_name}) обратился к боту без username') logger.warning(f"Пользователь {user_id} ({safe_full_name}) обратился к боту без username") # Устанавливаем значение по умолчанию для username username = "private_username" markup = get_reply_keyboard(BotDB, message.from_user.id) await message.answer(text='Я перезапущен!', reply_markup=markup) await state.set_state('START') @private_router.message( StateFilter("START"), ChatTypeFilter(chat_type=["private"]), F.text == '📢Предложить свой пост' ) async def suggest_post(message: types.Message, state: FSMContext): try: user_id = message.from_user.id current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") BotDB.update_date_for_user(date, user_id) await message.forward(chat_id=GROUP_FOR_LOGS) await state.set_state("SUGGEST") current_state = await state.get_state() # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.info( f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {safe_full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}") markup = types.ReplyKeyboardRemove() suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS') await message.answer(suggest_news) await asyncio.sleep(0.3) suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2') await message.answer(suggest_news_2, reply_markup=markup) except Exception as e: await message.bot.send_message(IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message( ChatTypeFilter(chat_type=["private"]), F.text == '👋🏼Сказать пока!' ) @private_router.message( ChatTypeFilter(chat_type=["private"]), F.text == 'Выйти из чата' ) async def end_message(message: types.Message, state: FSMContext): try: user_id = message.from_user.id current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") BotDB.update_date_for_user(date, user_id) await message.forward(chat_id=GROUP_FOR_LOGS) # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.info( f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") name_stick_bye = list(Path('Stick').rglob('Universal_*')) random_stick_bye = random.choice(name_stick_bye) random_stick_bye = FSInputFile(path=random_stick_bye) await message.answer_sticker(random_stick_bye) except Exception as e: # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.error( f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") await message.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") try: markup = types.ReplyKeyboardRemove() bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE') await message.answer(bye_message, reply_markup=markup) await state.set_state("START") except Exception as e: # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.error( f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") await message.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message( StateFilter("SUGGEST"), ChatTypeFilter(chat_type=["private"]), ) async def suggest_router(message: types.Message, state: FSMContext, album: list = None): # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.info( f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") first_name = get_first_name(message) try: post_caption = '' if message.media_group_id is not None: # Экранируем username для безопасного использования safe_username = html.escape(message.from_user.username) if message.from_user.username else "Без никнейма" await send_text_message(GROUP_FOR_LOGS, message, f'Закинул медиагруппу, пользователь: имя - {first_name}, ник - {safe_username}') else: await message.forward(chat_id=GROUP_FOR_LOGS) if message.content_type == 'text': lower_text = message.text.lower() # Получаем текст сообщения и преобразовываем его по правилам post_text = get_text_message(lower_text, first_name, message.from_user.username) # Получаем клавиатуру для поста markup = get_reply_keyboard_for_post() # Отправляем сообщение в приватный канал sent_message_id = await send_text_message(GROUP_FOR_POST, message, post_text, markup) # Записываем в базу пост BotDB.add_post_in_db(sent_message_id, message.text, message.from_user.id) # Отправляем юзеру ответ, что сообщение отравлено и возвращаем его в меню markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") elif message.content_type == 'photo' and message.media_group_id is None: if message.caption: lower_caption = message.caption.lower() # Получаем текст сообщения и преобразовываем его по правилам post_caption = get_text_message(lower_caption, first_name, message.from_user.username) markup = get_reply_keyboard_for_post() # Отправляем фото и текст в приватный канал sent_message = await send_photo_message(GROUP_FOR_POST, message, message.photo[-1].file_id, post_caption, markup) # Записываем в базу пост и контент BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id) await add_in_db_media(sent_message, BotDB) # Отправляем юзеру ответ и возвращаем его в меню markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") elif message.content_type == 'video' and message.media_group_id is None: if message.caption: lower_caption = message.caption.lower() post_caption = get_text_message(lower_caption, first_name, message.from_user.username) markup = get_reply_keyboard_for_post() # Получаем текст сообщения и преобразовываем его по правилам # Отправляем видео и текст в приватный канал sent_message = await send_video_message(GROUP_FOR_POST, message, message.video.file_id, post_caption, markup) # Записываем в базу пост и контент BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id) await add_in_db_media(sent_message, BotDB) # Записываем в базу пост и контент BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id) await add_in_db_media(sent_message) # Отправляем юзеру ответ и возвращаем его в меню markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") elif message.content_type == 'video_note' and message.media_group_id is None: markup = get_reply_keyboard_for_post() # Отправляем видеокружок в приватный канал sent_message = await send_video_note_message(GROUP_FOR_POST, message, message.video_note.file_id, markup) # Записываем в базу пост и контент BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id) await add_in_db_media(sent_message, BotDB) # Отправляем юзеру ответ и возвращаем его в меню markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") elif message.content_type == 'audio' and message.media_group_id is None: if message.caption: lower_caption = message.caption.lower() # Получаем текст сообщения и преобразовываем его по правилам post_caption = get_text_message(lower_caption, first_name, message.from_user.username) markup = get_reply_keyboard_for_post() # Отправляем аудио и текст в приватный канал sent_message = await send_audio_message(GROUP_FOR_POST, message, message.audio.file_id, post_caption, markup) # Записываем в базу пост и контент BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id) await add_in_db_media(sent_message, BotDB) # Отправляем юзеру ответ и возвращаем его в меню markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") elif message.content_type == 'voice' and message.media_group_id is None: markup = get_reply_keyboard_for_post() # Отправляем войс и текст в приватный канал sent_message = await send_voice_message(GROUP_FOR_POST, message, message.voice.file_id, markup) # Записываем в базу пост и контент BotDB.add_post_in_db(sent_message.message_id, sent_message.caption, message.from_user.id) await add_in_db_media(sent_message, BotDB) # Отправляем юзеру ответ и возвращаем его в меню markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") elif message.media_group_id is not None: post_caption = " " # Получаем сообщение и проверяем есть ли подпись. Если подпись есть, то преобразуем ее через функцию if album[0].caption: lower_caption = album[0].caption.lower() post_caption = get_text_message(lower_caption, first_name, message.from_user.username) # Иначе обрабатываем фото и получаем медиагруппу media_group = await prepare_media_group_from_middlewares(album, post_caption) # Отправляем медиагруппу в секретный чат media_group_message_id = await send_media_group_message_to_private_chat(GROUP_FOR_POST, message, media_group, BotDB) await asyncio.sleep(0.2) # Получаем клавиатуру и отправляем еще одно текстовое сообщение с кнопками markup = get_reply_keyboard_for_post() help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup) # Записываем в state идентификаторы текстового сообщения И последнего сообщения медиагруппы BotDB.update_helper_message_in_db(message_id=media_group_message_id, helper_message_id=help_message_id) # Получаем клавиатуру для пользователя, благодарим за пост, и возвращаем в дефолтное сообщение markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) success_send_message = messages.get_message(first_name, 'SUCCESS_SEND_MESSAGE') await message.answer(success_send_message, reply_markup=markup_for_user) await state.set_state("START") else: await message.bot.send_message(message.chat.id, 'Я пока не умею работать с таким сообщением. ' 'Пришли текст и фото/фоты(ы). А лучше перешли это сообщение админу @kerrad1\n' 'Мы добавим его к обработке если необходимо') except Exception as e: await message.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @private_router.message( ChatTypeFilter(chat_type=["private"]), F.text == '🤪Хочу стикеры' ) async def stickers(message: types.Message, state: FSMContext): # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.info( f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") markup = get_reply_keyboard(BotDB, message.from_user.id) try: BotDB.update_info_about_stickers(user_id=message.from_user.id) await message.forward(chat_id=GROUP_FOR_LOGS) await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', reply_markup=markup) await state.set_state("START") except Exception as e: await message.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.error( f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") @private_router.message( StateFilter("START"), ChatTypeFilter(chat_type=["private"]), F.text == '📩Связаться с админами' ) async def connect_with_admin(message: types.Message, state: FSMContext): # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.info( f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}") user_id = message.from_user.id current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") BotDB.update_date_for_user(date, user_id) admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN') await message.answer(admin_message, parse_mode="html") await message.forward(chat_id=GROUP_FOR_LOGS) await state.set_state("PRE_CHAT") @private_router.message( StateFilter("PRE_CHAT"), ChatTypeFilter(chat_type=["private"]), ) @private_router.message( StateFilter("CHAT"), ChatTypeFilter(chat_type=["private"]), ) async def resend_message_in_group_for_message(message: types.Message, state: FSMContext): user_id = message.from_user.id current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") BotDB.update_date_for_user(date, user_id) # Экранируем full_name для безопасного использования в логах safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь" logger.info( f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {safe_full_name} Идентификатор сообщения: {message.message_id})") await message.forward(chat_id=GROUP_FOR_MESSAGE) current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) question = messages.get_message(get_first_name(message), 'QUESTION') user_state = await state.get_state() if user_state == "PRE_CHAT": markup = get_reply_keyboard(BotDB, message.from_user.id) await message.answer(question, reply_markup=markup) await state.set_state("START") elif user_state == "CHAT": markup = get_reply_keyboard_leave_chat() await message.answer(question, reply_markup=markup)