Files
telegram-helper-bot/helper_bot/utils/helper_func.py

907 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import html
import os
import random
import time
from datetime import datetime, timedelta
from time import sleep
from typing import List, Dict, Any, Optional, TYPE_CHECKING, Union
try:
import emoji as _emoji_lib
_emoji_lib_available = True
except ImportError:
_emoji_lib = None
_emoji_lib_available = False
from aiogram import types
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio, InputMediaDocument
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance
from logs.custom_logger import logger
from database.models import TelegramPost
# Local imports - metrics
from .metrics import (
track_time,
track_errors,
db_query_time,
track_media_processing,
track_file_operations,
)
bdf = get_global_instance()
#TODO: поменять архитектуру и подключить правильный BotDB
BotDB = bdf.get_db()
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
if _emoji_lib_available and _emoji_lib is not None:
emoji_list = list(_emoji_lib.EMOJI_DATA.keys())
else:
# Fallback minimal emoji set for environments without the 'emoji' package (e.g., CI tests)
emoji_list = [
"🙂", "😀", "😉", "😎", "🤖", "🦄", "🐱", "🐶", "🍀", "🔥",
"🌟", "🎉", "💡", "🚀", "🌈"
]
def safe_html_escape(text: str) -> str:
"""
Безопасно экранирует текст для использования в HTML разметке.
Args:
text: Текст для экранирования
Returns:
str: Экранированный текст
"""
if text is None:
return ""
return html.escape(str(text))
@track_time("get_first_name", "helper_func")
@track_errors("helper_func", "get_first_name")
def get_first_name(message: types.Message) -> str:
"""
Безопасно получает и экранирует имя пользователя для использования в HTML разметке.
Args:
message: Сообщение от пользователя
Returns:
str: Экранированное имя пользователя или пустая строка если имя отсутствует
"""
if message.from_user.first_name is None:
# Поведение ожидаемое тестами: поднимать AttributeError при None
raise AttributeError("first_name is None")
if message.from_user.first_name:
# Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML
first_name = str(message.from_user.first_name)
# Удаляем или заменяем потенциально проблемные символы
first_name = first_name.replace('\u0cc0', '') # Убираем символ "ೀ" (U+0CC0)
first_name = first_name.replace('\u0cc1', '') # Убираем символ "ೀ" (U+0CC1)
first_name = html.escape(first_name)
return first_name
return ""
def determine_anonymity(post_text: str) -> bool:
"""
Определяет, является ли пост анонимным на основе ключевых слов в тексте.
Args:
post_text: Текст сообщения
Returns:
bool: True, если "анон" в тексте; False, если "неанон" или "не анон" в тексте;
False по умолчанию (если нет ключевых слов)
"""
if not post_text:
return False
post_text_lower = post_text.lower()
# Сначала проверяем "неанон" или "не анон" (более специфичное условие)
if "неанон" in post_text_lower or "не анон" in post_text_lower:
return False
# Проверяем "анон"
if "анон" in post_text_lower:
return True
# По умолчанию False
return False
def get_text_message(post_text: str, first_name: str, username: str = None, is_anonymous: Optional[bool] = None):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон"
или переданного параметра is_anonymous.
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
is_anonymous: Флаг анонимности (True - анонимно, False - не анонимно, None - legacy, определяется по тексту)
Returns:
str: - Сформированный текст сообщения.
"""
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
# Экранируем username для безопасного использования в HTML
safe_username = html.escape(username) if username else None
# Формируем строку с информацией об авторе
if safe_username:
author_info = f"{first_name} @{safe_username}"
else:
author_info = f"{first_name} (Ник не указан)"
# Если передан is_anonymous, используем его, иначе определяем по тексту (legacy)
# TODO: Уверен можно укоротить
if is_anonymous is not None:
if is_anonymous:
return f'{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'{safe_post_text}\n\nАвтор поста: {author_info}'
else:
# Legacy: определяем по тексту
if "неанон" in post_text or "не анон" in post_text:
return f'{safe_post_text}\n\nАвтор поста: {author_info}'
elif "анон" in post_text:
return f'{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'{safe_post_text}\n\nАвтор поста: {author_info}'
@track_time("download_file", "helper_func")
@track_errors("helper_func", "download_file")
@track_file_operations("unknown")
async def download_file(message: types.Message, file_id: str, content_type: str = None) -> Optional[str]:
"""
Скачивает файл по file_id из Telegram и сохраняет в соответствующую папку.
Args:
message: сообщение
file_id: File ID файла
content_type: тип контента (photo, video, audio, voice, video_note)
Returns:
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
"""
start_time = time.time()
try:
# Валидация параметров
if not file_id or not message or not message.bot:
logger.error("download_file: Неверные параметры - file_id, message или bot отсутствуют")
return None
# Определяем папку по типу контента
type_folders = {
'photo': 'photos',
'video': 'videos',
'audio': 'music',
'voice': 'voice',
'video_note': 'video_notes'
}
folder = type_folders.get(content_type, 'other')
base_path = "files"
full_folder_path = os.path.join(base_path, folder)
# Создаем необходимые папки
os.makedirs(base_path, exist_ok=True)
os.makedirs(full_folder_path, exist_ok=True)
logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}")
# Получаем информацию о файле
file = await message.bot.get_file(file_id)
if not file or not file.file_path:
logger.error(f"download_file: Не удалось получить информацию о файле {file_id}")
return None
# Генерируем уникальное имя файла
original_filename = os.path.basename(file.file_path)
file_extension = os.path.splitext(original_filename)[1] or '.bin'
safe_filename = f"{file_id}{file_extension}"
file_path = os.path.join(full_folder_path, safe_filename)
# Скачиваем файл
await message.bot.download_file(file_path=file.file_path, destination=file_path)
# Проверяем, что файл действительно скачался
if not os.path.exists(file_path):
logger.error(f"download_file: Файл не был скачан - {file_path}")
return None
file_size = os.path.getsize(file_path)
download_time = time.time() - start_time
logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с")
return file_path
except Exception as e:
download_time = time.time() - start_time
logger.error(f"download_file: Ошибка скачивания файла {file_id}: {e}, время: {download_time:.2f}с")
return None
@track_time("prepare_media_group_from_middlewares", "helper_func")
@track_errors("helper_func", "prepare_media_group_from_middlewares")
@track_media_processing("media_group")
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
"""
Создает MediaGroup согласно best practices aiogram 3.x.
Args:
album: Album объект из Telegram API (список сообщений).
post_caption: Текст подписи к первому медиа файлу.
Returns:
Список InputMedia объектов для MediaGroup.
"""
# Экранируем post_caption для безопасного использования в HTML
safe_post_caption = html.escape(str(post_caption)) if post_caption else ""
media_group = []
for i, message in enumerate(album):
if message.photo:
file_id = message.photo[-1].file_id
# Для фото используем InputMediaPhoto
if i == 0: # Первое фото получает подпись
media_group.append(InputMediaPhoto(media=file_id, caption=safe_post_caption))
else:
media_group.append(InputMediaPhoto(media=file_id))
elif message.video:
file_id = message.video.file_id
# Для видео используем InputMediaVideo
if i == 0: # Первое видео получает подпись
media_group.append(InputMediaVideo(media=file_id, caption=safe_post_caption))
else:
media_group.append(InputMediaVideo(media=file_id))
elif message.audio:
file_id = message.audio.file_id
# Для аудио используем InputMediaAudio
if i == 0: # Первое аудио получает подпись
media_group.append(InputMediaAudio(media=file_id, caption=safe_post_caption))
else:
media_group.append(InputMediaAudio(media=file_id))
elif message.document:
file_id = message.document.file_id
# Для документов используем InputMediaDocument (если поддерживается)
if i == 0: # Первый документ получает подпись
media_group.append(InputMediaDocument(media=file_id, caption=safe_post_caption))
else:
media_group.append(InputMediaDocument(media=file_id))
else:
# Если нет поддерживаемого медиа, пропускаем сообщение
continue
return media_group
@track_time("add_in_db_media_mediagroup", "helper_func")
@track_errors("helper_func", "add_in_db_media_mediagroup")
@track_media_processing("media_group")
@db_query_time("add_in_db_media_mediagroup", "posts", "insert")
async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None) -> bool:
"""
Добавляет контент медиа-группы в базу данных
Args:
sent_message: sent_message объект из Telegram API
bot_db: Экземпляр базы данных
main_post_id: ID основного поста медиагруппы (если не указан, используется последний message_id)
Returns:
bool: True если весь контент успешно добавлен, False в случае ошибки
"""
start_time = time.time()
try:
# Валидация параметров
if not sent_message or not bot_db or not isinstance(sent_message, list):
logger.error("add_in_db_media_mediagroup: Неверные параметры - sent_message, bot_db или sent_message не является списком")
return False
if len(sent_message) == 0:
logger.warning("add_in_db_media_mediagroup: Пустая медиагруппа")
return False
# Используем переданный main_post_id или ID последнего сообщения
post_id = main_post_id or sent_message[-1].message_id
logger.debug(f"add_in_db_media_mediagroup: Обрабатываю медиагруппу из {len(sent_message)} сообщений, post_id: {post_id}")
processed_count = 0
failed_count = 0
for i, message in enumerate(sent_message):
try:
content_type = None
file_id = None
# Определяем тип контента и file_id
if message.photo:
content_type = 'photo'
file_id = message.photo[-1].file_id
elif message.video:
content_type = 'video'
file_id = message.video.file_id
elif message.audio:
content_type = 'audio'
file_id = message.audio.file_id
elif message.voice:
content_type = 'voice'
file_id = message.voice.file_id
elif message.video_note:
content_type = 'video_note'
file_id = message.video_note.file_id
else:
logger.warning(f"add_in_db_media_mediagroup: Неподдерживаемый тип контента в сообщении {i+1}/{len(sent_message)}")
failed_count += 1
continue
if not file_id:
logger.error(f"add_in_db_media_mediagroup: file_id отсутствует в сообщении {i+1}/{len(sent_message)}")
failed_count += 1
continue
logger.debug(f"add_in_db_media_mediagroup: Обрабатываю {content_type} в сообщении {i+1}/{len(sent_message)}")
# Скачиваем файл
file_path = await download_file(message, file_id=file_id, content_type=content_type)
if not file_path:
logger.error(f"add_in_db_media_mediagroup: Не удалось скачать файл {file_id} в сообщении {i+1}/{len(sent_message)}")
failed_count += 1
continue
# Добавляем в базу данных
success = await bot_db.add_post_content(post_id, message.message_id, file_path, content_type)
if not success:
logger.error(f"add_in_db_media_mediagroup: Не удалось добавить контент в БД для сообщения {i+1}/{len(sent_message)}")
# Удаляем скачанный файл при ошибке БД
try:
os.remove(file_path)
logger.debug(f"add_in_db_media_mediagroup: Удален файл {file_path} после ошибки БД")
except Exception as e:
logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}")
failed_count += 1
continue
processed_count += 1
logger.debug(f"add_in_db_media_mediagroup: Успешно обработано сообщение {i+1}/{len(sent_message)}")
except Exception as e:
logger.error(f"add_in_db_media_mediagroup: Ошибка обработки сообщения {i+1}/{len(sent_message)}: {e}")
failed_count += 1
continue
processing_time = time.time() - start_time
if processed_count == 0:
logger.error(f"add_in_db_media_mediagroup: Не удалось обработать ни одного сообщения из медиагруппы {post_id}")
return False
if failed_count > 0:
logger.warning(f"add_in_db_media_mediagroup: Обработано {processed_count}/{len(sent_message)} сообщений медиагруппы {post_id}, ошибок: {failed_count}")
else:
logger.info(f"add_in_db_media_mediagroup: Успешно обработана медиагруппа {post_id} - {processed_count} сообщений, время: {processing_time:.2f}с")
return failed_count == 0
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"add_in_db_media_mediagroup: Критическая ошибка обработки медиагруппы: {e}, время: {processing_time:.2f}с")
return False
@track_time("add_in_db_media", "helper_func")
@track_errors("helper_func", "add_in_db_media")
@track_media_processing("media_group")
@db_query_time("add_in_db_media", "posts", "insert")
@track_file_operations("media")
async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
"""
Добавляет контент одиночного сообщения в базу данных
Args:
sent_message: sent_message объект из Telegram API
bot_db: Экземпляр базы данных
Returns:
bool: True если контент успешно добавлен, False в случае ошибки
"""
start_time = time.time()
try:
# Валидация параметров
if not sent_message or not bot_db:
logger.error("add_in_db_media: Неверные параметры - sent_message или bot_db отсутствуют")
return False
post_id = sent_message.message_id # ID поста (это же сообщение)
content_type = None
file_id = None
# Определяем тип контента и file_id
if sent_message.photo:
content_type = 'photo'
file_id = sent_message.photo[-1].file_id
elif sent_message.video:
content_type = 'video'
file_id = sent_message.video.file_id
elif sent_message.voice:
content_type = 'voice'
file_id = sent_message.voice.file_id
elif sent_message.audio:
content_type = 'audio'
file_id = sent_message.audio.file_id
elif sent_message.video_note:
content_type = 'video_note'
file_id = sent_message.video_note.file_id
else:
logger.warning(f"add_in_db_media: Неподдерживаемый тип контента для сообщения {post_id}")
return False
if not file_id:
logger.error(f"add_in_db_media: file_id отсутствует для сообщения {post_id}")
return False
logger.debug(f"add_in_db_media: Обрабатываю {content_type} для сообщения {post_id}")
# Скачиваем файл
file_path = await download_file(sent_message, file_id=file_id, content_type=content_type)
if not file_path:
logger.error(f"add_in_db_media: Не удалось скачать файл {file_id} для сообщения {post_id}")
return False
# Добавляем в базу данных
success = await bot_db.add_post_content(post_id, sent_message.message_id, file_path, content_type)
if not success:
logger.error(f"add_in_db_media: Не удалось добавить контент в БД для сообщения {post_id}")
# Удаляем скачанный файл при ошибке БД
try:
os.remove(file_path)
logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД")
except Exception as e:
logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}")
return False
processing_time = time.time() - start_time
logger.info(f"add_in_db_media: Контент успешно добавлен для сообщения {post_id}, тип: {content_type}, время: {processing_time:.2f}с")
return True
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"add_in_db_media: Ошибка обработки медиа для сообщения {post_id}: {e}, время: {processing_time:.2f}с")
return False
@track_time("send_media_group_message_to_private_chat", "helper_func")
@track_errors("helper_func", "send_media_group_message_to_private_chat")
@track_media_processing("media_group")
@db_query_time("send_media_group_message_to_private_chat", "posts", "insert")
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: List, bot_db: Any, main_post_id: Optional[int] = None) -> int:
sent_message = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
)
post = TelegramPost(
message_id=sent_message[-1].message_id,
text=sent_message[-1].caption or "",
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
)
await bot_db.add_post(post)
success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id)
if not success:
logger.warning(f"send_media_group_message_to_private_chat: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}")
message_id = sent_message[-1].message_id
return message_id
@track_time("send_media_group_to_channel", "helper_func")
@track_errors("helper_func", "send_media_group_to_channel")
@track_media_processing("media_group")
async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам.
post_text: Текст подписи.
"""
logger.info(f"Начинаю отправку медиа-группы в чат {chat_id}, количество файлов: {len(post_content)}")
media = []
for i, file_path in enumerate(post_content):
try:
file = FSInputFile(path=file_path[0])
type = file_path[1]
logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path[0]} (тип: {type})")
if type == 'video':
media.append(types.InputMediaVideo(media=file))
elif type == 'photo':
media.append(types.InputMediaPhoto(media=file))
else:
logger.warning(f"Неизвестный тип файла: {type} для {file_path[0]}")
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path[0]}")
return
except Exception as e:
logger.error(f"Ошибка при обработке файла {file_path[0]}: {e}")
return
logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки")
# Добавляем подпись к последнему файлу
if media:
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
media[-1].caption = safe_post_text
logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}")
try:
await bot.send_media_group(chat_id=chat_id, media=media)
logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}")
except Exception as e:
logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}")
raise
@track_time("send_text_message", "helper_func")
@track_errors("helper_func", "send_text_message")
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
from .rate_limiter import send_with_rate_limit
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
async def _send_message():
if markup is None:
return await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text
)
else:
return await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text,
reply_markup=markup
)
sent_message = await send_with_rate_limit(_send_message, chat_id)
return sent_message.message_id
@track_time("send_photo_message", "helper_func")
@track_errors("helper_func", "send_photo_message")
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=safe_post_text,
photo=photo
)
else:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=safe_post_text,
photo=photo,
reply_markup=markup
)
return sent_message
@track_time("send_video_message", "helper_func")
@track_errors("helper_func", "send_video_message")
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=safe_post_text,
video=video
)
else:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=safe_post_text,
video=video,
reply_markup=markup
)
return sent_message
@track_time("send_video_note_message", "helper_func")
@track_errors("helper_func", "send_video_note_message")
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note
)
else:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note,
reply_markup=markup
)
return sent_message
@track_time("send_audio_message", "helper_func")
@track_errors("helper_func", "send_audio_message")
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=safe_post_text,
audio=audio
)
else:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=safe_post_text,
audio=audio,
reply_markup=markup
)
return sent_message
@track_time("send_voice_message", "helper_func")
@track_errors("helper_func", "send_voice_message")
async def send_voice_message(chat_id, message: types.Message, voice: str,
markup: types.ReplyKeyboardMarkup = None):
from .rate_limiter import send_with_rate_limit
async def _send_voice():
if markup is None:
return await message.bot.send_voice(
chat_id=chat_id,
voice=voice
)
else:
return await message.bot.send_voice(
chat_id=chat_id,
voice=voice,
reply_markup=markup
)
return await send_with_rate_limit(_send_voice, chat_id)
@track_time("check_access", "helper_func")
@track_errors("helper_func", "check_access")
@db_query_time("check_access", "users", "select")
async def check_access(user_id: int, bot_db):
"""Проверка прав на совершение действий"""
from logs.custom_logger import logger
result = await bot_db.is_admin(user_id)
logger.info(f"check_access: пользователь {user_id} - результат: {result}")
return result
def add_days_to_date(days: int):
"""Прибавляет указанное количество дней к текущей дате и возвращает UNIX timestamp."""
current_date = datetime.now()
future_date = current_date + timedelta(days=days)
return int(future_date.timestamp())
@track_time("get_banned_users_list", "helper_func")
@track_errors("helper_func", "get_banned_users_list")
@db_query_time("get_banned_users_list", "users", "select")
async def get_banned_users_list(offset: int, bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
bot_db: Экземпляр базы данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = await bot_db.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
user_id, ban_reason, unban_date = user
# Получаем имя пользователя из таблицы users
username = await bot_db.get_username(user_id)
full_name = await bot_db.get_full_name_by_id(user_id)
safe_user_name = username or full_name or f"User_{user_id}"
# Экранируем пользовательские данные для безопасного использования
safe_user_name = html.escape(str(safe_user_name))
safe_ban_reason = html.escape(str(ban_reason)) if ban_reason else "Причина не указана"
# Форматируем дату разбана в человекочитаемый формат
if unban_date:
try:
# Предполагаем, что unban_date это UNIX timestamp
if isinstance(unban_date, (int, float)):
unban_datetime = datetime.fromtimestamp(unban_date)
safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M")
elif isinstance(unban_date, str):
# Если это строка, попытаемся её обработать
try:
# Попробуем преобразовать строку в timestamp
timestamp = int(unban_date)
unban_datetime = datetime.fromtimestamp(timestamp)
safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M")
except (ValueError, TypeError):
# Если не удалось, показываем как есть
safe_unban_date = html.escape(str(unban_date))
elif hasattr(unban_date, 'strftime'):
# Если это datetime объект
safe_unban_date = unban_date.strftime("%d-%m-%Y %H:%M")
else:
# Для всех остальных случаев
safe_unban_date = html.escape(str(unban_date))
except (ValueError, TypeError, OSError):
# В случае ошибки показываем исходное значение
safe_unban_date = html.escape(str(unban_date))
else:
safe_unban_date = "Дата не указана"
message += f"**Пользователь:** {safe_user_name}\n"
message += f"**Причина бана:** {safe_ban_reason}\n"
message += f"**Дата разбана:** {safe_unban_date}\n\n"
return message
@track_time("get_banned_users_buttons", "helper_func")
@track_errors("helper_func", "get_banned_users_buttons")
@db_query_time("get_banned_users_buttons", "users", "select")
async def get_banned_users_buttons(bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
bot_db: Экземпляр базы данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = await bot_db.get_banned_users_from_db()
user_ids = []
for user in users:
user_id, ban_reason, unban_date = user
# Получаем имя пользователя из таблицы users
username = await bot_db.get_username(user_id)
full_name = await bot_db.get_full_name_by_id(user_id)
safe_user_name = username or full_name or f"User_{user_id}"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(safe_user_name))
user_ids.append((safe_user_name, user_id))
return user_ids
@track_time("delete_user_blacklist", "helper_func")
@track_errors("helper_func", "delete_user_blacklist")
@db_query_time("delete_user_blacklist", "users", "delete")
async def delete_user_blacklist(user_id: int, bot_db):
return await bot_db.delete_user_blacklist(user_id=user_id)
@track_time("check_username_and_full_name", "helper_func")
@track_errors("helper_func", "check_username_and_full_name")
@db_query_time("check_username_and_full_name", "users", "select")
async def check_username_and_full_name(user_id: int, username: str, full_name: str, bot_db):
"""Проверяет, изменились ли username или full_name пользователя"""
try:
username_db = await bot_db.get_username(user_id)
full_name_db = await bot_db.get_full_name_by_id(user_id)
return username != username_db or full_name != full_name_db
except Exception as e:
logger.error(f"Ошибка при проверке username и full_name: {e}")
return False
@track_time("unban_notifier", "helper_func")
@track_errors("helper_func", "unban_notifier")
@db_query_time("unban_notifier", "users", "select")
async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE):
# Получение текущего UNIX timestamp
current_date = datetime.now()
current_timestamp = int(current_date.timestamp())
# Получение списка разблокированных пользователей
unblocked_users = await BotDB.get_users_for_unblock_today(current_timestamp)
message = "Разблокированные пользователи:\n"
for user_id in unblocked_users:
# Получаем имя пользователя из таблицы users
username = await BotDB.get_username(user_id)
full_name = await BotDB.get_full_name_by_id(user_id)
user_name = username or full_name or f"User_{user_id}"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user_name))
message += f"ID: {user_id}, Имя: {safe_user_name}\n"
# Отправка сообщения в канал
await bot.send_message(GROUP_FOR_MESSAGE, message)
@track_time("update_user_info", "helper_func")
@track_errors("helper_func", "update_user_info")
@db_query_time("update_user_info", "users", "update")
async def update_user_info(source: str, message: types.Message):
# Собираем данные
full_name = message.from_user.full_name
username = message.from_user.username
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
user_id = message.from_user.id
# Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было
user_emoji = await get_random_emoji()
if not await BotDB.user_exists(user_id):
# Create User object with current timestamp
from database.models import User
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username,
is_bot=is_bot,
language_code=language_code,
emoji=user_emoji,
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False
)
await BotDB.add_user(user)
else:
is_need_update = await check_username_and_full_name(user_id, username, full_name, BotDB)
if is_need_update:
await BotDB.update_user_info(user_id, username, full_name)
if source != 'voice':
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}")
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}')
sleep(1)
await BotDB.update_user_date(user_id)
@track_time("check_user_emoji", "helper_func")
@track_errors("helper_func", "check_user_emoji")
@db_query_time("check_emoji_for_user", "users", "select")
async def check_user_emoji(message: types.Message):
user_id = message.from_user.id
user_emoji = await BotDB.get_user_emoji(user_id=user_id)
if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""):
user_emoji = await get_random_emoji()
await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji)
return user_emoji
@track_time("get_random_emoji", "helper_func")
@track_errors("helper_func", "get_random_emoji")
@db_query_time("check_emoji", "users", "select")
async def get_random_emoji():
attempts = 0
while attempts < 100:
user_emoji = random.choice(emoji_list)
if not await BotDB.check_emoji_exists(user_emoji):
return user_emoji
attempts += 1
logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.")
return "Эмоджи не определен"