import html import os import random import time import tempfile import asyncio from datetime import datetime, timedelta from time import sleep from typing import List, Dict, Any, Optional, TYPE_CHECKING, Union try: import emoji as _emoji_lib _emoji_lib_available = True except ImportError: _emoji_lib = None _emoji_lib_available = False from aiogram import types from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio, InputMediaDocument from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance from logs.custom_logger import logger from database.models import TelegramPost # Local imports - metrics from .metrics import ( track_time, track_errors, db_query_time, track_media_processing, track_file_operations, ) bdf = get_global_instance() #TODO: поменять архитектуру и подключить правильный BotDB BotDB = bdf.get_db() GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] if _emoji_lib_available and _emoji_lib is not None: emoji_list = list(_emoji_lib.EMOJI_DATA.keys()) else: # Fallback minimal emoji set for environments without the 'emoji' package (e.g., CI tests) emoji_list = [ "🙂", "😀", "😉", "😎", "🤖", "🦄", "🐱", "🐶", "🍀", "🔥", "🌟", "🎉", "💡", "🚀", "🌈" ] def safe_html_escape(text: str) -> str: """ Безопасно экранирует текст для использования в HTML разметке. Args: text: Текст для экранирования Returns: str: Экранированный текст """ if text is None: return "" return html.escape(str(text)) @track_time("get_first_name", "helper_func") @track_errors("helper_func", "get_first_name") def get_first_name(message: types.Message) -> str: """ Безопасно получает и экранирует имя пользователя для использования в HTML разметке. Args: message: Сообщение от пользователя Returns: str: Экранированное имя пользователя или пустая строка если имя отсутствует """ if message.from_user.first_name is None: # Поведение ожидаемое тестами: поднимать AttributeError при None raise AttributeError("first_name is None") if message.from_user.first_name: # Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML first_name = str(message.from_user.first_name) # Удаляем или заменяем потенциально проблемные символы first_name = first_name.replace('\u0cc0', '') # Убираем символ "ೀ" (U+0CC0) first_name = first_name.replace('\u0cc1', '') # Убираем символ "ೀ" (U+0CC1) first_name = html.escape(first_name) return first_name return "" def determine_anonymity(post_text: str) -> bool: """ Определяет, является ли пост анонимным на основе ключевых слов в тексте. Args: post_text: Текст сообщения Returns: bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте; False по умолчанию (если нет ключевых слов) """ if not post_text: return False post_text_lower = post_text.lower() # Сначала проверяем "неанон" или "не анон" (более специфичное условие) if "неанон" in post_text_lower or "не анон" in post_text_lower: return False # Проверяем "анон" if "анон" in post_text_lower: return True # По умолчанию False return False def get_text_message(post_text: str, first_name: str, username: str = None, is_anonymous: Optional[bool] = None): """ Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон" или переданного параметра is_anonymous. Args: post_text: Текст сообщения first_name: Имя автора поста username: Юзернейм автора поста (может быть None) is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту) Returns: str: - Сформированный текст сообщения. """ # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" # Экранируем username для безопасного использования в HTML safe_username = html.escape(username) if username else None # Формируем строку с информацией об авторе if safe_username: author_info = f"{first_name} @{safe_username}" else: author_info = f"{first_name} (Ник не указан)" # Если передан is_anonymous, используем его, иначе определяем по тексту (legacy) # TODO: Уверен можно укоротить if is_anonymous is not None: if is_anonymous: return f'{safe_post_text}\n\nПост опубликован анонимно' else: return f'{safe_post_text}\n\nАвтор поста: {author_info}' else: # Legacy: определяем по тексту if "неанон" in post_text or "не анон" in post_text: return f'{safe_post_text}\n\nАвтор поста: {author_info}' elif "анон" in post_text: return f'{safe_post_text}\n\nПост опубликован анонимно' else: return f'{safe_post_text}\n\nАвтор поста: {author_info}' @track_time("download_file", "helper_func") @track_errors("helper_func", "download_file") @track_file_operations("unknown") async def download_file(message: types.Message, file_id: str, content_type: str = None, s3_storage = None) -> Optional[str]: """ Скачивает файл по file_id из Telegram и сохраняет в S3 или на локальный диск. Args: message: сообщение file_id: File ID файла content_type: тип контента (photo, video, audio, voice, video_note) s3_storage: опциональный S3StorageService для сохранения в S3 Returns: S3 ключ (если s3_storage указан) или локальный путь к файлу, иначе None """ start_time = time.time() try: # Валидация параметров if not file_id or not message or not message.bot: logger.error("download_file: Неверные параметры - file_id, message или bot отсутствуют") return None # Получаем информацию о файле file = await message.bot.get_file(file_id) if not file or not file.file_path: logger.error(f"download_file: Не удалось получить информацию о файле {file_id}") return None # Определяем расширение original_filename = os.path.basename(file.file_path) file_extension = os.path.splitext(original_filename)[1] or '.bin' if s3_storage: # Сохраняем в S3 # Скачиваем во временный файл temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) temp_path = temp_file.name temp_file.close() try: # Скачиваем из Telegram await message.bot.download_file(file_path=file.file_path, destination=temp_path) # Генерируем S3 ключ s3_key = s3_storage.generate_s3_key(content_type, file_id) # Загружаем в S3 success = await s3_storage.upload_file(temp_path, s3_key) # Удаляем временный файл try: os.remove(temp_path) except: pass if success: file_size = file.file_size if hasattr(file, 'file_size') else 0 download_time = time.time() - start_time logger.info(f"download_file: Файл загружен в S3 - {s3_key}, размер: {file_size} байт, время: {download_time:.2f}с") return s3_key else: logger.error(f"download_file: Не удалось загрузить файл в S3: {s3_key}") return None except Exception as e: # Удаляем временный файл при ошибке try: os.remove(temp_path) except: pass download_time = time.time() - start_time logger.error(f"download_file: Ошибка загрузки файла в S3 {file_id}: {e}, время: {download_time:.2f}с") return None else: # Старая логика - сохраняем на локальный диск # Определяем папку по типу контента type_folders = { 'photo': 'photos', 'video': 'videos', 'audio': 'music', 'voice': 'voice', 'video_note': 'video_notes' } folder = type_folders.get(content_type, 'other') base_path = "files" full_folder_path = os.path.join(base_path, folder) # Создаем необходимые папки os.makedirs(base_path, exist_ok=True) os.makedirs(full_folder_path, exist_ok=True) logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}") # Генерируем уникальное имя файла safe_filename = f"{file_id}{file_extension}" file_path = os.path.join(full_folder_path, safe_filename) # Скачиваем файл await message.bot.download_file(file_path=file.file_path, destination=file_path) # Проверяем, что файл действительно скачался if not os.path.exists(file_path): logger.error(f"download_file: Файл не был скачан - {file_path}") return None file_size = os.path.getsize(file_path) download_time = time.time() - start_time logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с") return file_path except Exception as e: download_time = time.time() - start_time logger.error(f"download_file: Ошибка скачивания файла {file_id}: {e}, время: {download_time:.2f}с") return None @track_time("prepare_media_group_from_middlewares", "helper_func") @track_errors("helper_func", "prepare_media_group_from_middlewares") @track_media_processing("media_group") async def prepare_media_group_from_middlewares(album, post_caption: str = ''): """ Создает MediaGroup согласно best practices aiogram 3.x. Args: album: Album объект из Telegram API (список сообщений). post_caption: Текст подписи к первому медиа файлу. Returns: Список InputMedia объектов для MediaGroup. """ # Экранируем post_caption для безопасного использования в HTML safe_post_caption = html.escape(str(post_caption)) if post_caption else "" media_group = [] for i, message in enumerate(album): if message.photo: file_id = message.photo[-1].file_id # Для фото используем InputMediaPhoto if i == 0: # Первое фото получает подпись media_group.append(InputMediaPhoto(media=file_id, caption=safe_post_caption)) else: media_group.append(InputMediaPhoto(media=file_id)) elif message.video: file_id = message.video.file_id # Для видео используем InputMediaVideo if i == 0: # Первое видео получает подпись media_group.append(InputMediaVideo(media=file_id, caption=safe_post_caption)) else: media_group.append(InputMediaVideo(media=file_id)) elif message.audio: file_id = message.audio.file_id # Для аудио используем InputMediaAudio if i == 0: # Первое аудио получает подпись media_group.append(InputMediaAudio(media=file_id, caption=safe_post_caption)) else: media_group.append(InputMediaAudio(media=file_id)) elif message.document: file_id = message.document.file_id # Для документов используем InputMediaDocument (если поддерживается) if i == 0: # Первый документ получает подпись media_group.append(InputMediaDocument(media=file_id, caption=safe_post_caption)) else: media_group.append(InputMediaDocument(media=file_id)) else: # Если нет поддерживаемого медиа, пропускаем сообщение continue return media_group async def _save_media_group_background(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int], s3_storage) -> None: """Сохраняет медиагруппу в фоне, чтобы не блокировать ответ пользователю""" try: success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id, s3_storage) if not success: logger.warning(f"_save_media_group_background: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}") except Exception as e: logger.error(f"_save_media_group_background: Ошибка при сохранении медиа для медиагруппы {sent_message[-1].message_id}: {e}") @track_time("add_in_db_media_mediagroup", "helper_func") @track_errors("helper_func", "add_in_db_media_mediagroup") @track_media_processing("media_group") @db_query_time("add_in_db_media_mediagroup", "posts", "insert") async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None, s3_storage = None) -> bool: """ Добавляет контент медиа-группы в базу данных Args: sent_message: sent_message объект из Telegram API bot_db: Экземпляр базы данных main_post_id: ID основного поста медиагруппы (если не указан, используется последний message_id) Returns: bool: True если весь контент успешно добавлен, False в случае ошибки """ start_time = time.time() try: # Валидация параметров if not sent_message or not bot_db or not isinstance(sent_message, list): logger.error("add_in_db_media_mediagroup: Неверные параметры - sent_message, bot_db или sent_message не является списком") return False if len(sent_message) == 0: logger.warning("add_in_db_media_mediagroup: Пустая медиагруппа") return False post_id = main_post_id or sent_message[-1].message_id processed_count = 0 failed_count = 0 for i, message in enumerate(sent_message): try: content_type = None file_id = None if message.photo: content_type = 'photo' file_id = message.photo[-1].file_id elif message.video: content_type = 'video' file_id = message.video.file_id elif message.audio: content_type = 'audio' file_id = message.audio.file_id elif message.voice: content_type = 'voice' file_id = message.voice.file_id elif message.video_note: content_type = 'video_note' file_id = message.video_note.file_id else: logger.warning(f"add_in_db_media_mediagroup: Неподдерживаемый тип контента в сообщении {i+1}/{len(sent_message)}") failed_count += 1 continue if not file_id: logger.error(f"add_in_db_media_mediagroup: file_id отсутствует в сообщении {i+1}/{len(sent_message)}") failed_count += 1 continue if s3_storage is None: bdf = get_global_instance() s3_storage = bdf.get_s3_storage() file_path = await download_file(message, file_id=file_id, content_type=content_type, s3_storage=s3_storage) if not file_path: logger.error(f"add_in_db_media_mediagroup: Не удалось скачать файл {file_id} в сообщении {i+1}/{len(sent_message)}") failed_count += 1 continue success = await bot_db.add_post_content(post_id, post_id, file_path, content_type) if not success: logger.error(f"add_in_db_media_mediagroup: Не удалось добавить контент в БД для сообщения {i+1}/{len(sent_message)}") if file_path.startswith('files/'): try: os.remove(file_path) except Exception as e: logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}") failed_count += 1 continue processed_count += 1 except Exception as e: logger.error(f"add_in_db_media_mediagroup: Ошибка обработки сообщения {i+1}/{len(sent_message)}: {e}") failed_count += 1 continue if processed_count == 0: logger.error(f"add_in_db_media_mediagroup: Не удалось обработать ни одного сообщения из медиагруппы {post_id}") return False if failed_count > 0: logger.warning(f"add_in_db_media_mediagroup: Обработано {processed_count}/{len(sent_message)} сообщений медиагруппы {post_id}, ошибок: {failed_count}") return failed_count == 0 except Exception as e: processing_time = time.time() - start_time logger.error(f"add_in_db_media_mediagroup: Критическая ошибка обработки медиагруппы: {e}, время: {processing_time:.2f}с") return False @track_time("add_in_db_media", "helper_func") @track_errors("helper_func", "add_in_db_media") @track_media_processing("media_group") @db_query_time("add_in_db_media", "posts", "insert") @track_file_operations("media") async def add_in_db_media(sent_message: types.Message, bot_db: Any, s3_storage = None) -> bool: """ Добавляет контент одиночного сообщения в базу данных Args: sent_message: sent_message объект из Telegram API bot_db: Экземпляр базы данных Returns: bool: True если контент успешно добавлен, False в случае ошибки """ start_time = time.time() try: # Валидация параметров if not sent_message or not bot_db: logger.error("add_in_db_media: Неверные параметры - sent_message или bot_db отсутствуют") return False post_id = sent_message.message_id # ID поста (это же сообщение) content_type = None file_id = None # Определяем тип контента и file_id if sent_message.photo: content_type = 'photo' file_id = sent_message.photo[-1].file_id elif sent_message.video: content_type = 'video' file_id = sent_message.video.file_id elif sent_message.voice: content_type = 'voice' file_id = sent_message.voice.file_id elif sent_message.audio: content_type = 'audio' file_id = sent_message.audio.file_id elif sent_message.video_note: content_type = 'video_note' file_id = sent_message.video_note.file_id else: logger.warning(f"add_in_db_media: Неподдерживаемый тип контента для сообщения {post_id}") return False if not file_id: logger.error(f"add_in_db_media: file_id отсутствует для сообщения {post_id}") return False logger.debug(f"add_in_db_media: Обрабатываю {content_type} для сообщения {post_id}") # Получаем s3_storage если не передан if s3_storage is None: bdf = get_global_instance() s3_storage = bdf.get_s3_storage() # Скачиваем файл (в S3 или на локальный диск) file_path = await download_file(sent_message, file_id=file_id, content_type=content_type, s3_storage=s3_storage) if not file_path: logger.error(f"add_in_db_media: Не удалось скачать файл {file_id} для сообщения {post_id}") return False # Добавляем в базу данных success = await bot_db.add_post_content(post_id, sent_message.message_id, file_path, content_type) if not success: logger.error(f"add_in_db_media: Не удалось добавить контент в БД для сообщения {post_id}") # Удаляем скачанный файл при ошибке БД (только если это локальный файл, не S3) if file_path.startswith('files/'): try: os.remove(file_path) logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД") except Exception as e: logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}") return False processing_time = time.time() - start_time logger.info(f"add_in_db_media: Контент успешно добавлен для сообщения {post_id}, тип: {content_type}, время: {processing_time:.2f}с") return True except Exception as e: processing_time = time.time() - start_time logger.error(f"add_in_db_media: Ошибка обработки медиа для сообщения {post_id}: {e}, время: {processing_time:.2f}с") return False @track_time("send_media_group_message_to_private_chat", "helper_func") @track_errors("helper_func", "send_media_group_message_to_private_chat") @track_media_processing("media_group") @db_query_time("send_media_group_message_to_private_chat", "posts", "insert") async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message, media_group: List, bot_db: Any, main_post_id: Optional[int] = None, s3_storage=None) -> List[int]: """ Отправляет медиагруппу в чат и возвращает все message_id отправленных сообщений. Args: chat_id: ID чата для отправки message: Оригинальное сообщение от пользователя media_group: Список InputMedia объектов bot_db: Экземпляр базы данных main_post_id: ID основного поста в БД (опционально) s3_storage: S3StorageService для сохранения медиа Returns: List[int]: Список всех message_id отправленных сообщений медиагруппы """ sent_messages = await message.bot.send_media_group( chat_id=chat_id, media=media_group, ) sent_message_ids = [msg.message_id for msg in sent_messages] main_message_id = sent_message_ids[-1] asyncio.create_task(_save_media_group_background(sent_messages, bot_db, main_message_id, s3_storage)) return sent_message_ids @track_time("send_media_group_to_channel", "helper_func") @track_errors("helper_func", "send_media_group_to_channel") @track_media_processing("media_group") async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str, s3_storage = None): """ Отправляет медиа-группу с подписью к последнему файлу. Args: bot: Экземпляр бота aiogram. chat_id: ID чата для отправки. post_content: Список кортежей с путями к файлам (локальные пути или S3 ключи). post_text: Текст подписи. s3_storage: опциональный S3StorageService для работы с S3. """ logger.info(f"Начинаю отправку медиа-группы в чат {chat_id}, количество файлов: {len(post_content)}") # Получаем s3_storage если не передан if s3_storage is None: bdf = get_global_instance() s3_storage = bdf.get_s3_storage() media = [] temp_files = [] # Для хранения путей к временным файлам try: for i, file_path_tuple in enumerate(post_content): try: file_path, content_type = file_path_tuple logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path} (тип: {content_type})") # Проверяем, это S3 ключ или локальный путь actual_path = file_path if s3_storage and not file_path.startswith('files/') and not os.path.exists(file_path): # Это S3 ключ, скачиваем во временный файл temp_path = await s3_storage.download_to_temp(file_path) if not temp_path: logger.error(f"Не удалось скачать файл из S3: {file_path}") continue temp_files.append(temp_path) actual_path = temp_path elif not os.path.exists(file_path): logger.error(f"Файл не найден: {file_path}") continue file = FSInputFile(path=actual_path) if content_type == 'video': media.append(types.InputMediaVideo(media=file)) elif content_type == 'photo': media.append(types.InputMediaPhoto(media=file)) else: logger.warning(f"Неизвестный тип файла: {content_type} для {file_path}") except FileNotFoundError: logger.error(f"Файл не найден: {file_path_tuple[0]}") continue except Exception as e: logger.error(f"Ошибка при обработке файла {file_path_tuple[0]}: {e}") continue logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки") # Добавляем подпись к последнему файлу if media: # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" media[-1].caption = safe_post_text logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}") try: sent_messages = await bot.send_media_group(chat_id=chat_id, media=media) logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}, количество сообщений: {len(sent_messages)}") return sent_messages except Exception as e: logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}") raise finally: # Удаляем временные файлы for temp_file in temp_files: try: os.remove(temp_file) except: pass @track_time("send_text_message", "helper_func") @track_errors("helper_func", "send_text_message") async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None): from .rate_limiter import send_with_rate_limit # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" async def _send_message(): if markup is None: return await message.bot.send_message( chat_id=chat_id, text=safe_post_text ) else: return await message.bot.send_message( chat_id=chat_id, text=safe_post_text, reply_markup=markup ) sent_message = await send_with_rate_limit(_send_message, chat_id) return sent_message @track_time("send_photo_message", "helper_func") @track_errors("helper_func", "send_photo_message") async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_photo( chat_id=chat_id, caption=safe_post_text, photo=photo ) else: sent_message = await message.bot.send_photo( chat_id=chat_id, caption=safe_post_text, photo=photo, reply_markup=markup ) return sent_message @track_time("send_video_message", "helper_func") @track_errors("helper_func", "send_video_message") async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "", markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_video( chat_id=chat_id, caption=safe_post_text, video=video ) else: sent_message = await message.bot.send_video( chat_id=chat_id, caption=safe_post_text, video=video, reply_markup=markup ) return sent_message @track_time("send_video_note_message", "helper_func") @track_errors("helper_func", "send_video_note_message") async def send_video_note_message(chat_id, message: types.Message, video_note: str, markup: types.ReplyKeyboardMarkup = None): if markup is None: sent_message = await message.bot.send_video_note( chat_id=chat_id, video_note=video_note ) else: sent_message = await message.bot.send_video_note( chat_id=chat_id, video_note=video_note, reply_markup=markup ) return sent_message @track_time("send_audio_message", "helper_func") @track_errors("helper_func", "send_audio_message") async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" if markup is None: sent_message = await message.bot.send_audio( chat_id=chat_id, caption=safe_post_text, audio=audio ) else: sent_message = await message.bot.send_audio( chat_id=chat_id, caption=safe_post_text, audio=audio, reply_markup=markup ) return sent_message @track_time("send_voice_message", "helper_func") @track_errors("helper_func", "send_voice_message") async def send_voice_message(chat_id, message: types.Message, voice: str, markup: types.ReplyKeyboardMarkup = None): from .rate_limiter import send_with_rate_limit async def _send_voice(): if markup is None: return await message.bot.send_voice( chat_id=chat_id, voice=voice ) else: return await message.bot.send_voice( chat_id=chat_id, voice=voice, reply_markup=markup ) return await send_with_rate_limit(_send_voice, chat_id) @track_time("check_access", "helper_func") @track_errors("helper_func", "check_access") @db_query_time("check_access", "users", "select") async def check_access(user_id: int, bot_db): """Проверка прав на совершение действий""" from logs.custom_logger import logger result = await bot_db.is_admin(user_id) logger.info(f"check_access: пользователь {user_id} - результат: {result}") return result def add_days_to_date(days: int): """Прибавляет указанное количество дней к текущей дате и возвращает UNIX timestamp.""" current_date = datetime.now() future_date = current_date + timedelta(days=days) return int(future_date.timestamp()) @track_time("get_banned_users_list", "helper_func") @track_errors("helper_func", "get_banned_users_list") @db_query_time("get_banned_users_list", "users", "select") async def get_banned_users_list(offset: int, bot_db): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: offset: отступ для запроса в базу данных bot_db: Экземпляр базы данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = await bot_db.get_banned_users_from_db_with_limits(limit=7, offset=offset) message = "Список заблокированных пользователей:\n" for user in users: user_id, ban_reason, unban_date = user # Получаем имя пользователя из таблицы users username = await bot_db.get_username(user_id) full_name = await bot_db.get_full_name_by_id(user_id) safe_user_name = username or full_name or f"User_{user_id}" # Экранируем пользовательские данные для безопасного использования safe_user_name = html.escape(str(safe_user_name)) safe_ban_reason = html.escape(str(ban_reason)) if ban_reason else "Причина не указана" # Форматируем дату разбана в человекочитаемый формат if unban_date: try: # Предполагаем, что unban_date это UNIX timestamp if isinstance(unban_date, (int, float)): unban_datetime = datetime.fromtimestamp(unban_date) safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M") elif isinstance(unban_date, str): # Если это строка, попытаемся её обработать try: # Попробуем преобразовать строку в timestamp timestamp = int(unban_date) unban_datetime = datetime.fromtimestamp(timestamp) safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M") except (ValueError, TypeError): # Если не удалось, показываем как есть safe_unban_date = html.escape(str(unban_date)) elif hasattr(unban_date, 'strftime'): # Если это datetime объект safe_unban_date = unban_date.strftime("%d-%m-%Y %H:%M") else: # Для всех остальных случаев safe_unban_date = html.escape(str(unban_date)) except (ValueError, TypeError, OSError): # В случае ошибки показываем исходное значение safe_unban_date = html.escape(str(unban_date)) else: safe_unban_date = "Дата не указана" message += f"**Пользователь:** {safe_user_name}\n" message += f"**Причина бана:** {safe_ban_reason}\n" message += f"**Дата разбана:** {safe_unban_date}\n\n" return message @track_time("get_banned_users_buttons", "helper_func") @track_errors("helper_func", "get_banned_users_buttons") @db_query_time("get_banned_users_buttons", "users", "select") async def get_banned_users_buttons(bot_db): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: bot_db: Экземпляр базы данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = await bot_db.get_banned_users_from_db() user_ids = [] for user in users: user_id, ban_reason, unban_date = user # Получаем имя пользователя из таблицы users username = await bot_db.get_username(user_id) full_name = await bot_db.get_full_name_by_id(user_id) safe_user_name = username or full_name or f"User_{user_id}" # Экранируем user_name для безопасного использования safe_user_name = html.escape(str(safe_user_name)) user_ids.append((safe_user_name, user_id)) return user_ids @track_time("delete_user_blacklist", "helper_func") @track_errors("helper_func", "delete_user_blacklist") @db_query_time("delete_user_blacklist", "users", "delete") async def delete_user_blacklist(user_id: int, bot_db): return await bot_db.delete_user_blacklist(user_id=user_id) @track_time("check_username_and_full_name", "helper_func") @track_errors("helper_func", "check_username_and_full_name") @db_query_time("check_username_and_full_name", "users", "select") async def check_username_and_full_name(user_id: int, username: str, full_name: str, bot_db): """Проверяет, изменились ли username или full_name пользователя""" try: username_db = await bot_db.get_username(user_id) full_name_db = await bot_db.get_full_name_by_id(user_id) return username != username_db or full_name != full_name_db except Exception as e: logger.error(f"Ошибка при проверке username и full_name: {e}") return False @track_time("unban_notifier", "helper_func") @track_errors("helper_func", "unban_notifier") @db_query_time("unban_notifier", "users", "select") async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE): # Получение текущего UNIX timestamp current_date = datetime.now() current_timestamp = int(current_date.timestamp()) # Получение списка разблокированных пользователей unblocked_users = await BotDB.get_users_for_unblock_today(current_timestamp) message = "Разблокированные пользователи:\n" for user_id in unblocked_users: # Получаем имя пользователя из таблицы users username = await BotDB.get_username(user_id) full_name = await BotDB.get_full_name_by_id(user_id) user_name = username or full_name or f"User_{user_id}" # Экранируем user_name для безопасного использования safe_user_name = html.escape(str(user_name)) message += f"ID: {user_id}, Имя: {safe_user_name}\n" # Отправка сообщения в канал await bot.send_message(GROUP_FOR_MESSAGE, message) @track_time("update_user_info", "helper_func") @track_errors("helper_func", "update_user_info") @db_query_time("update_user_info", "users", "update") async def update_user_info(source: str, message: types.Message): # Собираем данные full_name = message.from_user.full_name username = message.from_user.username first_name = get_first_name(message) is_bot = message.from_user.is_bot language_code = message.from_user.language_code user_id = message.from_user.id # Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было user_emoji = await get_random_emoji() if not await BotDB.user_exists(user_id): # Create User object with current timestamp from database.models import User current_timestamp = int(datetime.now().timestamp()) user = User( user_id=user_id, first_name=first_name, full_name=full_name, username=username, is_bot=is_bot, language_code=language_code, emoji=user_emoji, has_stickers=False, date_added=current_timestamp, date_changed=current_timestamp, voice_bot_welcome_received=False ) await BotDB.add_user(user) else: is_need_update = await check_username_and_full_name(user_id, username, full_name, BotDB) if is_need_update: await BotDB.update_user_info(user_id, username, full_name) if source != 'voice': await message.answer( f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}") await message.bot.send_message(chat_id=GROUP_FOR_LOGS, text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}') sleep(1) await BotDB.update_user_date(user_id) @track_time("check_user_emoji", "helper_func") @track_errors("helper_func", "check_user_emoji") @db_query_time("check_emoji_for_user", "users", "select") async def check_user_emoji(message: types.Message): user_id = message.from_user.id user_emoji = await BotDB.get_user_emoji(user_id=user_id) if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""): user_emoji = await get_random_emoji() await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji) return user_emoji @track_time("get_random_emoji", "helper_func") @track_errors("helper_func", "get_random_emoji") @db_query_time("check_emoji", "users", "select") async def get_random_emoji(): attempts = 0 while attempts < 100: user_emoji = random.choice(emoji_list) if not await BotDB.check_emoji_exists(user_emoji): return user_emoji attempts += 1 logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.") return "Эмоджи не определен"