import asyncio
import html
import os
import random
import tempfile
import time
from datetime import datetime, timedelta
from time import sleep
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
try:
import emoji as _emoji_lib
_emoji_lib_available = True
except ImportError:
_emoji_lib = None
_emoji_lib_available = False
from aiogram import types
from aiogram.types import (
FSInputFile,
InputMediaAudio,
InputMediaDocument,
InputMediaPhoto,
InputMediaVideo,
)
from database.models import TelegramPost
from helper_bot.utils.base_dependency_factory import (
BaseDependencyFactory,
get_global_instance,
)
from logs.custom_logger import logger
# Local imports - metrics
from .metrics import (
db_query_time,
track_errors,
track_file_operations,
track_media_processing,
track_time,
)
bdf = get_global_instance()
# TODO: поменять архитектуру и подключить правильный BotDB
BotDB = bdf.get_db()
GROUP_FOR_LOGS = bdf.settings["Telegram"]["group_for_logs"]
if _emoji_lib_available and _emoji_lib is not None:
emoji_list = list(_emoji_lib.EMOJI_DATA.keys())
else:
# Fallback minimal emoji set for environments without the 'emoji' package (e.g., CI tests)
emoji_list = [
"🙂",
"😀",
"😉",
"😎",
"🤖",
"🦄",
"🐱",
"🐶",
"🍀",
"🔥",
"🌟",
"🎉",
"💡",
"🚀",
"🌈",
]
def safe_html_escape(text: str) -> str:
"""
Безопасно экранирует текст для использования в HTML разметке.
Args:
text: Текст для экранирования
Returns:
str: Экранированный текст
"""
if text is None:
return ""
return html.escape(str(text))
@track_time("get_first_name", "helper_func")
@track_errors("helper_func", "get_first_name")
def get_first_name(message: types.Message) -> str:
"""
Безопасно получает и экранирует имя пользователя для использования в HTML разметке.
Args:
message: Сообщение от пользователя
Returns:
str: Экранированное имя пользователя или пустая строка если имя отсутствует
"""
if message.from_user.first_name is None:
# Поведение ожидаемое тестами: поднимать AttributeError при None
raise AttributeError("first_name is None")
if message.from_user.first_name:
# Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML
first_name = str(message.from_user.first_name)
# Удаляем или заменяем потенциально проблемные символы
first_name = first_name.replace("\u0cc0", "") # Убираем символ "ೀ" (U+0CC0)
first_name = first_name.replace("\u0cc1", "") # Убираем символ "ೀ" (U+0CC1)
first_name = html.escape(first_name)
return first_name
return ""
def determine_anonymity(post_text: str) -> bool:
"""
Определяет, является ли пост анонимным на основе ключевых слов в тексте.
Args:
post_text: Текст сообщения
Returns:
bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте;
False по умолчанию (если нет ключевых слов)
"""
if not post_text:
return False
post_text_lower = post_text.lower()
# Сначала проверяем "неанон" или "не анон" (более специфичное условие)
if "неанон" in post_text_lower or "не анон" in post_text_lower:
return False
# Проверяем "анон"
if "анон" in post_text_lower:
return True
# По умолчанию False
return False
def get_publish_text(
post_text: str,
first_name: str,
username: str = None,
is_anonymous: Optional[bool] = None,
) -> str:
"""
Форматирует текст для финальной публикации в канал.
Только текст поста + подпись автора или анон.
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy)
Returns:
str: Текст для публикации в канал
"""
safe_post_text = post_text or ""
safe_first_name = first_name or "Пользователь"
# Формируем строку с информацией об авторе
if username:
author_info = f"{safe_first_name} @{username}"
else:
author_info = f"{safe_first_name}"
# Определяем анонимность и формируем финальный текст
if is_anonymous is not None:
if is_anonymous:
final_text = f"{safe_post_text}\n\nПост опубликован анонимно"
else:
final_text = f"{safe_post_text}\n\nАвтор поста: {author_info}"
else:
# Legacy: определяем по тексту
if "неанон" in post_text.lower() or "не анон" in post_text.lower():
final_text = f"{safe_post_text}\n\nАвтор поста: {author_info}"
elif "анон" in post_text.lower():
final_text = f"{safe_post_text}\n\nПост опубликован анонимно"
else:
final_text = f"{safe_post_text}\n\nАвтор поста: {author_info}"
return final_text
def get_text_message(
post_text: str,
first_name: str,
username: str = None,
is_anonymous: Optional[bool] = None,
deepseek_score: Optional[float] = None,
rag_score: Optional[float] = None,
rag_confidence: Optional[float] = None,
rag_score_pos_only: Optional[float] = None,
user_id: Optional[int] = None,
):
"""
Форматирует текст сообщения для модерации (с полной информацией об авторе и скорами).
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту)
deepseek_score: Скор от DeepSeek API (0.0-1.0, опционально)
rag_score: Скор от RAG/ruBERT neg/pos (0.0-1.0, опционально)
rag_confidence: Уверенность RAG модели (0.0-1.0, зависит от количества примеров)
rag_score_pos_only: Скор RAG только по положительным примерам (0.0-1.0, опционально)
user_id: ID пользователя Telegram (опционально)
Returns:
str: - Сформированный текст сообщения для модерации.
"""
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
# Экранируем username для безопасного использования в HTML
safe_username = html.escape(username) if username else None
safe_first_name = html.escape(first_name) if first_name else "Пользователь"
# Формируем шапку с информацией об авторе
if safe_username:
header = f"👤 От: {safe_first_name} (@{safe_username})"
else:
header = f"👤 От: {safe_first_name} (Ник не указан)"
if user_id:
header += f" | ID: {user_id}"
# Формируем строку с информацией об авторе для подвала
if safe_username:
author_info = f"{safe_first_name} @{safe_username}"
else:
author_info = f"{safe_first_name} (Ник не указан)"
# Формируем блок с текстом поста
separator = "=" * 32
post_block = f"{header}\nТекст поста:\n{separator}\n{safe_post_text}"
# Определяем анонимность и формируем подвал
if is_anonymous is not None:
if is_anonymous:
post_block += f"\n\nПост опубликован анонимно"
else:
post_block += f"\n\nАвтор поста: {author_info}"
else:
# Legacy: определяем по тексту
if "неанон" in post_text or "не анон" in post_text:
post_block += f"\n\nАвтор поста: {author_info}"
elif "анон" in post_text:
post_block += f"\n\nПост опубликован анонимно"
else:
post_block += f"\n\nАвтор поста: {author_info}"
post_block += f"\n{separator}"
# Добавляем блок со скорами если есть (без RAG pos only и уверенности)
if deepseek_score is not None or rag_score is not None:
scores_lines = ["📊 Уверенность в одобрении:"]
if deepseek_score is not None:
scores_lines.append(f"DeepSeek: {deepseek_score:.2f}")
if rag_score is not None:
logger.debug(
f"get_text_message: Форматирование rag_score - "
f"rag_score={rag_score} (type: {type(rag_score).__name__}), "
f"formatted_value={rag_score:.2f}"
)
scores_lines.append(f"RAG neg/pos: {rag_score:.2f}")
post_block += "\n" + "\n".join(scores_lines)
return post_block
@track_time("download_file", "helper_func")
@track_errors("helper_func", "download_file")
@track_file_operations("unknown")
async def download_file(
message: types.Message, file_id: str, content_type: str = None, s3_storage=None
) -> Optional[str]:
"""
Скачивает файл по file_id из Telegram и сохраняет в S3 или на локальный диск.
Args:
message: сообщение
file_id: File ID файла
content_type: тип контента (photo, video, audio, voice, video_note)
s3_storage: опциональный S3StorageService для сохранения в S3
Returns:
S3 ключ (если s3_storage указан) или локальный путь к файлу, иначе None
"""
start_time = time.time()
try:
# Валидация параметров
if not file_id or not message or not message.bot:
logger.error(
"download_file: Неверные параметры - file_id, message или bot отсутствуют"
)
return None
# Получаем информацию о файле
file = await message.bot.get_file(file_id)
if not file or not file.file_path:
logger.error(
f"download_file: Не удалось получить информацию о файле {file_id}"
)
return None
# Определяем расширение
original_filename = os.path.basename(file.file_path)
file_extension = os.path.splitext(original_filename)[1] or ".bin"
if s3_storage:
# Сохраняем в S3
# Скачиваем во временный файл
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension)
temp_path = temp_file.name
temp_file.close()
try:
# Скачиваем из Telegram
await message.bot.download_file(
file_path=file.file_path, destination=temp_path
)
# Генерируем S3 ключ
s3_key = s3_storage.generate_s3_key(content_type, file_id)
# Загружаем в S3
success = await s3_storage.upload_file(temp_path, s3_key)
# Удаляем временный файл
try:
os.remove(temp_path)
except:
pass
if success:
file_size = file.file_size if hasattr(file, "file_size") else 0
download_time = time.time() - start_time
logger.info(
f"download_file: Файл загружен в S3 - {s3_key}, размер: {file_size} байт, время: {download_time:.2f}с"
)
return s3_key
else:
logger.error(
f"download_file: Не удалось загрузить файл в S3: {s3_key}"
)
return None
except Exception as e:
# Удаляем временный файл при ошибке
try:
os.remove(temp_path)
except:
pass
download_time = time.time() - start_time
logger.error(
f"download_file: Ошибка загрузки файла в S3 {file_id}: {e}, время: {download_time:.2f}с"
)
return None
else:
# Старая логика - сохраняем на локальный диск
# Определяем папку по типу контента
type_folders = {
"photo": "photos",
"video": "videos",
"audio": "music",
"voice": "voice",
"video_note": "video_notes",
}
folder = type_folders.get(content_type, "other")
base_path = "files"
full_folder_path = os.path.join(base_path, folder)
# Создаем необходимые папки
os.makedirs(base_path, exist_ok=True)
os.makedirs(full_folder_path, exist_ok=True)
logger.debug(
f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}"
)
# Генерируем уникальное имя файла
safe_filename = f"{file_id}{file_extension}"
file_path = os.path.join(full_folder_path, safe_filename)
# Скачиваем файл
await message.bot.download_file(
file_path=file.file_path, destination=file_path
)
# Проверяем, что файл действительно скачался
if not os.path.exists(file_path):
logger.error(f"download_file: Файл не был скачан - {file_path}")
return None
file_size = os.path.getsize(file_path)
download_time = time.time() - start_time
logger.info(
f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с"
)
return file_path
except Exception as e:
download_time = time.time() - start_time
logger.error(
f"download_file: Ошибка скачивания файла {file_id}: {e}, время: {download_time:.2f}с"
)
return None
@track_time("prepare_media_group_from_middlewares", "helper_func")
@track_errors("helper_func", "prepare_media_group_from_middlewares")
@track_media_processing("media_group")
async def prepare_media_group_from_middlewares(album, post_caption: str = ""):
"""
Создает MediaGroup согласно best practices aiogram 3.x.
Args:
album: Album объект из Telegram API (список сообщений).
post_caption: Текст подписи к первому медиа файлу.
Returns:
Список InputMedia объектов для MediaGroup.
"""
# Экранируем post_caption для безопасного использования в HTML
safe_post_caption = html.escape(str(post_caption)) if post_caption else ""
media_group = []
for i, message in enumerate(album):
if message.photo:
file_id = message.photo[-1].file_id
# Для фото используем InputMediaPhoto
if i == 0: # Первое фото получает подпись
media_group.append(
InputMediaPhoto(media=file_id, caption=safe_post_caption)
)
else:
media_group.append(InputMediaPhoto(media=file_id))
elif message.video:
file_id = message.video.file_id
# Для видео используем InputMediaVideo
if i == 0: # Первое видео получает подпись
media_group.append(
InputMediaVideo(media=file_id, caption=safe_post_caption)
)
else:
media_group.append(InputMediaVideo(media=file_id))
elif message.audio:
file_id = message.audio.file_id
# Для аудио используем InputMediaAudio
if i == 0: # Первое аудио получает подпись
media_group.append(
InputMediaAudio(media=file_id, caption=safe_post_caption)
)
else:
media_group.append(InputMediaAudio(media=file_id))
elif message.document:
file_id = message.document.file_id
# Для документов используем InputMediaDocument (если поддерживается)
if i == 0: # Первый документ получает подпись
media_group.append(
InputMediaDocument(media=file_id, caption=safe_post_caption)
)
else:
media_group.append(InputMediaDocument(media=file_id))
else:
# Если нет поддерживаемого медиа, пропускаем сообщение
continue
return media_group
async def _save_media_group_background(
sent_message: List[types.Message],
bot_db: Any,
main_post_id: Optional[int],
s3_storage,
) -> None:
"""Сохраняет медиагруппу в фоне, чтобы не блокировать ответ пользователю"""
try:
success = await add_in_db_media_mediagroup(
sent_message, bot_db, main_post_id, s3_storage
)
if not success:
logger.warning(
f"_save_media_group_background: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}"
)
except Exception as e:
logger.error(
f"_save_media_group_background: Ошибка при сохранении медиа для медиагруппы {sent_message[-1].message_id}: {e}"
)
@track_time("add_in_db_media_mediagroup", "helper_func")
@track_errors("helper_func", "add_in_db_media_mediagroup")
@track_media_processing("media_group")
@db_query_time("add_in_db_media_mediagroup", "posts", "insert")
async def add_in_db_media_mediagroup(
sent_message: List[types.Message],
bot_db: Any,
main_post_id: Optional[int] = None,
s3_storage=None,
) -> bool:
"""
Добавляет контент медиа-группы в базу данных
Args:
sent_message: sent_message объект из Telegram API
bot_db: Экземпляр базы данных
main_post_id: ID основного поста медиагруппы (если не указан, используется последний message_id)
Returns:
bool: True если весь контент успешно добавлен, False в случае ошибки
"""
start_time = time.time()
try:
# Валидация параметров
if not sent_message or not bot_db or not isinstance(sent_message, list):
logger.error(
"add_in_db_media_mediagroup: Неверные параметры - sent_message, bot_db или sent_message не является списком"
)
return False
if len(sent_message) == 0:
logger.warning("add_in_db_media_mediagroup: Пустая медиагруппа")
return False
post_id = main_post_id or sent_message[-1].message_id
processed_count = 0
failed_count = 0
for i, message in enumerate(sent_message):
try:
content_type = None
file_id = None
if message.photo:
content_type = "photo"
file_id = message.photo[-1].file_id
elif message.video:
content_type = "video"
file_id = message.video.file_id
elif message.audio:
content_type = "audio"
file_id = message.audio.file_id
elif message.voice:
content_type = "voice"
file_id = message.voice.file_id
elif message.video_note:
content_type = "video_note"
file_id = message.video_note.file_id
else:
logger.warning(
f"add_in_db_media_mediagroup: Неподдерживаемый тип контента в сообщении {i+1}/{len(sent_message)}"
)
failed_count += 1
continue
if not file_id:
logger.error(
f"add_in_db_media_mediagroup: file_id отсутствует в сообщении {i+1}/{len(sent_message)}"
)
failed_count += 1
continue
if s3_storage is None:
bdf = get_global_instance()
s3_storage = bdf.get_s3_storage()
file_path = await download_file(
message,
file_id=file_id,
content_type=content_type,
s3_storage=s3_storage,
)
if not file_path:
logger.error(
f"add_in_db_media_mediagroup: Не удалось скачать файл {file_id} в сообщении {i+1}/{len(sent_message)}"
)
failed_count += 1
continue
success = await bot_db.add_post_content(
post_id, post_id, file_path, content_type
)
if not success:
logger.error(
f"add_in_db_media_mediagroup: Не удалось добавить контент в БД для сообщения {i+1}/{len(sent_message)}"
)
if file_path.startswith("files/"):
try:
os.remove(file_path)
except Exception as e:
logger.warning(
f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}"
)
failed_count += 1
continue
processed_count += 1
except Exception as e:
logger.error(
f"add_in_db_media_mediagroup: Ошибка обработки сообщения {i+1}/{len(sent_message)}: {e}"
)
failed_count += 1
continue
if processed_count == 0:
logger.error(
f"add_in_db_media_mediagroup: Не удалось обработать ни одного сообщения из медиагруппы {post_id}"
)
return False
if failed_count > 0:
logger.warning(
f"add_in_db_media_mediagroup: Обработано {processed_count}/{len(sent_message)} сообщений медиагруппы {post_id}, ошибок: {failed_count}"
)
return failed_count == 0
except Exception as e:
processing_time = time.time() - start_time
logger.error(
f"add_in_db_media_mediagroup: Критическая ошибка обработки медиагруппы: {e}, время: {processing_time:.2f}с"
)
return False
@track_time("add_in_db_media", "helper_func")
@track_errors("helper_func", "add_in_db_media")
@track_media_processing("media_group")
@db_query_time("add_in_db_media", "posts", "insert")
@track_file_operations("media")
async def add_in_db_media(
sent_message: types.Message, bot_db: Any, s3_storage=None
) -> bool:
"""
Добавляет контент одиночного сообщения в базу данных
Args:
sent_message: sent_message объект из Telegram API
bot_db: Экземпляр базы данных
Returns:
bool: True если контент успешно добавлен, False в случае ошибки
"""
start_time = time.time()
try:
# Валидация параметров
if not sent_message or not bot_db:
logger.error(
"add_in_db_media: Неверные параметры - sent_message или bot_db отсутствуют"
)
return False
post_id = sent_message.message_id # ID поста (это же сообщение)
content_type = None
file_id = None
# Определяем тип контента и file_id
if sent_message.photo:
content_type = "photo"
file_id = sent_message.photo[-1].file_id
elif sent_message.video:
content_type = "video"
file_id = sent_message.video.file_id
elif sent_message.voice:
content_type = "voice"
file_id = sent_message.voice.file_id
elif sent_message.audio:
content_type = "audio"
file_id = sent_message.audio.file_id
elif sent_message.video_note:
content_type = "video_note"
file_id = sent_message.video_note.file_id
else:
logger.warning(
f"add_in_db_media: Неподдерживаемый тип контента для сообщения {post_id}"
)
return False
if not file_id:
logger.error(
f"add_in_db_media: file_id отсутствует для сообщения {post_id}"
)
return False
logger.debug(
f"add_in_db_media: Обрабатываю {content_type} для сообщения {post_id}"
)
# Получаем s3_storage если не передан
if s3_storage is None:
bdf = get_global_instance()
s3_storage = bdf.get_s3_storage()
# Скачиваем файл (в S3 или на локальный диск)
file_path = await download_file(
sent_message,
file_id=file_id,
content_type=content_type,
s3_storage=s3_storage,
)
if not file_path:
logger.error(
f"add_in_db_media: Не удалось скачать файл {file_id} для сообщения {post_id}"
)
return False
# Добавляем в базу данных
success = await bot_db.add_post_content(
post_id, sent_message.message_id, file_path, content_type
)
if not success:
logger.error(
f"add_in_db_media: Не удалось добавить контент в БД для сообщения {post_id}"
)
# Удаляем скачанный файл при ошибке БД (только если это локальный файл, не S3)
if file_path.startswith("files/"):
try:
os.remove(file_path)
logger.debug(
f"add_in_db_media: Удален файл {file_path} после ошибки БД"
)
except Exception as e:
logger.warning(
f"add_in_db_media: Не удалось удалить файл {file_path}: {e}"
)
return False
processing_time = time.time() - start_time
logger.info(
f"add_in_db_media: Контент успешно добавлен для сообщения {post_id}, тип: {content_type}, время: {processing_time:.2f}с"
)
return True
except Exception as e:
processing_time = time.time() - start_time
logger.error(
f"add_in_db_media: Ошибка обработки медиа для сообщения {post_id}: {e}, время: {processing_time:.2f}с"
)
return False
@track_time("send_media_group_message_to_private_chat", "helper_func")
@track_errors("helper_func", "send_media_group_message_to_private_chat")
@track_media_processing("media_group")
@db_query_time("send_media_group_message_to_private_chat", "posts", "insert")
async def send_media_group_message_to_private_chat(
chat_id: int,
message: types.Message,
media_group: List,
bot_db: Any,
main_post_id: Optional[int] = None,
s3_storage=None,
) -> List[int]:
"""
Отправляет медиагруппу в чат и возвращает все message_id отправленных сообщений.
Args:
chat_id: ID чата для отправки
message: Оригинальное сообщение от пользователя
media_group: Список InputMedia объектов
bot_db: Экземпляр базы данных
main_post_id: ID основного поста в БД (опционально)
s3_storage: S3StorageService для сохранения медиа
Returns:
List[int]: Список всех message_id отправленных сообщений медиагруппы
"""
sent_messages = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
)
sent_message_ids = [msg.message_id for msg in sent_messages]
main_message_id = sent_message_ids[-1]
asyncio.create_task(
_save_media_group_background(sent_messages, bot_db, main_message_id, s3_storage)
)
return sent_message_ids
@track_time("send_media_group_to_channel", "helper_func")
@track_errors("helper_func", "send_media_group_to_channel")
@track_media_processing("media_group")
async def send_media_group_to_channel(
bot, chat_id: int, post_content: List, post_text: str, s3_storage=None
):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам (локальные пути или S3 ключи).
post_text: Текст подписи.
s3_storage: опциональный S3StorageService для работы с S3.
"""
logger.info(
f"Начинаю отправку медиа-группы в чат {chat_id}, количество файлов: {len(post_content)}"
)
# Получаем s3_storage если не передан
if s3_storage is None:
bdf = get_global_instance()
s3_storage = bdf.get_s3_storage()
media = []
temp_files = [] # Для хранения путей к временным файлам
try:
for i, file_path_tuple in enumerate(post_content):
try:
file_path, content_type = file_path_tuple
logger.debug(
f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path} (тип: {content_type})"
)
# Проверяем, это S3 ключ или локальный путь
actual_path = file_path
if (
s3_storage
and not file_path.startswith("files/")
and not os.path.exists(file_path)
):
# Это S3 ключ, скачиваем во временный файл
temp_path = await s3_storage.download_to_temp(file_path)
if not temp_path:
logger.error(f"Не удалось скачать файл из S3: {file_path}")
continue
temp_files.append(temp_path)
actual_path = temp_path
elif not os.path.exists(file_path):
logger.error(f"Файл не найден: {file_path}")
continue
file = FSInputFile(path=actual_path)
if content_type == "video":
media.append(types.InputMediaVideo(media=file))
elif content_type == "photo":
media.append(types.InputMediaPhoto(media=file))
else:
logger.warning(
f"Неизвестный тип файла: {content_type} для {file_path}"
)
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path_tuple[0]}")
continue
except Exception as e:
logger.error(f"Ошибка при обработке файла {file_path_tuple[0]}: {e}")
continue
logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки")
# Добавляем подпись к последнему файлу
if media:
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
media[-1].caption = safe_post_text
logger.debug(
f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}"
)
try:
sent_messages = await bot.send_media_group(chat_id=chat_id, media=media)
logger.info(
f"Медиа-группа успешно отправлена в чат {chat_id}, количество сообщений: {len(sent_messages)}"
)
return sent_messages
except Exception as e:
logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}")
raise
finally:
# Удаляем временные файлы
for temp_file in temp_files:
try:
os.remove(temp_file)
except:
pass
@track_time("send_text_message", "helper_func")
@track_errors("helper_func", "send_text_message")
async def send_text_message(
chat_id,
message: types.Message,
post_text: str,
markup: types.ReplyKeyboardMarkup = None,
):
from .rate_limiter import send_with_rate_limit
async def _send_message():
if markup is None:
return await message.bot.send_message(
chat_id=chat_id, text=post_text, parse_mode="HTML"
)
else:
return await message.bot.send_message(
chat_id=chat_id, text=post_text, reply_markup=markup, parse_mode="HTML"
)
sent_message = await send_with_rate_limit(_send_message, chat_id)
return sent_message
@track_time("send_photo_message", "helper_func")
@track_errors("helper_func", "send_photo_message")
async def send_photo_message(
chat_id,
message: types.Message,
photo: str,
post_text: str,
markup: types.ReplyKeyboardMarkup = None,
):
if markup is None:
sent_message = await message.bot.send_photo(
chat_id=chat_id, caption=post_text, photo=photo, parse_mode="HTML"
)
else:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo,
reply_markup=markup,
parse_mode="HTML",
)
return sent_message
@track_time("send_video_message", "helper_func")
@track_errors("helper_func", "send_video_message")
async def send_video_message(
chat_id,
message: types.Message,
video: str,
post_text: str = "",
markup: types.ReplyKeyboardMarkup = None,
):
if markup is None:
sent_message = await message.bot.send_video(
chat_id=chat_id, caption=post_text, video=video, parse_mode="HTML"
)
else:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
video=video,
reply_markup=markup,
parse_mode="HTML",
)
return sent_message
@track_time("send_video_note_message", "helper_func")
@track_errors("helper_func", "send_video_note_message")
async def send_video_note_message(
chat_id,
message: types.Message,
video_note: str,
markup: types.ReplyKeyboardMarkup = None,
):
if markup is None:
sent_message = await message.bot.send_video_note(
chat_id=chat_id, video_note=video_note
)
else:
sent_message = await message.bot.send_video_note(
chat_id=chat_id, video_note=video_note, reply_markup=markup
)
return sent_message
@track_time("send_audio_message", "helper_func")
@track_errors("helper_func", "send_audio_message")
async def send_audio_message(
chat_id,
message: types.Message,
audio: str,
post_text: str,
markup: types.ReplyKeyboardMarkup = None,
):
if markup is None:
sent_message = await message.bot.send_audio(
chat_id=chat_id, caption=post_text, audio=audio, parse_mode="HTML"
)
else:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
audio=audio,
reply_markup=markup,
parse_mode="HTML",
)
return sent_message
@track_time("send_voice_message", "helper_func")
@track_errors("helper_func", "send_voice_message")
async def send_voice_message(
chat_id,
message: types.Message,
voice: str,
markup: types.ReplyKeyboardMarkup = None,
):
from .rate_limiter import send_with_rate_limit
async def _send_voice():
if markup is None:
return await message.bot.send_voice(chat_id=chat_id, voice=voice)
else:
return await message.bot.send_voice(
chat_id=chat_id, voice=voice, reply_markup=markup
)
return await send_with_rate_limit(_send_voice, chat_id)
@track_time("check_access", "helper_func")
@track_errors("helper_func", "check_access")
@db_query_time("check_access", "users", "select")
async def check_access(user_id: int, bot_db):
"""Проверка прав на совершение действий"""
from logs.custom_logger import logger
result = await bot_db.is_admin(user_id)
logger.info(f"check_access: пользователь {user_id} - результат: {result}")
return result
def add_days_to_date(days: int):
"""Прибавляет указанное количество дней к текущей дате и возвращает UNIX timestamp."""
current_date = datetime.now()
future_date = current_date + timedelta(days=days)
return int(future_date.timestamp())
@track_time("get_banned_users_list", "helper_func")
@track_errors("helper_func", "get_banned_users_list")
@db_query_time("get_banned_users_list", "users", "select")
async def get_banned_users_list(offset: int, bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
bot_db: Экземпляр базы данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
items_per_page = 9
users = await bot_db.get_banned_users_from_db_with_limits(
limit=items_per_page, offset=offset
)
message = "Список заблокированных пользователей:\n"
for user in users:
user_id, ban_reason, unban_date, ban_date = user
# Получаем имя пользователя из таблицы users
username = await bot_db.get_username(user_id)
full_name = await bot_db.get_full_name_by_id(user_id)
safe_user_name = username or full_name or f"User_{user_id}"
# Экранируем пользовательские данные для безопасного использования
safe_user_name = html.escape(str(safe_user_name))
safe_ban_reason = (
html.escape(str(ban_reason)) if ban_reason else "Причина не указана"
)
# Форматируем дату бана в человекочитаемый формат
safe_ban_date = _format_timestamp_to_date(ban_date)
# Форматируем дату разбана в человекочитаемый формат
safe_unban_date = _format_timestamp_to_date(unban_date, default="Навсегда")
message += f"Пользователь: {safe_user_name}\n"
message += f"Причина бана: {safe_ban_reason}\n"
message += f"Дата бана: {safe_ban_date}\n"
message += f"Дата разбана: {safe_unban_date}\n\n"
return message
def _format_timestamp_to_date(timestamp, default: str = "Дата не указана") -> str:
"""Форматирует timestamp в читаемую дату."""
if not timestamp:
return default
try:
if isinstance(timestamp, (int, float)):
dt = datetime.fromtimestamp(timestamp)
return dt.strftime("%d.%m.%Y %H:%M")
elif isinstance(timestamp, str):
try:
ts = int(timestamp)
dt = datetime.fromtimestamp(ts)
return dt.strftime("%d.%m.%Y %H:%M")
except (ValueError, TypeError):
return html.escape(str(timestamp))
elif hasattr(timestamp, "strftime"):
return timestamp.strftime("%d.%m.%Y %H:%M")
else:
return html.escape(str(timestamp))
except (ValueError, TypeError, OSError):
return html.escape(str(timestamp))
@track_time("get_banned_users_buttons", "helper_func")
@track_errors("helper_func", "get_banned_users_buttons")
@db_query_time("get_banned_users_buttons", "users", "select")
async def get_banned_users_buttons(bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
bot_db: Экземпляр базы данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = await bot_db.get_banned_users_from_db()
user_ids = []
for user in users:
user_id, ban_reason, unban_date = user
# Получаем имя пользователя из таблицы users
username = await bot_db.get_username(user_id)
full_name = await bot_db.get_full_name_by_id(user_id)
safe_user_name = username or full_name or f"User_{user_id}"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(safe_user_name))
user_ids.append((safe_user_name, user_id))
return user_ids
@track_time("delete_user_blacklist", "helper_func")
@track_errors("helper_func", "delete_user_blacklist")
@db_query_time("delete_user_blacklist", "users", "delete")
async def delete_user_blacklist(user_id: int, bot_db):
return await bot_db.delete_user_blacklist(user_id=user_id)
@track_time("check_username_and_full_name", "helper_func")
@track_errors("helper_func", "check_username_and_full_name")
@db_query_time("check_username_and_full_name", "users", "select")
async def check_username_and_full_name(
user_id: int, username: str, full_name: str, bot_db
):
"""Проверяет, изменились ли username или full_name пользователя"""
try:
username_db = await bot_db.get_username(user_id)
full_name_db = await bot_db.get_full_name_by_id(user_id)
return username != username_db or full_name != full_name_db
except Exception as e:
logger.error(f"Ошибка при проверке username и full_name: {e}")
return False
@track_time("unban_notifier", "helper_func")
@track_errors("helper_func", "unban_notifier")
@db_query_time("unban_notifier", "users", "select")
async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE):
# Получение текущего UNIX timestamp
current_date = datetime.now()
current_timestamp = int(current_date.timestamp())
# Получение списка разблокированных пользователей
unblocked_users = await BotDB.get_users_for_unblock_today(current_timestamp)
message = "Разблокированные пользователи:\n"
for user_id in unblocked_users:
# Получаем имя пользователя из таблицы users
username = await BotDB.get_username(user_id)
full_name = await BotDB.get_full_name_by_id(user_id)
user_name = username or full_name or f"User_{user_id}"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user_name))
message += f"ID: {user_id}, Имя: {safe_user_name}\n"
# Отправка сообщения в канал
await bot.send_message(GROUP_FOR_MESSAGE, message)
@track_time("update_user_info", "helper_func")
@track_errors("helper_func", "update_user_info")
@db_query_time("update_user_info", "users", "update")
async def update_user_info(source: str, message: types.Message):
# Собираем данные
full_name = message.from_user.full_name
username = message.from_user.username
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
user_id = message.from_user.id
# Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было
user_emoji = await get_random_emoji()
if not await BotDB.user_exists(user_id):
# Create User object with current timestamp
from database.models import User
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username,
is_bot=is_bot,
language_code=language_code,
emoji=user_emoji,
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False,
)
await BotDB.add_user(user)
else:
is_need_update = await check_username_and_full_name(
user_id, username, full_name, BotDB
)
if is_need_update:
await BotDB.update_user_info(user_id, username, full_name)
if source != "voice":
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}"
)
await message.bot.send_message(
chat_id=GROUP_FOR_LOGS,
text=f"Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}",
)
sleep(1)
await BotDB.update_user_date(user_id)
@track_time("check_user_emoji", "helper_func")
@track_errors("helper_func", "check_user_emoji")
@db_query_time("check_emoji_for_user", "users", "select")
async def check_user_emoji(message: types.Message):
user_id = message.from_user.id
user_emoji = await BotDB.get_user_emoji(user_id=user_id)
if user_emoji is None or user_emoji in (
"Смайл еще не определен",
"Эмоджи не определен",
"",
):
user_emoji = await get_random_emoji()
await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji)
return user_emoji
@track_time("get_random_emoji", "helper_func")
@track_errors("helper_func", "get_random_emoji")
@db_query_time("check_emoji", "users", "select")
async def get_random_emoji():
attempts = 0
while attempts < 100:
user_emoji = random.choice(emoji_list)
if not await BotDB.check_emoji_exists(user_emoji):
return user_emoji
attempts += 1
logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.")
return "Эмоджи не определен"