Files
telegram-helper-bot/helper_bot/utils/helper_func.py
Andrey 1c6a37bc12 Enhance bot functionality and refactor database interactions
- Added `ca-certificates` installation to Dockerfile for improved network security.
- Updated health check command in Dockerfile to include better timeout handling.
- Refactored `run_helper.py` to implement proper signal handling and logging during shutdown.
- Transitioned database operations to an asynchronous model in `async_db.py`, improving performance and responsiveness.
- Updated database schema to support new foreign key relationships and optimized indexing for better query performance.
- Enhanced various bot handlers to utilize async database methods, improving overall efficiency and user experience.
- Removed obsolete database and fix scripts to streamline the project structure.
2025-09-02 18:22:02 +03:00

622 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import html
import os
import random
from datetime import datetime, timedelta
from time import sleep
from typing import List, Dict, Any, Optional, TYPE_CHECKING
try:
import emoji as _emoji_lib
_emoji_lib_available = True
except ImportError:
_emoji_lib = None
_emoji_lib_available = False
from aiogram import types
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance
from logs.custom_logger import logger
from database.models import TelegramPost
# Local imports - metrics
from .metrics import (
metrics,
track_time,
track_errors,
db_query_time
)
bdf = get_global_instance()
#TODO: поменять архитектуру и подключить правильный BotDB
BotDB = bdf.get_db()
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
if _emoji_lib_available and _emoji_lib is not None:
emoji_list = list(_emoji_lib.EMOJI_DATA.keys())
else:
# Fallback minimal emoji set for environments without the 'emoji' package (e.g., CI tests)
emoji_list = [
"🙂", "😀", "😉", "😎", "🤖", "🦄", "🐱", "🐶", "🍀", "🔥",
"🌟", "🎉", "💡", "🚀", "🌈"
]
def safe_html_escape(text: str) -> str:
"""
Безопасно экранирует текст для использования в HTML разметке.
Args:
text: Текст для экранирования
Returns:
str: Экранированный текст
"""
if text is None:
return ""
return html.escape(str(text))
@track_time("get_first_name", "helper_func")
@track_errors("helper_func", "get_first_name")
def get_first_name(message: types.Message) -> str:
"""
Безопасно получает и экранирует имя пользователя для использования в HTML разметке.
Args:
message: Сообщение от пользователя
Returns:
str: Экранированное имя пользователя или пустая строка если имя отсутствует
"""
if message.from_user.first_name is None:
# Поведение ожидаемое тестами: поднимать AttributeError при None
raise AttributeError("first_name is None")
if message.from_user.first_name:
# Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML
first_name = str(message.from_user.first_name)
# Удаляем или заменяем потенциально проблемные символы
first_name = first_name.replace('\u0cc0', '') # Убираем символ "ೀ" (U+0CC0)
first_name = first_name.replace('\u0cc1', '') # Убираем символ "ೀ" (U+0CC1)
first_name = html.escape(first_name)
return first_name
return ""
def get_text_message(post_text: str, first_name: str, username: str = None):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста (может быть None)
Returns:
str: - Сформированный текст сообщения.
"""
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
# Экранируем username для безопасного использования в HTML
safe_username = html.escape(username) if username else None
# Формируем строку с информацией об авторе
if safe_username:
author_info = f"{first_name} @{safe_username}"
else:
author_info = f"{first_name} (Ник не указан)"
if "неанон" in post_text or "не анон" in post_text:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
elif "анон" in post_text:
return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
async def download_file(message: types.Message, file_id: str):
"""
Скачивает файл по file_id из Telegram.
Args:
message: сообщение
file_id: File ID фотографии
filename: Имя файла, под которым будет сохранено фото
Returns:
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
"""
try:
os.makedirs("files", exist_ok=True)
os.makedirs("files/photos", exist_ok=True)
os.makedirs("files/videos", exist_ok=True)
os.makedirs("files/music", exist_ok=True)
os.makedirs("files/voice", exist_ok=True)
os.makedirs("files/video_notes", exist_ok=True)
file = await message.bot.get_file(file_id)
file_path = os.path.join("files", file.file_path)
await message.bot.download_file(file_path=file.file_path, destination=file_path)
return file_path
except Exception as e:
logger.error(f"Ошибка скачивания фотографии: {e}")
return None
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
"""
Создает MediaGroup.
Args:
album: Album объект из Telegram API.
post_caption: Текст подписи к первому фото.
Returns:
Список InputMediaPhoto (MediaGroup).
"""
# Экранируем post_caption для безопасного использования в HTML
safe_post_caption = html.escape(str(post_caption)) if post_caption else ""
media_group = []
for i, message in enumerate(album):
if message.photo:
file_id = message.photo[-1].file_id
media_type = 'photo'
elif message.video:
file_id = message.video.file_id
media_type = 'video'
elif message.audio:
file_id = message.audio.file_id
media_type = 'audio'
else:
# Если нет фото, видео или аудио, пропускаем сообщение
continue
# Формируем объект MediaGroup с учетом типа медиа
if i == len(album) - 1:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id, caption=safe_post_caption))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id, caption=safe_post_caption))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id, caption=safe_post_caption))
else:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id))
return media_group # Возвращаем MediaGroup
async def add_in_db_media_mediagroup(sent_message, bot_db):
"""
Добавляет контент медиа-группы в базу данных
Args:
sent_message: sent_message объект из Telegram API
bot_db: Экземпляр базы данных
Returns:
None
"""
post_id = sent_message[-1].message_id # ID поста (первое сообщение в медиа-группе)
for i, message in enumerate(sent_message):
if message.photo:
file_id = message.photo[-1].file_id
file_path = await download_file(message, file_id=file_id)
await bot_db.add_post_content(post_id, message.message_id, file_path, 'photo')
elif message.video:
file_id = message.video.file_id
file_path = await download_file(message, file_id=file_id)
await bot_db.add_post_content(post_id, message.message_id, file_path, 'video')
else:
# Если нет фото, видео или аудио, или другой контент, пропускаем сообщение
continue
async def add_in_db_media(sent_message, bot_db):
"""
Добавляет контент одиночного сообщения в базу данных
Args:
sent_message: sent_message объект из Telegram API
bot_db: Экземпляр базы данных
Returns:
None
"""
post_id = sent_message.message_id # ID поста (это же сообщение)
if sent_message.photo:
file_id = sent_message.photo[-1].file_id
file_path = await download_file(sent_message, file_id=file_id)
await bot_db.add_post_content(post_id, sent_message.message_id, file_path, 'photo')
elif sent_message.video:
file_id = sent_message.video.file_id
file_path = await download_file(sent_message, file_id=file_id)
await bot_db.add_post_content(post_id, sent_message.message_id, file_path, 'video')
elif sent_message.voice:
file_id = sent_message.voice.file_id
file_path = await download_file(sent_message, file_id=file_id)
await bot_db.add_post_content(post_id, sent_message.message_id, file_path, 'voice')
elif sent_message.audio:
file_id = sent_message.audio.file_id
file_path = await download_file(sent_message, file_id=file_id)
await bot_db.add_post_content(post_id, sent_message.message_id, file_path, 'audio')
elif sent_message.video_note:
file_id = sent_message.video_note.file_id
file_path = await download_file(sent_message, file_id=file_id)
await bot_db.add_post_content(post_id, sent_message.message_id, file_path, 'video_note')
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: List, bot_db):
sent_message = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
)
post = TelegramPost(
message_id=sent_message[-1].message_id,
text=sent_message[-1].caption or "",
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp())
)
await bot_db.add_post(post)
await add_in_db_media_mediagroup(sent_message, bot_db)
message_id = sent_message[-1].message_id
return message_id
async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам.
post_text: Текст подписи.
"""
media = []
for file_path in post_content:
try:
file = FSInputFile(path=file_path[0])
type = file_path[1]
if type == 'video':
media.append(types.InputMediaVideo(media=file))
if type == 'photo':
media.append(types.InputMediaPhoto(media=file))
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path[0]}")
return
# Добавляем подпись к последнему файлу
if media:
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
media[-1].caption = safe_post_text
await bot.send_media_group(chat_id=chat_id, media=media)
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text
)
message_id = sent_message.message_id
return message_id
else:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text,
reply_markup=markup
)
message_id = sent_message.message_id
return message_id
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=safe_post_text,
photo=photo
)
else:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=safe_post_text,
photo=photo,
reply_markup=markup
)
return sent_message
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=safe_post_text,
video=video
)
else:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=safe_post_text,
video=video,
reply_markup=markup
)
return sent_message
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note
)
else:
sent_message = await message.bot.send_video_note(
chat_id=chat_id,
video_note=video_note,
reply_markup=markup
)
return sent_message
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=safe_post_text,
audio=audio
)
else:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=safe_post_text,
audio=audio,
reply_markup=markup
)
return sent_message
async def send_voice_message(chat_id, message: types.Message, voice: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice
)
else:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice,
reply_markup=markup
)
return sent_message
async def check_access(user_id: int, bot_db):
"""Проверка прав на совершение действий"""
from logs.custom_logger import logger
result = await bot_db.is_admin(user_id)
logger.info(f"check_access: пользователь {user_id} - результат: {result}")
return result
def add_days_to_date(days: int):
"""Прибавляет указанное количество дней к текущей дате и возвращает UNIX timestamp."""
current_date = datetime.now()
future_date = current_date + timedelta(days=days)
return int(future_date.timestamp())
async def get_banned_users_list(offset: int, bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
bot_db: Экземпляр базы данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = await bot_db.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
user_id, ban_reason, unban_date = user
# Получаем имя пользователя из таблицы users
username = await bot_db.get_username(user_id)
full_name = await bot_db.get_full_name_by_id(user_id)
safe_user_name = username or full_name or f"User_{user_id}"
# Экранируем пользовательские данные для безопасного использования
safe_user_name = html.escape(str(safe_user_name))
safe_ban_reason = html.escape(str(ban_reason)) if ban_reason else "Причина не указана"
# Форматируем дату разбана в человекочитаемый формат
if unban_date:
try:
# Предполагаем, что unban_date это UNIX timestamp
if isinstance(unban_date, (int, float)):
unban_datetime = datetime.fromtimestamp(unban_date)
safe_unban_date = unban_datetime.strftime("%d-%m-%Y %H:%M")
else:
# Если это уже datetime объект
safe_unban_date = unban_date.strftime("%d-%m-%Y %H:%M")
except (ValueError, TypeError, OSError):
# В случае ошибки показываем исходное значение
safe_unban_date = html.escape(str(unban_date))
else:
safe_unban_date = "Дата не указана"
message += f"**Пользователь:** {safe_user_name}\n"
message += f"**Причина бана:** {safe_ban_reason}\n"
message += f"**Дата разбана:** {safe_unban_date}\n\n"
return message
async def get_banned_users_buttons(bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
bot_db: Экземпляр базы данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = await bot_db.get_banned_users_from_db()
user_ids = []
for user in users:
user_id, ban_reason, unban_date = user
# Получаем имя пользователя из таблицы users
username = await bot_db.get_username(user_id)
full_name = await bot_db.get_full_name_by_id(user_id)
safe_user_name = username or full_name or f"User_{user_id}"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(safe_user_name))
user_ids.append((safe_user_name, user_id))
return user_ids
async def delete_user_blacklist(user_id: int, bot_db):
return await bot_db.delete_user_blacklist(user_id=user_id)
@track_time("check_username_and_full_name", "helper_func")
@track_errors("helper_func", "check_username_and_full_name")
@db_query_time("check_username_and_full_name", "users", "select")
async def check_username_and_full_name(user_id: int, username: str, full_name: str, bot_db):
"""Проверяет, изменились ли username или full_name пользователя"""
try:
username_db = await bot_db.get_username(user_id)
full_name_db = await bot_db.get_full_name_by_id(user_id)
return username != username_db or full_name != full_name_db
except Exception as e:
logger.error(f"Ошибка при проверке username и full_name: {e}")
return False
async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE):
# Получение текущего UNIX timestamp
current_date = datetime.now()
current_timestamp = int(current_date.timestamp())
# Получение списка разблокированных пользователей
unblocked_users = await BotDB.get_users_for_unblock_today(current_timestamp)
message = "Разблокированные пользователи:\n"
for user_id in unblocked_users:
# Получаем имя пользователя из таблицы users
username = await BotDB.get_username(user_id)
full_name = await BotDB.get_full_name_by_id(user_id)
user_name = username or full_name or f"User_{user_id}"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user_name))
message += f"ID: {user_id}, Имя: {safe_user_name}\n"
# Отправка сообщения в канал
await bot.send_message(GROUP_FOR_MESSAGE, message)
@track_time("update_user_info", "helper_func")
@track_errors("helper_func", "update_user_info")
async def update_user_info(source: str, message: types.Message):
# Собираем данные
full_name = message.from_user.full_name
username = message.from_user.username
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
user_id = message.from_user.id
# Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было
user_emoji = await get_random_emoji()
if not await BotDB.user_exists(user_id):
# Create User object with current timestamp
from database.models import User
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username,
is_bot=is_bot,
language_code=language_code,
emoji=user_emoji,
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False
)
await BotDB.add_user(user)
metrics.record_db_query("add_user", 0.0, "users", "insert")
else:
is_need_update = await check_username_and_full_name(user_id, username, full_name, BotDB)
if is_need_update:
await BotDB.update_user_info(user_id, username, full_name)
metrics.record_db_query("update_user_info", 0.0, "users", "update")
if source != 'voice':
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}")
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}')
sleep(1)
await BotDB.update_user_date(user_id)
metrics.record_db_query("update_user_date", 0.0, "users", "update")
@track_time("check_user_emoji", "helper_func")
@track_errors("helper_func", "check_user_emoji")
@db_query_time("check_emoji_for_user", "users", "select")
async def check_user_emoji(message: types.Message):
user_id = message.from_user.id
user_emoji = await BotDB.get_stickers_info(user_id=user_id)
if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""):
user_emoji = await get_random_emoji()
await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji)
metrics.record_db_query("update_user_emoji", 0.0, "users", "update")
return user_emoji
@track_time("get_random_emoji", "helper_func")
@track_errors("helper_func", "get_random_emoji")
@db_query_time("check_emoji", "users", "select")
async def get_random_emoji():
attempts = 0
while attempts < 100:
user_emoji = random.choice(emoji_list)
if not await BotDB.check_emoji_exists(user_emoji):
return user_emoji
attempts += 1
logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.")
return "Эмоджи не определен"