- Modified `change_page` function in `callback_handlers.py` to use async methods for retrieving banned users and their buttons. - Enhanced `get_banned_users_list` in `helper_func.py` to handle string timestamps and various date formats, ensuring robust date parsing. - Added a new test case in `test_utils.py` to validate the handling of string timestamps in the banned users list retrieval.
868 lines
40 KiB
Python
868 lines
40 KiB
Python
import html
|
||
import os
|
||
import random
|
||
import time
|
||
from datetime import datetime, timedelta
|
||
from time import sleep
|
||
from typing import List, Dict, Any, Optional, TYPE_CHECKING, Union
|
||
|
||
try:
|
||
import emoji as _emoji_lib
|
||
_emoji_lib_available = True
|
||
except ImportError:
|
||
_emoji_lib = None
|
||
_emoji_lib_available = False
|
||
|
||
from aiogram import types
|
||
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio, InputMediaDocument
|
||
|
||
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance
|
||
from logs.custom_logger import logger
|
||
from database.models import TelegramPost
|
||
|
||
# Local imports - metrics
|
||
from .metrics import (
|
||
track_time,
|
||
track_errors,
|
||
db_query_time,
|
||
track_media_processing,
|
||
track_file_operations,
|
||
)
|
||
|
||
bdf = get_global_instance()
|
||
#TODO: поменять архитектуру и подключить правильный BotDB
|
||
BotDB = bdf.get_db()
|
||
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
|
||
|
||
if _emoji_lib_available and _emoji_lib is not None:
|
||
emoji_list = list(_emoji_lib.EMOJI_DATA.keys())
|
||
else:
|
||
# Fallback minimal emoji set for environments without the 'emoji' package (e.g., CI tests)
|
||
emoji_list = [
|
||
"🙂", "😀", "😉", "😎", "🤖", "🦄", "🐱", "🐶", "🍀", "🔥",
|
||
"🌟", "🎉", "💡", "🚀", "🌈"
|
||
]
|
||
|
||
|
||
def safe_html_escape(text: str) -> str:
|
||
"""
|
||
Безопасно экранирует текст для использования в HTML разметке.
|
||
|
||
Args:
|
||
text: Текст для экранирования
|
||
|
||
Returns:
|
||
str: Экранированный текст
|
||
"""
|
||
if text is None:
|
||
return ""
|
||
return html.escape(str(text))
|
||
|
||
|
||
@track_time("get_first_name", "helper_func")
|
||
@track_errors("helper_func", "get_first_name")
|
||
def get_first_name(message: types.Message) -> str:
|
||
"""
|
||
Безопасно получает и экранирует имя пользователя для использования в HTML разметке.
|
||
|
||
Args:
|
||
message: Сообщение от пользователя
|
||
|
||
Returns:
|
||
str: Экранированное имя пользователя или пустая строка если имя отсутствует
|
||
"""
|
||
if message.from_user.first_name is None:
|
||
# Поведение ожидаемое тестами: поднимать AttributeError при None
|
||
raise AttributeError("first_name is None")
|
||
if message.from_user.first_name:
|
||
# Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML
|
||
first_name = str(message.from_user.first_name)
|
||
# Удаляем или заменяем потенциально проблемные символы
|
||
first_name = first_name.replace('\u0cc0', '') # Убираем символ "ೀ" (U+0CC0)
|
||
first_name = first_name.replace('\u0cc1', '') # Убираем символ "ೀ" (U+0CC1)
|
||
first_name = html.escape(first_name)
|
||
return first_name
|
||
return ""
|
||
|
||
|
||
def get_text_message(post_text: str, first_name: str, username: str = None):
|
||
"""
|
||
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
|
||
|
||
Args:
|
||
post_text: Текст сообщения
|
||
first_name: Имя автора поста
|
||
username: Юзернейм автора поста (может быть None)
|
||
|
||
Returns:
|
||
str: - Сформированный текст сообщения.
|
||
"""
|
||
# Экранируем post_text для безопасного использования в HTML
|
||
safe_post_text = html.escape(str(post_text)) if post_text else ""
|
||
|
||
# Экранируем username для безопасного использования в HTML
|
||
safe_username = html.escape(username) if username else None
|
||
|
||
# Формируем строку с информацией об авторе
|
||
if safe_username:
|
||
author_info = f"{first_name} @{safe_username}"
|
||
else:
|
||
author_info = f"{first_name} (Ник не указан)"
|
||
|
||
if "неанон" in post_text or "не анон" in post_text:
|
||
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
|
||
elif "анон" in post_text:
|
||
return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно'
|
||
else:
|
||
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
|
||
|
||
@track_time("download_file", "helper_func")
|
||
@track_errors("helper_func", "download_file")
|
||
@track_file_operations("unknown")
|
||
async def download_file(message: types.Message, file_id: str, content_type: str = None) -> Optional[str]:
|
||
"""
|
||
Скачивает файл по file_id из Telegram и сохраняет в соответствующую папку.
|
||
|
||
Args:
|
||
message: сообщение
|
||
file_id: File ID файла
|
||
content_type: тип контента (photo, video, audio, voice, video_note)
|
||
|
||
Returns:
|
||
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
|
||
"""
|
||
start_time = time.time()
|
||
|
||
try:
|
||
# Валидация параметров
|
||
if not file_id or not message or not message.bot:
|
||
logger.error("download_file: Неверные параметры - file_id, message или bot отсутствуют")
|
||
return None
|
||
|
||
# Определяем папку по типу контента
|
||
type_folders = {
|
||
'photo': 'photos',
|
||
'video': 'videos',
|
||
'audio': 'music',
|
||
'voice': 'voice',
|
||
'video_note': 'video_notes'
|
||
}
|
||
|
||
folder = type_folders.get(content_type, 'other')
|
||
base_path = "files"
|
||
full_folder_path = os.path.join(base_path, folder)
|
||
|
||
# Создаем необходимые папки
|
||
os.makedirs(base_path, exist_ok=True)
|
||
os.makedirs(full_folder_path, exist_ok=True)
|
||
|
||
logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}")
|
||
|
||
# Получаем информацию о файле
|
||
file = await message.bot.get_file(file_id)
|
||
if not file or not file.file_path:
|
||
logger.error(f"download_file: Не удалось получить информацию о файле {file_id}")
|
||
return None
|
||
|
||
# Генерируем уникальное имя файла
|
||
original_filename = os.path.basename(file.file_path)
|
||
file_extension = os.path.splitext(original_filename)[1] or '.bin'
|
||
safe_filename = f"{file_id}{file_extension}"
|
||
file_path = os.path.join(full_folder_path, safe_filename)
|
||
|
||
# Скачиваем файл
|
||
await message.bot.download_file(file_path=file.file_path, destination=file_path)
|
||
|
||
# Проверяем, что файл действительно скачался
|
||
if not os.path.exists(file_path):
|
||
logger.error(f"download_file: Файл не был скачан - {file_path}")
|
||
return None
|
||
|
||
file_size = os.path.getsize(file_path)
|
||
download_time = time.time() - start_time
|
||
|
||
logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с")
|
||
|
||
return file_path
|
||
|
||
except Exception as e:
|
||
download_time = time.time() - start_time
|
||
logger.error(f"download_file: Ошибка скачивания файла {file_id}: {e}, время: {download_time:.2f}с")
|
||
return None
|
||
|
||
@track_time("prepare_media_group_from_middlewares", "helper_func")
|
||
@track_errors("helper_func", "prepare_media_group_from_middlewares")
|
||
@track_media_processing("media_group")
|
||
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
|
||
"""
|
||
Создает MediaGroup согласно best practices aiogram 3.x.
|
||
|
||
Args:
|
||
album: Album объект из Telegram API (список сообщений).
|
||
post_caption: Текст подписи к первому медиа файлу.
|
||
|
||
Returns:
|
||
Список InputMedia объектов для MediaGroup.
|
||
"""
|
||
# Экранируем post_caption для безопасного использования в HTML
|
||
safe_post_caption = html.escape(str(post_caption)) if post_caption else ""
|
||
|
||
media_group = []
|
||
|
||
for i, message in enumerate(album):
|
||
if message.photo:
|
||
file_id = message.photo[-1].file_id
|
||
# Для фото используем InputMediaPhoto
|
||
if i == 0: # Первое фото получает подпись
|
||
media_group.append(InputMediaPhoto(media=file_id, caption=safe_post_caption))
|
||
else:
|
||
media_group.append(InputMediaPhoto(media=file_id))
|
||
elif message.video:
|
||
file_id = message.video.file_id
|
||
# Для видео используем InputMediaVideo
|
||
if i == 0: # Первое видео получает подпись
|
||
media_group.append(InputMediaVideo(media=file_id, caption=safe_post_caption))
|
||
else:
|
||
media_group.append(InputMediaVideo(media=file_id))
|
||
elif message.audio:
|
||
file_id = message.audio.file_id
|
||
# Для аудио используем InputMediaAudio
|
||
if i == 0: # Первое аудио получает подпись
|
||
media_group.append(InputMediaAudio(media=file_id, caption=safe_post_caption))
|
||
else:
|
||
media_group.append(InputMediaAudio(media=file_id))
|
||
elif message.document:
|
||
file_id = message.document.file_id
|
||
# Для документов используем InputMediaDocument (если поддерживается)
|
||
if i == 0: # Первый документ получает подпись
|
||
media_group.append(InputMediaDocument(media=file_id, caption=safe_post_caption))
|
||
else:
|
||
media_group.append(InputMediaDocument(media=file_id))
|
||
else:
|
||
# Если нет поддерживаемого медиа, пропускаем сообщение
|
||
continue
|
||
|
||
return media_group
|
||
|
||
@track_time("add_in_db_media_mediagroup", "helper_func")
|
||
@track_errors("helper_func", "add_in_db_media_mediagroup")
|
||
@track_media_processing("media_group")
|
||
@db_query_time("add_in_db_media_mediagroup", "posts", "insert")
|
||
async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None) -> bool:
|
||
"""
|
||
Добавляет контент медиа-группы в базу данных
|
||
|
||
Args:
|
||
sent_message: sent_message объект из Telegram API
|
||
bot_db: Экземпляр базы данных
|
||
main_post_id: ID основного поста медиагруппы (если не указан, используется последний message_id)
|
||
|
||
Returns:
|
||
bool: True если весь контент успешно добавлен, False в случае ошибки
|
||
"""
|
||
start_time = time.time()
|
||
|
||
try:
|
||
# Валидация параметров
|
||
if not sent_message or not bot_db or not isinstance(sent_message, list):
|
||
logger.error("add_in_db_media_mediagroup: Неверные параметры - sent_message, bot_db или sent_message не является списком")
|
||
return False
|
||
|
||
if len(sent_message) == 0:
|
||
logger.warning("add_in_db_media_mediagroup: Пустая медиагруппа")
|
||
return False
|
||
|
||
# Используем переданный main_post_id или ID последнего сообщения
|
||
post_id = main_post_id or sent_message[-1].message_id
|
||
logger.debug(f"add_in_db_media_mediagroup: Обрабатываю медиагруппу из {len(sent_message)} сообщений, post_id: {post_id}")
|
||
|
||
processed_count = 0
|
||
failed_count = 0
|
||
|
||
for i, message in enumerate(sent_message):
|
||
try:
|
||
content_type = None
|
||
file_id = None
|
||
|
||
# Определяем тип контента и file_id
|
||
if message.photo:
|
||
content_type = 'photo'
|
||
file_id = message.photo[-1].file_id
|
||
elif message.video:
|
||
content_type = 'video'
|
||
file_id = message.video.file_id
|
||
elif message.audio:
|
||
content_type = 'audio'
|
||
file_id = message.audio.file_id
|
||
elif message.voice:
|
||
content_type = 'voice'
|
||
file_id = message.voice.file_id
|
||
elif message.video_note:
|
||
content_type = 'video_note'
|
||
file_id = message.video_note.file_id
|
||
else:
|
||
logger.warning(f"add_in_db_media_mediagroup: Неподдерживаемый тип контента в сообщении {i+1}/{len(sent_message)}")
|
||
failed_count += 1
|
||
continue
|
||
|
||
if not file_id:
|
||
logger.error(f"add_in_db_media_mediagroup: file_id отсутствует в сообщении {i+1}/{len(sent_message)}")
|
||
failed_count += 1
|
||
continue
|
||
|
||
logger.debug(f"add_in_db_media_mediagroup: Обрабатываю {content_type} в сообщении {i+1}/{len(sent_message)}")
|
||
|
||
# Скачиваем файл
|
||
file_path = await download_file(message, file_id=file_id, content_type=content_type)
|
||
if not file_path:
|
||
logger.error(f"add_in_db_media_mediagroup: Не удалось скачать файл {file_id} в сообщении {i+1}/{len(sent_message)}")
|
||
failed_count += 1
|
||
continue
|
||
|
||
# Добавляем в базу данных
|
||
success = await bot_db.add_post_content(post_id, message.message_id, file_path, content_type)
|
||
if not success:
|
||
logger.error(f"add_in_db_media_mediagroup: Не удалось добавить контент в БД для сообщения {i+1}/{len(sent_message)}")
|
||
# Удаляем скачанный файл при ошибке БД
|
||
try:
|
||
os.remove(file_path)
|
||
logger.debug(f"add_in_db_media_mediagroup: Удален файл {file_path} после ошибки БД")
|
||
except Exception as e:
|
||
logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}")
|
||
failed_count += 1
|
||
continue
|
||
|
||
processed_count += 1
|
||
logger.debug(f"add_in_db_media_mediagroup: Успешно обработано сообщение {i+1}/{len(sent_message)}")
|
||
|
||
except Exception as e:
|
||
logger.error(f"add_in_db_media_mediagroup: Ошибка обработки сообщения {i+1}/{len(sent_message)}: {e}")
|
||
failed_count += 1
|
||
continue
|
||
|
||
processing_time = time.time() - start_time
|
||
|
||
if processed_count == 0:
|
||
logger.error(f"add_in_db_media_mediagroup: Не удалось обработать ни одного сообщения из медиагруппы {post_id}")
|
||
return False
|
||
|
||
if failed_count > 0:
|
||
logger.warning(f"add_in_db_media_mediagroup: Обработано {processed_count}/{len(sent_message)} сообщений медиагруппы {post_id}, ошибок: {failed_count}")
|
||
else:
|
||
logger.info(f"add_in_db_media_mediagroup: Успешно обработана медиагруппа {post_id} - {processed_count} сообщений, время: {processing_time:.2f}с")
|
||
|
||
return failed_count == 0
|
||
|
||
except Exception as e:
|
||
processing_time = time.time() - start_time
|
||
logger.error(f"add_in_db_media_mediagroup: Критическая ошибка обработки медиагруппы: {e}, время: {processing_time:.2f}с")
|
||
return False
|
||
|
||
@track_time("add_in_db_media", "helper_func")
|
||
@track_errors("helper_func", "add_in_db_media")
|
||
@track_media_processing("media_group")
|
||
@db_query_time("add_in_db_media", "posts", "insert")
|
||
@track_file_operations("media")
|
||
async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
|
||
"""
|
||
Добавляет контент одиночного сообщения в базу данных
|
||
|
||
Args:
|
||
sent_message: sent_message объект из Telegram API
|
||
bot_db: Экземпляр базы данных
|
||
|
||
Returns:
|
||
bool: True если контент успешно добавлен, False в случае ошибки
|
||
"""
|
||
start_time = time.time()
|
||
|
||
try:
|
||
# Валидация параметров
|
||
if not sent_message or not bot_db:
|
||
logger.error("add_in_db_media: Неверные параметры - sent_message или bot_db отсутствуют")
|
||
return False
|
||
|
||
post_id = sent_message.message_id # ID поста (это же сообщение)
|
||
content_type = None
|
||
file_id = None
|
||
|
||
# Определяем тип контента и file_id
|
||
if sent_message.photo:
|
||
content_type = 'photo'
|
||
file_id = sent_message.photo[-1].file_id
|
||
elif sent_message.video:
|
||
content_type = 'video'
|
||
file_id = sent_message.video.file_id
|
||
elif sent_message.voice:
|
||
content_type = 'voice'
|
||
file_id = sent_message.voice.file_id
|
||
elif sent_message.audio:
|
||
content_type = 'audio'
|
||
file_id = sent_message.audio.file_id
|
||
elif sent_message.video_note:
|
||
content_type = 'video_note'
|
||
file_id = sent_message.video_note.file_id
|
||
else:
|
||
logger.warning(f"add_in_db_media: Неподдерживаемый тип контента для сообщения {post_id}")
|
||
return False
|
||
|
||
if not file_id:
|
||
logger.error(f"add_in_db_media: file_id отсутствует для сообщения {post_id}")
|
||
return False
|
||
|
||
logger.debug(f"add_in_db_media: Обрабатываю {content_type} для сообщения {post_id}")
|
||
|
||
# Скачиваем файл
|
||
file_path = await download_file(sent_message, file_id=file_id, content_type=content_type)
|
||
if not file_path:
|
||
logger.error(f"add_in_db_media: Не удалось скачать файл {file_id} для сообщения {post_id}")
|
||
return False
|
||
|
||
# Добавляем в базу данных
|
||
success = await bot_db.add_post_content(post_id, sent_message.message_id, file_path, content_type)
|
||
if not success:
|
||
logger.error(f"add_in_db_media: Не удалось добавить контент в БД для сообщения {post_id}")
|
||
# Удаляем скачанный файл при ошибке БД
|
||
try:
|
||
os.remove(file_path)
|
||
logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД")
|
||
except Exception as e:
|
||
logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}")
|
||
return False
|
||
|
||
processing_time = time.time() - start_time
|
||
logger.info(f"add_in_db_media: Контент успешно добавлен для сообщения {post_id}, тип: {content_type}, время: {processing_time:.2f}с")
|
||
|
||
return True
|
||
|
||
except Exception as e:
|
||
processing_time = time.time() - start_time
|
||
logger.error(f"add_in_db_media: Ошибка обработки медиа для сообщения {post_id}: {e}, время: {processing_time:.2f}с")
|
||
return False
|
||
|
||
@track_time("send_media_group_message_to_private_chat", "helper_func")
|
||
@track_errors("helper_func", "send_media_group_message_to_private_chat")
|
||
@track_media_processing("media_group")
|
||
@db_query_time("send_media_group_message_to_private_chat", "posts", "insert")
|
||
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
|
||
media_group: List, bot_db: Any, main_post_id: Optional[int] = None) -> int:
|
||
sent_message = await message.bot.send_media_group(
|
||
chat_id=chat_id,
|
||
media=media_group,
|
||
)
|
||
post = TelegramPost(
|
||
message_id=sent_message[-1].message_id,
|
||
text=sent_message[-1].caption or "",
|
||
author_id=message.from_user.id,
|
||
created_at=int(datetime.now().timestamp())
|
||
)
|
||
await bot_db.add_post(post)
|
||
success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id)
|
||
if not success:
|
||
logger.warning(f"send_media_group_message_to_private_chat: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}")
|
||
message_id = sent_message[-1].message_id
|
||
return message_id
|
||
|
||
@track_time("send_media_group_to_channel", "helper_func")
|
||
@track_errors("helper_func", "send_media_group_to_channel")
|
||
@track_media_processing("media_group")
|
||
async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str):
|
||
"""
|
||
Отправляет медиа-группу с подписью к последнему файлу.
|
||
|
||
Args:
|
||
bot: Экземпляр бота aiogram.
|
||
chat_id: ID чата для отправки.
|
||
post_content: Список кортежей с путями к файлам.
|
||
post_text: Текст подписи.
|
||
"""
|
||
logger.info(f"Начинаю отправку медиа-группы в чат {chat_id}, количество файлов: {len(post_content)}")
|
||
|
||
media = []
|
||
for i, file_path in enumerate(post_content):
|
||
try:
|
||
file = FSInputFile(path=file_path[0])
|
||
type = file_path[1]
|
||
logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path[0]} (тип: {type})")
|
||
|
||
if type == 'video':
|
||
media.append(types.InputMediaVideo(media=file))
|
||
elif type == 'photo':
|
||
media.append(types.InputMediaPhoto(media=file))
|
||
else:
|
||
logger.warning(f"Неизвестный тип файла: {type} для {file_path[0]}")
|
||
except FileNotFoundError:
|
||
logger.error(f"Файл не найден: {file_path[0]}")
|
||
return
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при обработке файла {file_path[0]}: {e}")
|
||
return
|
||
|
||
logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки")
|
||
|
||
# Добавляем подпись к последнему файлу
|
||
if media:
|
||
# Экранируем post_text для безопасного использования в HTML
|
||
safe_post_text = html.escape(str(post_text)) if post_text else ""
|
||
media[-1].caption = safe_post_text
|
||
logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}")
|
||
|
||
try:
|
||
await bot.send_media_group(chat_id=chat_id, media=media)
|
||
logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}")
|
||
raise
|
||
|
||
@track_time("send_text_message", "helper_func")
|
||
@track_errors("helper_func", "send_text_message")
|
||
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
|
||
from .rate_limiter import send_with_rate_limit
|
||
|
||
# Экранируем post_text для безопасного использования в HTML
|
||
safe_post_text = html.escape(str(post_text)) if post_text else ""
|
||
|
||
async def _send_message():
|
||
if markup is None:
|
||
return await message.bot.send_message(
|
||
chat_id=chat_id,
|
||
text=safe_post_text
|
||
)
|
||
else:
|
||
return await message.bot.send_message(
|
||
chat_id=chat_id,
|
||
text=safe_post_text,
|
||
reply_markup=markup
|
||
)
|
||
|
||
sent_message = await send_with_rate_limit(_send_message, chat_id)
|
||
return sent_message.message_id
|
||
|
||
@track_time("send_photo_message", "helper_func")
|
||
@track_errors("helper_func", "send_photo_message")
|
||
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
|
||
markup: types.ReplyKeyboardMarkup = None):
|
||
# Экранируем post_text для безопасного использования в HTML
|
||
safe_post_text = html.escape(str(post_text)) if post_text else ""
|
||
|
||
if markup is None:
|
||
sent_message = await message.bot.send_photo(
|
||
chat_id=chat_id,
|
||
caption=safe_post_text,
|
||
photo=photo
|
||
)
|
||
else:
|
||
sent_message = await message.bot.send_photo(
|
||
chat_id=chat_id,
|
||
caption=safe_post_text,
|
||
photo=photo,
|
||
reply_markup=markup
|
||
)
|
||
return sent_message
|
||
|
||
@track_time("send_video_message", "helper_func")
|
||
@track_errors("helper_func", "send_video_message")
|
||
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
|
||
markup: types.ReplyKeyboardMarkup = None):
|
||
# Экранируем post_text для безопасного использования в HTML
|
||
safe_post_text = html.escape(str(post_text)) if post_text else ""
|
||
|
||
if markup is None:
|
||
sent_message = await message.bot.send_video(
|
||
chat_id=chat_id,
|
||
caption=safe_post_text,
|
||
video=video
|
||
)
|
||
else:
|
||
sent_message = await message.bot.send_video(
|
||
chat_id=chat_id,
|
||
caption=safe_post_text,
|
||
video=video,
|
||
reply_markup=markup
|
||
)
|
||
return sent_message
|
||
|
||
@track_time("send_video_note_message", "helper_func")
|
||
@track_errors("helper_func", "send_video_note_message")
|
||
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
|
||
markup: types.ReplyKeyboardMarkup = None):
|
||
if markup is None:
|
||
sent_message = await message.bot.send_video_note(
|
||
chat_id=chat_id,
|
||
video_note=video_note
|
||
)
|
||
else:
|
||
sent_message = await message.bot.send_video_note(
|
||
chat_id=chat_id,
|
||
video_note=video_note,
|
||
reply_markup=markup
|
||
)
|
||
return sent_message
|
||
|
||
@track_time("send_audio_message", "helper_func")
|
||
@track_errors("helper_func", "send_audio_message")
|
||
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
|
||
markup: types.ReplyKeyboardMarkup = None):
|
||
# Экранируем post_text для безопасного использования в HTML
|
||
safe_post_text = html.escape(str(post_text)) if post_text else ""
|
||
|
||
if markup is None:
|
||
sent_message = await message.bot.send_audio(
|
||
chat_id=chat_id,
|
||
caption=safe_post_text,
|
||
audio=audio
|
||
)
|
||
else:
|
||
sent_message = await message.bot.send_audio(
|
||
chat_id=chat_id,
|
||
caption=safe_post_text,
|
||
audio=audio,
|
||
reply_markup=markup
|
||
)
|
||
return sent_message
|
||
|
||
|
||
@track_time("send_voice_message", "helper_func")
|
||
@track_errors("helper_func", "send_voice_message")
|
||
async def send_voice_message(chat_id, message: types.Message, voice: str,
|
||
markup: types.ReplyKeyboardMarkup = None):
|
||
from .rate_limiter import send_with_rate_limit
|
||
|
||
async def _send_voice():
|
||
if markup is None:
|
||
return await message.bot.send_voice(
|
||
chat_id=chat_id,
|
||
voice=voice
|
||
)
|
||
else:
|
||
return await message.bot.send_voice(
|
||
chat_id=chat_id,
|
||
voice=voice,
|
||
reply_markup=markup
|
||
)
|
||
|
||
return await send_with_rate_limit(_send_voice, chat_id)
|
||
|
||
@track_time("check_access", "helper_func")
|
||
@track_errors("helper_func", "check_access")
|
||
@db_query_time("check_access", "users", "select")
|
||
async def check_access(user_id: int, bot_db):
|
||
"""Проверка прав на совершение действий"""
|
||
from logs.custom_logger import logger
|
||
result = await bot_db.is_admin(user_id)
|
||
logger.info(f"check_access: пользователь {user_id} - результат: {result}")
|
||
return result
|
||
|
||
|
||
def add_days_to_date(days: int):
|
||
"""Прибавляет указанное количество дней к текущей дате и возвращает UNIX timestamp."""
|
||
current_date = datetime.now()
|
||
future_date = current_date + timedelta(days=days)
|
||
return int(future_date.timestamp())
|
||
|
||
@track_time("get_banned_users_list", "helper_func")
|
||
@track_errors("helper_func", "get_banned_users_list")
|
||
@db_query_time("get_banned_users_list", "users", "select")
|
||
async def get_banned_users_list(offset: int, bot_db):
|
||
"""
|
||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||
|
||
Args:
|
||
offset: отступ для запроса в базу данных
|
||
bot_db: Экземпляр базы данных
|
||
|
||
Returns:
|
||
message - текст сообщения
|
||
user_ids - лист кортежей [(user_name: user_id)]
|
||
"""
|
||
users = await bot_db.get_banned_users_from_db_with_limits(limit=7, offset=offset)
|
||
message = "Список заблокированных пользователей:\n"
|
||
|
||
for user in users:
|
||
user_id, ban_reason, unban_date = user
|
||
# Получаем имя пользователя из таблицы users
|
||
username = await bot_db.get_username(user_id)
|
||
full_name = await bot_db.get_full_name_by_id(user_id)
|
||
safe_user_name = username or full_name or f"User_{user_id}"
|
||
|
||
# Экранируем пользовательские данные для безопасного использования
|
||
safe_user_name = html.escape(str(safe_user_name))
|
||
safe_ban_reason = html.escape(str(ban_reason)) if ban_reason else "Причина не указана"
|
||
|
||
# Форматируем дату разбана в человекочитаемый формат
|
||
if unban_date:
|
||
try:
|
||
# Предполагаем, что unban_date это UNIX timestamp
|
||
if isinstance(unban_date, (int, float)):
|
||
unban_datetime = datetime.fromtimestamp(unban_date)
|
||
safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M")
|
||
elif isinstance(unban_date, str):
|
||
# Если это строка, попытаемся её обработать
|
||
try:
|
||
# Попробуем преобразовать строку в timestamp
|
||
timestamp = int(unban_date)
|
||
unban_datetime = datetime.fromtimestamp(timestamp)
|
||
safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M")
|
||
except (ValueError, TypeError):
|
||
# Если не удалось, показываем как есть
|
||
safe_unban_date = html.escape(str(unban_date))
|
||
elif hasattr(unban_date, 'strftime'):
|
||
# Если это datetime объект
|
||
safe_unban_date = unban_date.strftime("%d-%m-%Y %H:%M")
|
||
else:
|
||
# Для всех остальных случаев
|
||
safe_unban_date = html.escape(str(unban_date))
|
||
except (ValueError, TypeError, OSError):
|
||
# В случае ошибки показываем исходное значение
|
||
safe_unban_date = html.escape(str(unban_date))
|
||
else:
|
||
safe_unban_date = "Дата не указана"
|
||
|
||
message += f"**Пользователь:** {safe_user_name}\n"
|
||
message += f"**Причина бана:** {safe_ban_reason}\n"
|
||
message += f"**Дата разбана:** {safe_unban_date}\n\n"
|
||
return message
|
||
|
||
@track_time("get_banned_users_buttons", "helper_func")
|
||
@track_errors("helper_func", "get_banned_users_buttons")
|
||
@db_query_time("get_banned_users_buttons", "users", "select")
|
||
async def get_banned_users_buttons(bot_db):
|
||
"""
|
||
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
|
||
|
||
Args:
|
||
bot_db: Экземпляр базы данных
|
||
|
||
Returns:
|
||
message - текст сообщения
|
||
user_ids - лист кортежей [(user_name: user_id)]
|
||
"""
|
||
users = await bot_db.get_banned_users_from_db()
|
||
user_ids = []
|
||
|
||
for user in users:
|
||
user_id, ban_reason, unban_date = user
|
||
# Получаем имя пользователя из таблицы users
|
||
username = await bot_db.get_username(user_id)
|
||
full_name = await bot_db.get_full_name_by_id(user_id)
|
||
safe_user_name = username or full_name or f"User_{user_id}"
|
||
|
||
# Экранируем user_name для безопасного использования
|
||
safe_user_name = html.escape(str(safe_user_name))
|
||
user_ids.append((safe_user_name, user_id))
|
||
return user_ids
|
||
|
||
@track_time("delete_user_blacklist", "helper_func")
|
||
@track_errors("helper_func", "delete_user_blacklist")
|
||
@db_query_time("delete_user_blacklist", "users", "delete")
|
||
async def delete_user_blacklist(user_id: int, bot_db):
|
||
return await bot_db.delete_user_blacklist(user_id=user_id)
|
||
|
||
|
||
@track_time("check_username_and_full_name", "helper_func")
|
||
@track_errors("helper_func", "check_username_and_full_name")
|
||
@db_query_time("check_username_and_full_name", "users", "select")
|
||
async def check_username_and_full_name(user_id: int, username: str, full_name: str, bot_db):
|
||
"""Проверяет, изменились ли username или full_name пользователя"""
|
||
try:
|
||
username_db = await bot_db.get_username(user_id)
|
||
full_name_db = await bot_db.get_full_name_by_id(user_id)
|
||
return username != username_db or full_name != full_name_db
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при проверке username и full_name: {e}")
|
||
return False
|
||
|
||
@track_time("unban_notifier", "helper_func")
|
||
@track_errors("helper_func", "unban_notifier")
|
||
@db_query_time("unban_notifier", "users", "select")
|
||
async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE):
|
||
# Получение текущего UNIX timestamp
|
||
current_date = datetime.now()
|
||
current_timestamp = int(current_date.timestamp())
|
||
# Получение списка разблокированных пользователей
|
||
unblocked_users = await BotDB.get_users_for_unblock_today(current_timestamp)
|
||
message = "Разблокированные пользователи:\n"
|
||
for user_id in unblocked_users:
|
||
# Получаем имя пользователя из таблицы users
|
||
username = await BotDB.get_username(user_id)
|
||
full_name = await BotDB.get_full_name_by_id(user_id)
|
||
user_name = username or full_name or f"User_{user_id}"
|
||
# Экранируем user_name для безопасного использования
|
||
safe_user_name = html.escape(str(user_name))
|
||
message += f"ID: {user_id}, Имя: {safe_user_name}\n"
|
||
|
||
# Отправка сообщения в канал
|
||
await bot.send_message(GROUP_FOR_MESSAGE, message)
|
||
|
||
|
||
@track_time("update_user_info", "helper_func")
|
||
@track_errors("helper_func", "update_user_info")
|
||
@db_query_time("update_user_info", "users", "update")
|
||
async def update_user_info(source: str, message: types.Message):
|
||
# Собираем данные
|
||
full_name = message.from_user.full_name
|
||
username = message.from_user.username
|
||
first_name = get_first_name(message)
|
||
is_bot = message.from_user.is_bot
|
||
language_code = message.from_user.language_code
|
||
user_id = message.from_user.id
|
||
|
||
# Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было
|
||
user_emoji = await get_random_emoji()
|
||
|
||
if not await BotDB.user_exists(user_id):
|
||
# Create User object with current timestamp
|
||
from database.models import User
|
||
current_timestamp = int(datetime.now().timestamp())
|
||
user = User(
|
||
user_id=user_id,
|
||
first_name=first_name,
|
||
full_name=full_name,
|
||
username=username,
|
||
is_bot=is_bot,
|
||
language_code=language_code,
|
||
emoji=user_emoji,
|
||
has_stickers=False,
|
||
date_added=current_timestamp,
|
||
date_changed=current_timestamp,
|
||
voice_bot_welcome_received=False
|
||
)
|
||
await BotDB.add_user(user)
|
||
else:
|
||
is_need_update = await check_username_and_full_name(user_id, username, full_name, BotDB)
|
||
if is_need_update:
|
||
await BotDB.update_user_info(user_id, username, full_name)
|
||
if source != 'voice':
|
||
await message.answer(
|
||
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}")
|
||
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
|
||
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}')
|
||
sleep(1)
|
||
await BotDB.update_user_date(user_id)
|
||
|
||
|
||
@track_time("check_user_emoji", "helper_func")
|
||
@track_errors("helper_func", "check_user_emoji")
|
||
@db_query_time("check_emoji_for_user", "users", "select")
|
||
async def check_user_emoji(message: types.Message):
|
||
user_id = message.from_user.id
|
||
user_emoji = await BotDB.get_user_emoji(user_id=user_id)
|
||
if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""):
|
||
user_emoji = await get_random_emoji()
|
||
await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji)
|
||
return user_emoji
|
||
|
||
|
||
@track_time("get_random_emoji", "helper_func")
|
||
@track_errors("helper_func", "get_random_emoji")
|
||
@db_query_time("check_emoji", "users", "select")
|
||
async def get_random_emoji():
|
||
attempts = 0
|
||
while attempts < 100:
|
||
user_emoji = random.choice(emoji_list)
|
||
if not await BotDB.check_emoji_exists(user_emoji):
|
||
return user_emoji
|
||
attempts += 1
|
||
logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.")
|
||
return "Эмоджи не определен"
|