import html import os import random from datetime import datetime, timedelta from time import sleep import emoji from aiogram import types from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio from helper_bot.utils.base_dependency_factory import BaseDependencyFactory from logs.custom_logger import logger GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] emoji_list = list(emoji.EMOJI_DATA.keys()) def safe_html_escape(text: str) -> str: """ Безопасно экранирует текст для использования в HTML разметке. Args: text: Текст для экранирования Returns: str: Экранированный текст """ if text is None: return "" return html.escape(str(text)) def get_first_name(message: types.Message) -> str: """ Безопасно получает и экранирует имя пользователя для использования в HTML разметке. Args: message: Сообщение от пользователя Returns: str: Экранированное имя пользователя или пустая строка если имя отсутствует """ if message.from_user.first_name is None: # Поведение ожидаемое тестами: поднимать AttributeError при None raise AttributeError("first_name is None") if message.from_user.first_name: # Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML first_name = str(message.from_user.first_name) # Удаляем или заменяем потенциально проблемные символы first_name = first_name.replace('\u0cc0', '') # Убираем символ "ೀ" (U+0CC0) first_name = first_name.replace('\u0cc1', '') # Убираем символ "ೀ" (U+0CC1) first_name = html.escape(first_name) return first_name return "" def get_text_message(post_text: str, first_name: str, username: str = None): """ Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон". Args: post_text: Текст сообщения first_name: Имя автора поста username: Юзернейм автора поста (может быть None) Returns: str: - Сформированный текст сообщения. """ # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" # Экранируем username для безопасного использования в HTML safe_username = html.escape(username) if username else None # Формируем строку с информацией об авторе if safe_username: author_info = f"{first_name} @{safe_username}" else: author_info = f"{first_name} (Ник не указан)" if "неанон" in post_text or "не анон" in post_text: return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' elif "анон" in post_text: return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно' else: return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' async def download_file(message: types.Message, file_id: str): """ Скачивает файл по file_id из Telegram. Args: message: сообщение file_id: File ID фотографии filename: Имя файла, под которым будет сохранено фото Returns: Путь к сохраненному файлу, если файл был скачан успешно, иначе None """ try: os.makedirs("files", exist_ok=True) os.makedirs("files/photos", exist_ok=True) os.makedirs("files/videos", exist_ok=True) os.makedirs("files/music", exist_ok=True) os.makedirs("files/voice", exist_ok=True) os.makedirs("files/video_notes", exist_ok=True) file = await message.bot.get_file(file_id) file_path = os.path.join("files", file.file_path) await message.bot.download_file(file_path=file.file_path, destination=file_path) return file_path except Exception as e: logger.error(f"Ошибка скачивания фотографии: {e}") return None async def prepare_media_group_from_middlewares(album, post_caption: str = ''): """ Создает MediaGroup. Args: album: Album объект из Telegram API. post_caption: Текст подписи к первому фото. Returns: Список InputMediaPhoto (MediaGroup). """ # Экранируем post_caption для безопасного использования в HTML safe_post_caption = html.escape(str(post_caption)) if post_caption else "" media_group = [] for i, message in enumerate(album): if message.photo: file_id = message.photo[-1].file_id media_type = 'photo' elif message.video: file_id = message.video.file_id media_type = 'video' elif message.audio: file_id = message.audio.file_id media_type = 'audio' else: # Если нет фото, видео или аудио, пропускаем сообщение continue # Формируем объект MediaGroup с учетом типа медиа if i == len(album) - 1: if media_type == 'photo': media_group.append(InputMediaPhoto(media=file_id, caption=safe_post_caption)) elif media_type == 'video': media_group.append(InputMediaVideo(media=file_id, caption=safe_post_caption)) elif media_type == 'audio': media_group.append(InputMediaAudio(media=file_id, caption=safe_post_caption)) else: if media_type == 'photo': media_group.append(InputMediaPhoto(media=file_id)) elif media_type == 'video': media_group.append(InputMediaVideo(media=file_id)) elif media_type == 'audio': media_group.append(InputMediaAudio(media=file_id)) return media_group # Возвращаем MediaGroup async def add_in_db_media_mediagroup(sent_message, bot_db): """ Идентификатор медиа-группы Args: sent_message: sent_message объект из Telegram API bot_db: Экземпляр базы данных Returns: Список InputFile (FSInputFile). """ media_group_message_id = sent_message[-1].message_id # Получаем идентификатор медиа-группы for i, message in enumerate(sent_message): if message.photo: file_id = message.photo[-1].file_id file_path = await download_file(message, file_id=file_id) bot_db.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'photo') elif message.video: file_id = message.video.file_id file_path = await download_file(message, file_id=file_id) bot_db.add_post_content_in_db(media_group_message_id, message.message_id, file_path, 'video') else: # Если нет фото, видео или аудио, или другой контент, пропускаем сообщение continue async def add_in_db_media(sent_message, bot_db): """ Args: sent_message: sent_message объект из Telegram API bot_db: Экземпляр базы данных Returns: Список InputFile (FSInputFile). """ if sent_message.photo: file_id = sent_message.photo[-1].file_id file_path = await download_file(sent_message, file_id=file_id) bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'photo') elif sent_message.video: file_id = sent_message.video.file_id file_path = await download_file(sent_message, file_id=file_id) bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video') elif sent_message.voice: file_id = sent_message.voice.file_id file_path = await download_file(sent_message, file_id=file_id) bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'voice') elif sent_message.audio: file_id = sent_message.audio.file_id file_path = await download_file(sent_message, file_id=file_id) bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'audio') elif sent_message.video_note: file_id = sent_message.video_note.file_id file_path = await download_file(sent_message, file_id=file_id) bot_db.add_post_content_in_db(sent_message.message_id, sent_message.message_id, file_path, 'video_note') async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message, media_group: list[InputMediaPhoto], bot_db): sent_message = await message.bot.send_media_group( chat_id=chat_id, media=media_group, ) bot_db.add_post_in_db(sent_message[-1].message_id, sent_message[-1].caption, message.from_user.id) await add_in_db_media_mediagroup(sent_message, bot_db) message_id = sent_message[-1].message_id return message_id async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tuple[str]], post_text: str): """ Отправляет медиа-группу с подписью к последнему файлу. Args: bot: Экземпляр бота aiogram. chat_id: ID чата для отправки. post_content: Список кортежей с путями к файлам. post_text: Текст подписи. """ media = [] for file_path in post_content: try: file = FSInputFile(path=file_path[0]) type = file_path[1] if type == 'video': media.append(types.InputMediaVideo(media=file)) if type == 'photo': media.append(types.InputMediaPhoto(media=file)) except FileNotFoundError: logger.error(f"Файл не найден: {file_path[0]}") return # Добавляем подпись к последнему файлу if media: # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" media[-1].caption = safe_post_text await bot.send_media_group(chat_id=chat_id, media=media) async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_message( chat_id=chat_id, text=safe_post_text ) message_id = sent_message.message_id return message_id else: sent_message = await message.bot.send_message( chat_id=chat_id, text=safe_post_text, reply_markup=markup ) message_id = sent_message.message_id return message_id async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_photo( chat_id=chat_id, caption=safe_post_text, photo=photo ) else: sent_message = await message.bot.send_photo( chat_id=chat_id, caption=safe_post_text, photo=photo, reply_markup=markup ) return sent_message async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "", markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_video( chat_id=chat_id, caption=safe_post_text, video=video ) else: sent_message = await message.bot.send_video( chat_id=chat_id, caption=safe_post_text, video=video, reply_markup=markup ) return sent_message async def send_video_note_message(chat_id, message: types.Message, video_note: str, markup: types.ReplyKeyboardMarkup = None): if markup is None: sent_message = await message.bot.send_video_note( chat_id=chat_id, video_note=video_note ) else: sent_message = await message.bot.send_video_note( chat_id=chat_id, video_note=video_note, reply_markup=markup ) return sent_message async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_audio( chat_id=chat_id, caption=safe_post_text, audio=audio ) else: sent_message = await message.bot.send_audio( chat_id=chat_id, caption=safe_post_text, audio=audio, reply_markup=markup ) return sent_message async def send_voice_message(chat_id, message: types.Message, voice: str, markup: types.ReplyKeyboardMarkup = None): if markup is None: sent_message = await message.bot.send_voice( chat_id=chat_id, voice=voice ) else: sent_message = await message.bot.send_voice( chat_id=chat_id, voice=voice, reply_markup=markup ) return sent_message def check_access(user_id: int, bot_db): """Проверка прав на совершение действий""" return bot_db.is_admin(user_id) def add_days_to_date(days: int): """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" current_date = datetime.now() future_date = current_date + timedelta(days=days) formatted_date = future_date.strftime("%d-%m-%Y") return formatted_date def get_banned_users_list(offset: int, bot_db): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: offset: отступ для запроса в базу данных bot_db: Экземпляр базы данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = bot_db.get_banned_users_from_db_with_limits(limit=7, offset=offset) message = "Список заблокированных пользователей:\n" for user in users: # Экранируем пользовательские данные для безопасного использования safe_user_name = html.escape(str(user[0])) if user[0] else "Неизвестный пользователь" safe_ban_reason = html.escape(str(user[2])) if user[2] else "Причина не указана" safe_unban_date = html.escape(str(user[3])) if user[3] else "Дата не указана" message += f"Пользователь: {safe_user_name}\n" message += f"Причина бана: {safe_ban_reason}\n" message += f"Дата разбана: {safe_unban_date}\n\n" return message def get_banned_users_buttons(bot_db): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: bot_db: Экземпляр базы данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = bot_db.get_banned_users_from_db() user_ids = [] for user in users: # Экранируем user_name для безопасного использования safe_user_name = html.escape(str(user[0])) if user[0] else "Неизвестный пользователь" user_ids.append((safe_user_name, user[1])) return user_ids def delete_user_blacklist(user_id: int, bot_db): return bot_db.delete_user_blacklist(user_id=user_id) def check_username_and_full_name(user_id: int, username: str, full_name: str, bot_db): username_db, full_name_db = bot_db.get_username_and_full_name(user_id=user_id) return username != username_db or full_name != full_name_db def unban_notifier(self): # Получение сегодняшней даты в формате DD-MM-YYYY current_date = datetime.now() today = current_date.strftime("%d-%m-%Y") # Получение списка разблокированных пользователей unblocked_users = self.BotDB.get_users_for_unblock_today(today) message = "Разблокированные пользователи:\n" for user_id, user_name in unblocked_users.items(): # Экранируем user_name для безопасного использования safe_user_name = html.escape(str(user_name)) if user_name else "Неизвестный пользователь" message += f"ID: {user_id}, Имя: {safe_user_name}\n" # Отправка сообщения в канал self.bot.send_message(self.GROUP_FOR_MESSAGE, message) async def update_user_info(source: str, message: types.Message): # Собираем данные full_name = message.from_user.full_name username = message.from_user.username first_name = get_first_name(message) is_bot = message.from_user.is_bot language_code = message.from_user.language_code user_id = message.from_user.id current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") # Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было user_emoji = get_random_emoji() if not BotDB.user_exists(user_id): BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, user_emoji, date, date) else: is_need_update = check_username_and_full_name(user_id, username, full_name) if is_need_update: BotDB.update_username_and_full_name(user_id, username, full_name) if source != 'voice': await message.answer( f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}") await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}') sleep(1) BotDB.update_date_for_user(date, user_id) def check_user_emoji(message: types.Message): user_id = message.from_user.id user_emoji = BotDB.check_emoji_for_user(user_id=user_id) if user_emoji is None: user_emoji = get_random_emoji() BotDB.update_emoji_for_user(user_id=user_id, emoji=user_emoji) return user_emoji def get_random_emoji(): attempts = 0 while attempts < 100: user_emoji = random.choice(emoji_list) if not BotDB.check_emoji(user_emoji): return user_emoji attempts += 1 logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.") return "Эмоджи не определен"