Files
telegram-helper-bot/helper_bot/handlers/callback/services.py
2026-01-23 00:37:09 +03:00

425 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from datetime import datetime, timedelta
import html
from typing import Dict, Any
from aiogram import Bot
from aiogram.types import CallbackQuery
from helper_bot.utils.helper_func import (
send_text_message, send_photo_message, send_video_message,
send_video_note_message, send_audio_message, send_voice_message,
send_media_group_to_channel, delete_user_blacklist
)
from helper_bot.keyboards.keyboards import create_keyboard_for_ban_reason
from .exceptions import (
UserBlockedBotError, PostNotFoundError, UserNotFoundError,
PublishError, BanError
)
from .constants import (
CONTENT_TYPE_TEXT, CONTENT_TYPE_PHOTO, CONTENT_TYPE_VIDEO,
CONTENT_TYPE_VIDEO_NOTE, CONTENT_TYPE_AUDIO, CONTENT_TYPE_VOICE,
CONTENT_TYPE_MEDIA_GROUP, MESSAGE_POST_PUBLISHED, MESSAGE_POST_DECLINED,
MESSAGE_USER_BANNED_SPAM, ERROR_BOT_BLOCKED
)
from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
track_media_processing,
track_time,
track_errors,
db_query_time
)
class PostPublishService:
def __init__(self, bot: Bot, db, settings: Dict[str, Any]):
# bot может быть None - в этом случае используем бота из контекста сообщения
self.bot = bot
self.db = db
self.settings = settings
self.group_for_posts = settings['Telegram']['group_for_posts']
self.main_public = settings['Telegram']['main_public']
self.important_logs = settings['Telegram']['important_logs']
def _get_bot(self, message) -> Bot:
"""Получает бота из контекста сообщения или использует переданного"""
if self.bot:
return self.bot
return message.bot
@track_time("publish_post", "post_publish_service")
@track_errors("post_publish_service", "publish_post")
async def publish_post(self, call: CallbackQuery) -> None:
"""Основной метод публикации поста"""
# Проверяем, является ли сообщение частью медиагруппы
if call.message.media_group_id:
await self._publish_media_group(call)
return
content_type = call.message.content_type
if content_type == CONTENT_TYPE_TEXT:
await self._publish_text_post(call)
elif content_type == CONTENT_TYPE_PHOTO:
await self._publish_photo_post(call)
elif content_type == CONTENT_TYPE_VIDEO:
await self._publish_video_post(call)
elif content_type == CONTENT_TYPE_VIDEO_NOTE:
await self._publish_video_note_post(call)
elif content_type == CONTENT_TYPE_AUDIO:
await self._publish_audio_post(call)
elif content_type == CONTENT_TYPE_VOICE:
await self._publish_voice_post(call)
else:
raise PublishError(f"Неподдерживаемый тип контента: {content_type}")
@track_time("_publish_text_post", "post_publish_service")
@track_errors("post_publish_service", "_publish_text_post")
async def _publish_text_post(self, call: CallbackQuery) -> None:
"""Публикация текстового поста"""
text_post = html.escape(str(call.message.text))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_text_message(self.main_public, call.message, text_post)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Текст сообщения опубликован в канале {self.main_public}.')
@track_time("_publish_photo_post", "post_publish_service")
@track_errors("post_publish_service", "_publish_photo_post")
async def _publish_photo_post(self, call: CallbackQuery) -> None:
"""Публикация поста с фото"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, text_post_with_photo)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с фото опубликован в канале {self.main_public}.')
@track_time("_publish_video_post", "post_publish_service")
@track_errors("post_publish_service", "_publish_video_post")
async def _publish_video_post(self, call: CallbackQuery) -> None:
"""Публикация поста с видео"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_video_message(self.main_public, call.message, call.message.video.file_id, text_post_with_photo)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с видео опубликован в канале {self.main_public}.')
@track_time("_publish_video_note_post", "post_publish_service")
@track_errors("post_publish_service", "_publish_video_note_post")
async def _publish_video_note_post(self, call: CallbackQuery) -> None:
"""Публикация поста с кружком"""
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_video_note_message(self.main_public, call.message, call.message.video_note.file_id)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с кружком опубликован в канале {self.main_public}.')
@track_time("_publish_audio_post", "post_publish_service")
@track_errors("post_publish_service", "_publish_audio_post")
async def _publish_audio_post(self, call: CallbackQuery) -> None:
"""Публикация поста с аудио"""
text_post_with_photo = html.escape(str(call.message.caption))
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_audio_message(self.main_public, call.message, call.message.audio.file_id, text_post_with_photo)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с аудио опубликован в канале {self.main_public}.')
@track_time("_publish_voice_post", "post_publish_service")
@track_errors("post_publish_service", "_publish_voice_post")
async def _publish_voice_post(self, call: CallbackQuery) -> None:
"""Публикация поста с войсом"""
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "approved")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await send_voice_message(self.main_public, call.message, call.message.voice.file_id)
await self._delete_post_and_notify_author(call, author_id)
logger.info(f'Пост с войсом опубликован в канале {self.main_public}.')
@track_time("_publish_media_group", "post_publish_service")
@track_errors("post_publish_service", "_publish_media_group")
@track_media_processing("media_group")
async def _publish_media_group(self, call: CallbackQuery) -> None:
"""Публикация медиагруппы"""
logger.info(f"Начинаю публикацию медиагруппы. Helper message ID: {call.message.message_id}")
try:
# call.message.message_id - это ID helper сообщения
helper_message_id = call.message.message_id
# Получаем контент медиагруппы по helper_message_id
logger.debug(f"Получаю контент медиагруппы для helper_message_id: {helper_message_id}")
post_content = await self.db.get_post_content_by_helper_id(helper_message_id)
if not post_content:
logger.error(f"Контент медиагруппы не найден в базе данных для helper_message_id: {helper_message_id}")
raise PublishError("Контент медиагруппы не найден в базе данных")
# Получаем текст поста по helper_message_id
logger.debug(f"Получаю текст поста для helper_message_id: {helper_message_id}")
pre_text = await self.db.get_post_text_by_helper_id(helper_message_id)
post_text = html.escape(str(pre_text)) if pre_text else ""
logger.debug(f"Текст поста получен: {'пустой' if not post_text else f'длина: {len(post_text)} символов'}")
# Получаем ID автора по helper_message_id
logger.debug(f"Получаю ID автора для helper_message_id: {helper_message_id}")
author_id = await self.db.get_author_id_by_helper_message_id(helper_message_id)
if not author_id:
logger.error(f"Автор не найден для медиагруппы {helper_message_id}")
raise PostNotFoundError(f"Автор не найден для медиагруппы {helper_message_id}")
logger.debug(f"ID автора получен: {author_id}")
# Отправляем медиагруппу в канал
logger.info(f"Отправляю медиагруппу в канал {self.main_public}")
await send_media_group_to_channel(
bot=self._get_bot(call.message),
chat_id=self.main_public,
post_content=post_content,
post_text=post_text
)
await self.db.update_status_for_media_group_by_helper_id(helper_message_id, "approved")
logger.debug(f"Удаляю медиагруппу и уведомляю автора {author_id}")
await self._delete_media_group_and_notify_author(call, author_id)
logger.info(f'Медиагруппа опубликована в канале {self.main_public}.')
except Exception as e:
logger.error(f"Ошибка при публикации медиагруппы: {e}")
raise PublishError(f"Не удалось опубликовать медиагруппу: {str(e)}")
@track_time("decline_post", "post_publish_service")
@track_errors("post_publish_service", "decline_post")
async def decline_post(self, call: CallbackQuery) -> None:
"""Отклонение поста"""
# Проверяем, является ли сообщение частью медиагруппы (осознанный костыль, т.к. сообщение к которому прикреплен коллбек без медиагруппы)
if call.message.text == CONTENT_TYPE_MEDIA_GROUP:
await self._decline_media_group(call)
return
content_type = call.message.content_type
if content_type in [CONTENT_TYPE_TEXT, CONTENT_TYPE_PHOTO, CONTENT_TYPE_AUDIO,
CONTENT_TYPE_VOICE, CONTENT_TYPE_VIDEO, CONTENT_TYPE_VIDEO_NOTE]:
await self._decline_single_post(call)
else:
logger.error(f"Неподдерживаемый тип контента для отклонения: {content_type}")
raise PublishError(f"Неподдерживаемый тип контента для отклонения: {content_type}")
@track_time("_decline_single_post", "post_publish_service")
@track_errors("post_publish_service", "_decline_single_post")
async def _decline_single_post(self, call: CallbackQuery) -> None:
"""Отклонение одиночного поста"""
author_id = await self._get_author_id(call.message.message_id)
updated_rows = await self.db.update_status_by_message_id(call.message.message_id, "declined")
if updated_rows == 0:
logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'declined'")
raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных")
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)
try:
await send_text_message(author_id, call.message, MESSAGE_POST_DECLINED)
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
logger.warning(f"Пользователь {author_id} заблокировал бота")
raise UserBlockedBotError("Пользователь заблокировал бота")
logger.error(f"Ошибка при отправке уведомления автору {author_id}: {e}")
raise
logger.info(f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
@track_time("_decline_media_group", "post_publish_service")
@track_errors("post_publish_service", "_decline_media_group")
@track_media_processing("media_group")
async def _decline_media_group(self, call: CallbackQuery) -> None:
"""Отклонение медиагруппы"""
await self.db.update_status_for_media_group_by_helper_id(call.message.message_id, "declined")
post_ids = await self.db.get_post_ids_from_telegram_by_last_id(call.message.message_id)
message_ids = post_ids.copy()
message_ids.append(call.message.message_id)
logger.debug(f"Получены ID сообщений для удаления: {message_ids}")
author_id = await self._get_author_id_for_media_group(call.message.message_id)
logger.debug(f"ID автора медиагруппы получен: {author_id}")
logger.debug(f"Удаляю {len(message_ids)} сообщений из группы {self.group_for_posts}")
await self._get_bot(call.message).delete_messages(chat_id=self.group_for_posts, message_ids=message_ids)
try:
logger.debug(f"Отправляю уведомление об отклонении автору медиагруппы {author_id}")
await send_text_message(author_id, call.message, MESSAGE_POST_DECLINED)
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
logger.warning(f"Пользователь {author_id} заблокировал бота")
raise UserBlockedBotError("Пользователь заблокировал бота")
logger.error(f"Ошибка при отправке уведомления автору медиагруппы {author_id}: {e}")
raise
@track_time("_get_author_id", "post_publish_service")
@track_errors("post_publish_service", "_get_author_id")
async def _get_author_id(self, message_id: int) -> int:
"""Получение ID автора по ID сообщения"""
author_id = await self.db.get_author_id_by_message_id(message_id)
if not author_id:
raise PostNotFoundError(f"Автор не найден для сообщения {message_id}")
return author_id
@track_time("_get_author_id_for_media_group", "post_publish_service")
@track_errors("post_publish_service", "_get_author_id_for_media_group")
async def _get_author_id_for_media_group(self, message_id: int) -> int:
"""Получение ID автора для медиагруппы"""
# Сначала пытаемся найти автора по helper_message_id
author_id = await self.db.get_author_id_by_helper_message_id(message_id)
if author_id:
return author_id
# Если не найден, ищем по основному message_id медиагруппы
# Для этого нужно найти связанные сообщения медиагруппы
try:
# Получаем все ID сообщений медиагруппы
post_ids = await self.db.get_post_ids_from_telegram_by_last_id(message_id)
if post_ids:
# Берем первый ID (основное сообщение медиагруппы)
main_message_id = post_ids[0]
author_id = await self.db.get_author_id_by_message_id(main_message_id)
if author_id:
return author_id
except Exception as e:
logger.warning(f"Не удалось найти автора через связанные сообщения: {e}")
# Если все способы не сработали, ищем напрямую
author_id = await self.db.get_author_id_by_message_id(message_id)
if not author_id:
raise PostNotFoundError(f"Автор не найден для медиагруппы {message_id}")
return author_id
@track_time("_delete_post_and_notify_author", "post_publish_service")
@track_errors("post_publish_service", "_delete_post_and_notify_author")
async def _delete_post_and_notify_author(self, call: CallbackQuery, author_id: int) -> None:
"""Удаление поста и уведомление автора"""
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)
try:
await send_text_message(author_id, call.message, MESSAGE_POST_PUBLISHED)
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
raise UserBlockedBotError("Пользователь заблокировал бота")
raise
@track_time("_delete_media_group_and_notify_author", "post_publish_service")
@track_errors("post_publish_service", "_delete_media_group_and_notify_author")
@track_media_processing("media_group")
async def _delete_media_group_and_notify_author(self, call: CallbackQuery, author_id: int) -> None:
"""Удаление медиагруппы и уведомление автора"""
post_ids = await self.db.get_post_ids_from_telegram_by_last_id(call.message.message_id)
#message_ids = post_ids.copy()
post_ids.append(call.message.message_id)
await self._get_bot(call.message).delete_messages(chat_id=self.group_for_posts, message_ids=post_ids)
try:
await send_text_message(author_id, call.message, MESSAGE_POST_PUBLISHED)
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
raise UserBlockedBotError("Пользователь заблокировал бота")
raise
class BanService:
def __init__(self, bot: Bot, db, settings: Dict[str, Any]):
self.bot = bot
self.db = db
self.settings = settings
self.group_for_posts = settings['Telegram']['group_for_posts']
self.important_logs = settings['Telegram']['important_logs']
def _get_bot(self, message) -> Bot:
"""Получает бота из контекста сообщения или использует переданного"""
if self.bot:
return self.bot
return message.bot
@track_time("ban_user_from_post", "ban_service")
@track_errors("ban_service", "ban_user_from_post")
@db_query_time("ban_user_from_post", "users", "mixed")
async def ban_user_from_post(self, call: CallbackQuery) -> None:
"""Бан пользователя за спам"""
author_id = await self.db.get_author_id_by_message_id(call.message.message_id)
if not author_id:
raise UserNotFoundError(f"Автор не найден для сообщения {call.message.message_id}")
current_date = datetime.now()
date_to_unban = int((current_date + timedelta(days=7)).timestamp())
await self.db.set_user_blacklist(
user_id=author_id,
user_name=None,
message_for_user="Спам",
date_to_unban=date_to_unban
)
await self._get_bot(call.message).delete_message(chat_id=self.group_for_posts, message_id=call.message.message_id)
date_str = (current_date + timedelta(days=7)).strftime("%d.%m.%Y %H:%M")
try:
await send_text_message(author_id, call.message, MESSAGE_USER_BANNED_SPAM.format(date=date_str))
except Exception as e:
if str(e) == ERROR_BOT_BLOCKED:
raise UserBlockedBotError("Пользователь заблокировал бота")
raise
logger.info(f"Пользователь {author_id} заблокирован за спам до {date_str}")
@track_time("ban_user", "ban_service")
@track_errors("ban_service", "ban_user")
async def ban_user(self, user_id: str, user_name: str) -> str:
"""Бан пользователя по ID"""
user_name = await self.db.get_username(int(user_id))
if not user_name:
raise UserNotFoundError(f"Пользователь с ID {user_id} не найден в базе")
return user_name
@track_time("unlock_user", "ban_service")
@track_errors("ban_service", "unlock_user")
@db_query_time("unlock_user", "users", "delete")
async def unlock_user(self, user_id: str) -> str:
"""Разблокировка пользователя"""
user_name = await self.db.get_username(int(user_id))
if not user_name:
raise UserNotFoundError(f"Пользователь с ID {user_id} не найден в базе")
await delete_user_blacklist(int(user_id), self.db)
logger.info(f"Разблокирован пользователь с ID: {user_id} username:{user_name}")
return user_name