"""Service classes for private handlers""" # Standard library imports import asyncio import html import random from dataclasses import dataclass from datetime import datetime from pathlib import Path from typing import Any, Callable, Dict, Protocol, Union # Third-party imports from aiogram import types from aiogram.types import FSInputFile from database.models import TelegramPost, User from helper_bot.keyboards import get_reply_keyboard_for_post # Local imports - utilities from helper_bot.utils.helper_func import ( add_in_db_media, check_username_and_full_name, determine_anonymity, get_first_name, get_publish_text, get_text_message, prepare_media_group_from_middlewares, send_audio_message, send_media_group_message_to_private_chat, send_photo_message, send_text_message, send_video_message, send_video_note_message, send_voice_message, ) # Local imports - metrics from helper_bot.utils.metrics import ( db_query_time, track_errors, track_file_operations, track_media_processing, track_time, ) from logs.custom_logger import logger class DatabaseProtocol(Protocol): """Protocol for database operations""" async def user_exists(self, user_id: int) -> bool: ... async def add_user(self, user: User) -> None: ... async def update_user_info( self, user_id: int, username: str = None, full_name: str = None ) -> None: ... async def update_user_date(self, user_id: int) -> None: ... async def add_post(self, post: TelegramPost) -> None: ... async def update_stickers_info(self, user_id: int) -> None: ... async def add_message( self, message_text: str, user_id: int, message_id: int, date: int = None ) -> None: ... async def update_helper_message( self, message_id: int, helper_message_id: int ) -> None: ... @dataclass class BotSettings: """Bot configuration settings""" group_for_posts: str group_for_message: str main_public: str group_for_logs: str important_logs: str preview_link: str logs: str test: str class UserService: """Service for user-related operations""" def __init__(self, db: DatabaseProtocol, settings: BotSettings) -> None: self.db = db self.settings = settings @track_time("update_user_activity", "user_service") @track_errors("user_service", "update_user_activity") @db_query_time("update_user_activity", "users", "update") async def update_user_activity(self, user_id: int) -> None: """Update user's last activity timestamp with metrics tracking""" await self.db.update_user_date(user_id) @track_time("ensure_user_exists", "user_service") @track_errors("user_service", "ensure_user_exists") @db_query_time("ensure_user_exists", "users", "insert") async def ensure_user_exists(self, message: types.Message) -> None: """Ensure user exists in database, create if needed with metrics tracking""" user_id = message.from_user.id full_name = message.from_user.full_name # Сохраняем только реальный username, если его нет - сохраняем None/пустую строку username = message.from_user.username first_name = get_first_name(message) is_bot = message.from_user.is_bot language_code = message.from_user.language_code # Create User object with current timestamp current_timestamp = int(datetime.now().timestamp()) user = User( user_id=user_id, first_name=first_name, full_name=full_name, username=username, # Может быть None - это нормально is_bot=is_bot, language_code=language_code, emoji="", has_stickers=False, date_added=current_timestamp, date_changed=current_timestamp, voice_bot_welcome_received=False, ) # Пытаемся создать пользователя (если уже существует - игнорируем) # Это устраняет race condition и упрощает логику await self.db.add_user(user) # Проверяем, нужно ли обновить информацию о существующем пользователе is_need_update = await check_username_and_full_name( user_id, username, full_name, self.db ) if is_need_update: await self.db.update_user_info(user_id, username, full_name) safe_full_name = ( html.escape(full_name) if full_name else "Неизвестный пользователь" ) # Для отображения используем подстановочное значение, но в БД сохраняем только реальный username safe_username = html.escape(username) if username else "Без никнейма" await message.answer( f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}" ) await message.bot.send_message( chat_id=self.settings.group_for_logs, text=f"Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}", ) await self.db.update_user_date(user_id) async def log_user_message(self, message: types.Message) -> None: """Forward user message to logs group with metrics tracking""" await message.forward(chat_id=self.settings.group_for_logs) def get_safe_user_info(self, message: types.Message) -> tuple[str, str]: """Get safely escaped user information for logging""" full_name = message.from_user.full_name or "Неизвестный пользователь" username = message.from_user.username or "Без никнейма" return html.escape(full_name), html.escape(username) async def format_user_message_for_admins( self, user_id: int, full_name: str, username: str, message_text: str ) -> str: """ Форматирует сообщение пользователя для отправки админам с обогащёнными данными. Args: user_id: ID пользователя full_name: Полное имя пользователя username: Username пользователя (может быть None) message_text: Текст сообщения пользователя Returns: Отформатированное сообщение для админов """ safe_full_name = ( html.escape(full_name) if full_name else "Неизвестный пользователь" ) safe_username = html.escape(username) if username else None safe_message_text = html.escape(message_text) if message_text else "" # Формируем строку с информацией об авторе if safe_username: author_info = f"{safe_full_name} (@{safe_username})" else: author_info = f"{safe_full_name} (Ник не указан)" # Получаем статистику постов approved, declined, suggest = await self.db.get_user_posts_stats(user_id) total_posts = approved + declined + suggest # Получаем последний пост last_post = await self.db.get_last_post_by_author(user_id) if last_post: if len(last_post) > 80: last_post_display = f'"{html.escape(last_post[:80])}..."' else: last_post_display = f'"{html.escape(last_post)}"' else: last_post_display = "Нет постов" # Получаем дату регистрации user_info = await self.db.get_user_by_id(user_id) if user_info and user_info.date_added: date_added = datetime.fromtimestamp(user_info.date_added).strftime( "%d.%m.%Y" ) else: date_added = "Неизвестно" # Получаем информацию о банах ban_count = await self.db.get_user_ban_count(user_id) ban_section = "" if ban_count > 0: last_ban = await self.db.get_last_ban_info(user_id) if last_ban: date_ban, reason, date_unban = last_ban ban_date_str = datetime.fromtimestamp(date_ban).strftime("%d.%m.%Y") reason_display = html.escape(reason) if reason else "Не указана" if date_unban: unban_date_str = datetime.fromtimestamp(date_unban).strftime( "%d.%m.%Y %H:%M" ) last_ban_info = ( f" Последний: {ban_date_str}, причина «{reason_display}», " f"истёк {unban_date_str}" ) else: last_ban_info = ( f" Последний: {ban_date_str}, причина «{reason_display}», " f"активен" ) ban_section = f"\n\n🚫 Банов: {ban_count}\n{last_ban_info}" # Формируем итоговое сообщение formatted_message = ( f"👤 От: {author_info} | ID: {user_id}\n\n" f"📊 Постов в базе: {total_posts}\n" f"📝 Последний пост: {last_post_display}\n" f"📅 В боте с: {date_added}" f"{ban_section}\n\n" f"---\n" f"Сообщение пользователя:\n\n" f"{safe_message_text}" ) return formatted_message class PostService: """Service for post-related operations""" def __init__( self, db: DatabaseProtocol, settings: BotSettings, s3_storage=None, scoring_manager=None, auto_moderation_service: "AutoModerationService" = None, ) -> None: self.db = db self.settings = settings self.s3_storage = s3_storage self.scoring_manager = scoring_manager self.auto_moderation = auto_moderation_service async def _save_media_background( self, sent_message: types.Message, bot_db: Any, s3_storage ) -> None: """Сохраняет медиа в фоне, чтобы не блокировать ответ пользователю""" try: success = await add_in_db_media(sent_message, bot_db, s3_storage) if not success: logger.warning( f"_save_media_background: Не удалось сохранить медиа для поста {sent_message.message_id}" ) except Exception as e: logger.error( f"_save_media_background: Ошибка при сохранении медиа для поста {sent_message.message_id}: {e}" ) async def _get_scores(self, text: str) -> tuple: """ Получает скоры для текста поста. Returns: Tuple (deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json) """ if not self.scoring_manager or not text or not text.strip(): return None, None, None, None, None try: scores = await self.scoring_manager.score_post(text) # Формируем JSON для сохранения в БД import json ml_scores_json = ( json.dumps(scores.to_json_dict()) if scores.has_any_score() else None ) # Получаем данные от RAG rag_confidence = scores.rag.confidence if scores.rag else None rag_score_pos_only = ( scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None ) return ( scores.deepseek_score, scores.rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, ) except Exception as e: logger.error(f"PostService: Ошибка получения скоров: {e}") return None, None, None, None, None async def _save_scores_background( self, message_id: int, ml_scores_json: str ) -> None: """Сохраняет скоры в БД в фоне.""" if ml_scores_json: try: await self.db.update_ml_scores(message_id, ml_scores_json) except Exception as e: logger.error( f"PostService: Ошибка сохранения скоров для {message_id}: {e}" ) async def _add_submitted_post_background( self, text: str, post_id: int, rag_score: float = None ) -> None: """Индексирует пост в RAG submitted collection в фоне.""" try: if self.scoring_manager: await self.scoring_manager.add_submitted_post(text, post_id, rag_score) except Exception as e: logger.warning(f"PostService: Ошибка добавления поста в submitted: {e}") async def _get_scores_with_error_handling(self, text: str) -> tuple: """ Получает скоры для текста поста с обработкой ошибок. Returns: Tuple (deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, error_message) error_message будет None если все ок, или строка с описанием ошибки """ if not self.scoring_manager: # Скоры выключены в .env - это нормально return None, None, None, None, None, None if not text or not text.strip(): return None, None, None, None, None, None try: scores = await self.scoring_manager.score_post(text) # Формируем JSON для сохранения в БД import json ml_scores_json = ( json.dumps(scores.to_json_dict()) if scores.has_any_score() else None ) # Получаем данные от RAG rag_confidence = scores.rag.confidence if scores.rag else None rag_score_pos_only = ( scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None ) return ( scores.deepseek_score, scores.rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, None, ) except Exception as e: logger.error(f"PostService: Ошибка получения скоров: {e}") # Возвращаем частичные скоры если есть, или сообщение об ошибке error_message = "Не удалось рассчитать скоры" return None, None, None, None, None, error_message @track_time("_handle_auto_action", "post_service") @track_errors("post_service", "_handle_auto_action") async def _handle_auto_action( self, auto_action: str, message: types.Message, content_type: str, original_raw_text: str, first_name: str, is_anonymous: bool, rag_score: float, ml_scores_json: str = None, album: Union[list, None] = None, ) -> None: """ Обрабатывает автоматическое действие (публикация или отклонение). Args: auto_action: 'publish' или 'decline' message: Сообщение пользователя content_type: Тип контента original_raw_text: Оригинальный текст поста first_name: Имя автора is_anonymous: Флаг анонимности rag_score: Скор RAG модели ml_scores_json: JSON со скорами для БД album: Медиагруппа (если есть) """ author_id = message.from_user.id author_name = message.from_user.full_name or first_name author_username = message.from_user.username or "" try: if auto_action == "publish": await self._auto_publish( message=message, content_type=content_type, original_raw_text=original_raw_text, first_name=first_name, is_anonymous=is_anonymous, rag_score=rag_score, ml_scores_json=ml_scores_json, album=album, ) else: # decline await self._auto_decline(message=message, author_id=author_id) # Логируем действие if self.auto_moderation: await self.auto_moderation.log_auto_action( bot=message.bot, action=auto_action, author_id=author_id, author_name=author_name, author_username=author_username, rag_score=rag_score, post_text=original_raw_text, ) except Exception as e: logger.error( f"PostService: Ошибка авто-{auto_action} для message_id={message.message_id}: {e}" ) raise @track_time("_auto_publish", "post_service") @track_errors("post_service", "_auto_publish") async def _auto_publish( self, message: types.Message, content_type: str, original_raw_text: str, first_name: str, is_anonymous: bool, rag_score: float, ml_scores_json: str = None, album: Union[list, None] = None, ) -> None: """Автоматически публикует пост в канал.""" author_id = message.from_user.id username = message.from_user.username # Формируем текст для публикации (без скоров и разметки) formatted_text = get_publish_text( original_raw_text, first_name, username, is_anonymous ) sent_message = None # Публикуем в зависимости от типа контента if content_type == "text": sent_message = await message.bot.send_message( chat_id=self.settings.main_public, text=formatted_text, ) elif content_type == "photo": sent_message = await message.bot.send_photo( chat_id=self.settings.main_public, photo=message.photo[-1].file_id, caption=formatted_text, ) elif content_type == "video": sent_message = await message.bot.send_video( chat_id=self.settings.main_public, video=message.video.file_id, caption=formatted_text, ) elif content_type == "audio": sent_message = await message.bot.send_audio( chat_id=self.settings.main_public, audio=message.audio.file_id, caption=formatted_text, ) elif content_type == "voice": sent_message = await message.bot.send_voice( chat_id=self.settings.main_public, voice=message.voice.file_id, ) elif content_type == "video_note": sent_message = await message.bot.send_video_note( chat_id=self.settings.main_public, video_note=message.video_note.file_id, ) elif content_type == "media_group" and album: # TODO: Реализовать авто-публикацию медиагрупп при необходимости logger.warning( "PostService: Авто-публикация медиагрупп пока не поддерживается" ) return if sent_message: # Сохраняем пост в БД со статусом approved post = TelegramPost( message_id=sent_message.message_id, text=original_raw_text, author_id=author_id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, status="approved", ) await self.db.add_post(post) # Сохраняем скоры если есть if ml_scores_json: asyncio.create_task( self._save_scores_background( sent_message.message_id, ml_scores_json ) ) # Индексируем пост в RAG if self.scoring_manager and original_raw_text and original_raw_text.strip(): asyncio.create_task( self._add_submitted_post_background( original_raw_text, sent_message.message_id, rag_score ) ) # Уведомляем автора try: await message.bot.send_message( chat_id=author_id, text="Твой пост был выложен🥰", ) except Exception as e: logger.warning( f"PostService: Не удалось уведомить автора {author_id}: {e}" ) logger.info( f"PostService: Пост авто-опубликован в {self.settings.main_public}, " f"author_id={author_id}, rag_score={rag_score:.2f}" ) @track_time("_auto_decline", "post_service") @track_errors("post_service", "_auto_decline") async def _auto_decline(self, message: types.Message, author_id: int) -> None: """Автоматически отклоняет пост.""" # Обучаем RAG на отклоненном посте if self.scoring_manager: original_text = message.text or message.caption or "" if original_text and original_text.strip(): try: await self.scoring_manager.on_post_declined(original_text) except Exception as e: logger.warning( f"PostService: Ошибка обучения RAG на отклоненном посте: {e}" ) # Уведомляем автора try: await message.bot.send_message( chat_id=author_id, text="Твой пост был отклонен😔", ) except Exception as e: logger.warning(f"PostService: Не удалось уведомить автора {author_id}: {e}") logger.info(f"PostService: Пост авто-отклонен, author_id={author_id}") @track_time("_process_post_background", "post_service") @track_errors("post_service", "_process_post_background") async def _process_post_background( self, message: types.Message, first_name: str, content_type: str, album: Union[list, None] = None, ) -> None: """ Обрабатывает пост в фоне: получает скоры, отправляет в группу модерации, сохраняет в БД. Args: message: Сообщение от пользователя first_name: Имя пользователя content_type: Тип контента ('text', 'photo', 'video', 'audio', 'voice', 'video_note', 'media_group') album: Список сообщений медиагруппы (только для media_group) """ try: # Определяем исходный текст для скоринга и определения анонимности original_raw_text = "" if content_type == "text": original_raw_text = message.text or "" elif content_type == "media_group": original_raw_text = ( album[0].caption or "" if album and album[0].caption else "" ) else: original_raw_text = message.caption or "" # Получаем скоры с обработкой ошибок ( deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, error_message, ) = await self._get_scores_with_error_handling(original_raw_text) # Проверяем похожие посты (до добавления текущего в submitted) similar_warning = "" if self.scoring_manager and original_raw_text and original_raw_text.strip(): try: similar_result = await self.scoring_manager.find_similar_posts( original_raw_text, threshold=0.9, hours=24 ) if similar_result and similar_result.similar_count > 0: # Формируем предупреждение с текстом похожего поста similar_text = "" if similar_result.similar_posts: first_similar = similar_result.similar_posts[0] if first_similar.text: truncated_text = first_similar.text[:150] if len(first_similar.text) > 150: truncated_text += "..." similar_text = f'\nТекст поста:\n"{html.escape(truncated_text)}"' similar_warning = ( f"\n\n⚠️ Похожий пост за последние 24ч " f"(совпадение {similar_result.max_similarity:.0%})" f"{similar_text}" ) logger.info( f"PostService: Найден похожий пост для message_id={message.message_id}, " f"similar_count={similar_result.similar_count}, " f"max_similarity={similar_result.max_similarity:.2%}" ) except Exception as e: logger.warning(f"PostService: Ошибка поиска похожих постов: {e}") # Формируем текст для поста (с сообщением об ошибке если есть) text_for_post = original_raw_text if error_message: # Для текстовых постов добавляем в конец текста if content_type == "text": text_for_post = f"{original_raw_text}\n\n⚠️ {error_message}" # Для медиа добавляем в caption elif content_type in ("photo", "video", "audio") and original_raw_text: text_for_post = f"{original_raw_text}\n\n⚠️ {error_message}" # Формируем текст/caption с учетом скоров post_text = "" if text_for_post or content_type == "text": logger.debug( f"PostService._process_post_background: Передача скоров в get_text_message - " f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), " f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), " f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), " f"content_type={content_type}, message_id={message.message_id}" ) post_text = get_text_message( text_for_post.lower() if text_for_post else "", first_name, message.from_user.username, deepseek_score=deepseek_score, rag_score=rag_score, user_id=message.from_user.id, ) # Добавляем предупреждение о похожем посте if similar_warning: post_text += similar_warning # Определяем анонимность по исходному тексту (без сообщения об ошибке) is_anonymous = determine_anonymity(original_raw_text) # Проверяем авто-модерацию logger.debug( f"PostService: Проверка авто-модерации - " f"auto_moderation={self.auto_moderation is not None}, " f"rag_score={rag_score}" ) if self.auto_moderation and rag_score is not None: auto_action = await self.auto_moderation.check_auto_action(rag_score) logger.info( f"PostService: Авто-модерация решение - " f"rag_score={rag_score:.2f}, action={auto_action}" ) if auto_action in ("publish", "decline"): await self._handle_auto_action( auto_action=auto_action, message=message, content_type=content_type, original_raw_text=original_raw_text, first_name=first_name, is_anonymous=is_anonymous, rag_score=rag_score, ml_scores_json=ml_scores_json, album=album, ) return markup = get_reply_keyboard_for_post() sent_message = None # Отправляем пост в группу модерации в зависимости от типа if content_type == "text": sent_message = await send_text_message( self.settings.group_for_posts, message, post_text, markup ) elif content_type == "photo": sent_message = await send_photo_message( self.settings.group_for_posts, message, message.photo[-1].file_id, post_text, markup, ) elif content_type == "video": sent_message = await send_video_message( self.settings.group_for_posts, message, message.video.file_id, post_text, markup, ) elif content_type == "audio": sent_message = await send_audio_message( self.settings.group_for_posts, message, message.audio.file_id, post_text, markup, ) elif content_type == "voice": sent_message = await send_voice_message( self.settings.group_for_posts, message, message.voice.file_id, markup, ) elif content_type == "video_note": sent_message = await send_video_note_message( self.settings.group_for_posts, message, message.video_note.file_id, markup, ) elif content_type == "media_group": # Добавляем предупреждение о похожем посте в caption медиагруппы if similar_warning: post_text += similar_warning # Для медиагруппы используем специальную обработку # Передаем ml_scores_json и rag_score для сохранения в БД await self._process_media_group_background( message, album, first_name, post_text, is_anonymous, original_raw_text, ml_scores_json, rag_score, ) return else: logger.error( f"PostService: Неподдерживаемый тип контента: {content_type}" ) return if not sent_message: logger.error( f"PostService: Не удалось отправить пост типа {content_type}" ) return # Сохраняем пост в БД (сохраняем исходный текст, без сообщения об ошибке) post = TelegramPost( message_id=sent_message.message_id, text=original_raw_text, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем медиа и скоры в фоне if content_type in ("photo", "video", "audio", "voice", "video_note"): asyncio.create_task( self._save_media_background(sent_message, self.db, self.s3_storage) ) if ml_scores_json: asyncio.create_task( self._save_scores_background( sent_message.message_id, ml_scores_json ) ) # Индексируем пост в RAG submitted collection (после успешной отправки) if self.scoring_manager and original_raw_text and original_raw_text.strip(): asyncio.create_task( self._add_submitted_post_background( original_raw_text, sent_message.message_id, rag_score ) ) except Exception as e: logger.error( f"PostService: Критическая ошибка в _process_post_background для {content_type}: {e}" ) async def _process_media_group_background( self, message: types.Message, album: list, first_name: str, post_caption: str, is_anonymous: bool, original_raw_text: str, ml_scores_json: str = None, rag_score: float = None, ) -> None: """Обрабатывает медиагруппу в фоне""" try: media_group = await prepare_media_group_from_middlewares( album, post_caption ) media_group_message_ids = await send_media_group_message_to_private_chat( self.settings.group_for_posts, message, media_group, self.db, None, self.s3_storage, ) main_post_id = media_group_message_ids[-1] main_post = TelegramPost( message_id=main_post_id, text=original_raw_text, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(main_post) # Сохраняем скоры в фоне (если они были получены) if ml_scores_json: asyncio.create_task( self._save_scores_background(main_post_id, ml_scores_json) ) # Индексируем пост в RAG submitted collection if self.scoring_manager and original_raw_text and original_raw_text.strip(): asyncio.create_task( self._add_submitted_post_background( original_raw_text, main_post_id, rag_score ) ) for msg_id in media_group_message_ids: await self.db.add_message_link(main_post_id, msg_id) await asyncio.sleep(0.2) markup = get_reply_keyboard_for_post() helper_message = await send_text_message( self.settings.group_for_posts, message, "^", markup ) helper_message_id = helper_message.message_id helper_post = TelegramPost( message_id=helper_message_id, text="^", author_id=message.from_user.id, helper_text_message_id=main_post_id, created_at=int(datetime.now().timestamp()), ) await self.db.add_post(helper_post) await self.db.update_helper_message( message_id=main_post_id, helper_message_id=helper_message_id ) except Exception as e: logger.error(f"PostService: Ошибка в _process_media_group_background: {e}") @track_time("handle_text_post", "post_service") @track_errors("post_service", "handle_text_post") @db_query_time("handle_text_post", "posts", "insert") async def handle_text_post(self, message: types.Message, first_name: str) -> None: """Handle text post submission""" raw_text = message.text or "" # Получаем скоры для текста ( deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, ) = await self._get_scores(raw_text) logger.debug( f"PostService.handle_text_post: Передача скоров в get_text_message - " f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), " f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), " f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), " f"message_id={message.message_id}" ) # Формируем текст с учетом скоров post_text = get_text_message( message.text.lower(), first_name, message.from_user.username, deepseek_score=deepseek_score, rag_score=rag_score, user_id=message.from_user.id, ) markup = get_reply_keyboard_for_post() sent_message = await send_text_message( self.settings.group_for_posts, message, post_text, markup ) # Определяем анонимность is_anonymous = determine_anonymity(raw_text) post = TelegramPost( message_id=sent_message.message_id, text=raw_text, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем скоры в фоне if ml_scores_json: asyncio.create_task( self._save_scores_background(sent_message.message_id, ml_scores_json) ) @track_time("handle_photo_post", "post_service") @track_errors("post_service", "handle_photo_post") @db_query_time("handle_photo_post", "posts", "insert") async def handle_photo_post(self, message: types.Message, first_name: str) -> None: """Handle photo post submission""" raw_caption = message.caption or "" # Получаем скоры для текста ( deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, ) = await self._get_scores(raw_caption) logger.debug( f"PostService.handle_photo_post: Передача скоров в get_text_message - " f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), " f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), " f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), " f"message_id={message.message_id}" ) post_caption = "" if message.caption: post_caption = get_text_message( message.caption.lower(), first_name, message.from_user.username, deepseek_score=deepseek_score, rag_score=rag_score, user_id=message.from_user.id, ) markup = get_reply_keyboard_for_post() sent_message = await send_photo_message( self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup, ) # Определяем анонимность is_anonymous = determine_anonymity(raw_caption) post = TelegramPost( message_id=sent_message.message_id, text=raw_caption, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем медиа и скоры в фоне asyncio.create_task( self._save_media_background(sent_message, self.db, self.s3_storage) ) if ml_scores_json: asyncio.create_task( self._save_scores_background(sent_message.message_id, ml_scores_json) ) @track_time("handle_video_post", "post_service") @track_errors("post_service", "handle_video_post") @db_query_time("handle_video_post", "posts", "insert") async def handle_video_post(self, message: types.Message, first_name: str) -> None: """Handle video post submission""" raw_caption = message.caption or "" # Получаем скоры для текста ( deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, ) = await self._get_scores(raw_caption) logger.debug( f"PostService.handle_video_post: Передача скоров в get_text_message - " f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), " f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), " f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), " f"message_id={message.message_id}" ) post_caption = "" if message.caption: post_caption = get_text_message( message.caption.lower(), first_name, message.from_user.username, deepseek_score=deepseek_score, rag_score=rag_score, user_id=message.from_user.id, ) markup = get_reply_keyboard_for_post() sent_message = await send_video_message( self.settings.group_for_posts, message, message.video.file_id, post_caption, markup, ) # Определяем анонимность is_anonymous = determine_anonymity(raw_caption) post = TelegramPost( message_id=sent_message.message_id, text=raw_caption, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем медиа и скоры в фоне asyncio.create_task( self._save_media_background(sent_message, self.db, self.s3_storage) ) if ml_scores_json: asyncio.create_task( self._save_scores_background(sent_message.message_id, ml_scores_json) ) @track_time("handle_video_note_post", "post_service") @track_errors("post_service", "handle_video_note_post") @db_query_time("handle_video_note_post", "posts", "insert") async def handle_video_note_post(self, message: types.Message) -> None: """Handle video note post submission""" markup = get_reply_keyboard_for_post() sent_message = await send_video_note_message( self.settings.group_for_posts, message, message.video_note.file_id, markup ) # Сохраняем пустую строку, так как video_note не имеет caption raw_caption = "" is_anonymous = determine_anonymity(raw_caption) post = TelegramPost( message_id=sent_message.message_id, text=raw_caption, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю asyncio.create_task( self._save_media_background(sent_message, self.db, self.s3_storage) ) @track_time("handle_audio_post", "post_service") @track_errors("post_service", "handle_audio_post") @db_query_time("handle_audio_post", "posts", "insert") async def handle_audio_post(self, message: types.Message, first_name: str) -> None: """Handle audio post submission""" raw_caption = message.caption or "" # Получаем скоры для текста ( deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, ) = await self._get_scores(raw_caption) logger.debug( f"PostService.handle_audio_post: Передача скоров в get_text_message - " f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), " f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), " f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), " f"message_id={message.message_id}" ) post_caption = "" if message.caption: post_caption = get_text_message( message.caption.lower(), first_name, message.from_user.username, deepseek_score=deepseek_score, rag_score=rag_score, user_id=message.from_user.id, ) markup = get_reply_keyboard_for_post() sent_message = await send_audio_message( self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup, ) # Определяем анонимность is_anonymous = determine_anonymity(raw_caption) post = TelegramPost( message_id=sent_message.message_id, text=raw_caption, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем медиа и скоры в фоне asyncio.create_task( self._save_media_background(sent_message, self.db, self.s3_storage) ) if ml_scores_json: asyncio.create_task( self._save_scores_background(sent_message.message_id, ml_scores_json) ) @track_time("handle_voice_post", "post_service") @track_errors("post_service", "handle_voice_post") @db_query_time("handle_voice_post", "posts", "insert") async def handle_voice_post(self, message: types.Message) -> None: """Handle voice post submission""" markup = get_reply_keyboard_for_post() sent_message = await send_voice_message( self.settings.group_for_posts, message, message.voice.file_id, markup ) # Сохраняем пустую строку, так как voice не имеет caption raw_caption = "" is_anonymous = determine_anonymity(raw_caption) post = TelegramPost( message_id=sent_message.message_id, text=raw_caption, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(post) # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю asyncio.create_task( self._save_media_background(sent_message, self.db, self.s3_storage) ) @track_time("handle_media_group_post", "post_service") @track_errors("post_service", "handle_media_group_post") @db_query_time("handle_media_group_post", "posts", "insert") @track_media_processing("media_group") async def handle_media_group_post( self, message: types.Message, album: list, first_name: str ) -> None: """Handle media group post submission""" post_caption = " " raw_caption = "" ml_scores_json = None if album and album[0].caption: raw_caption = album[0].caption or "" # Получаем скоры для текста ( deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, ) = await self._get_scores(raw_caption) logger.debug( f"PostService.handle_media_group_post: Передача скоров в get_text_message - " f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), " f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), " f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), " f"message_id={message.message_id}" ) post_caption = get_text_message( album[0].caption.lower(), first_name, message.from_user.username, deepseek_score=deepseek_score, rag_score=rag_score, user_id=message.from_user.id, ) is_anonymous = determine_anonymity(raw_caption) media_group = await prepare_media_group_from_middlewares(album, post_caption) media_group_message_ids = await send_media_group_message_to_private_chat( self.settings.group_for_posts, message, media_group, self.db, None, self.s3_storage, ) main_post_id = media_group_message_ids[-1] main_post = TelegramPost( message_id=main_post_id, text=raw_caption, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), is_anonymous=is_anonymous, ) await self.db.add_post(main_post) # Сохраняем скоры в фоне if ml_scores_json: asyncio.create_task( self._save_scores_background(main_post_id, ml_scores_json) ) for msg_id in media_group_message_ids: await self.db.add_message_link(main_post_id, msg_id) await asyncio.sleep(0.2) markup = get_reply_keyboard_for_post() helper_message = await send_text_message( self.settings.group_for_posts, message, "^", markup ) helper_message_id = helper_message.message_id helper_post = TelegramPost( message_id=helper_message_id, text="^", author_id=message.from_user.id, helper_text_message_id=main_post_id, created_at=int(datetime.now().timestamp()), ) await self.db.add_post(helper_post) await self.db.update_helper_message( message_id=main_post_id, helper_message_id=helper_message_id ) @track_time("process_post", "post_service") @track_errors("post_service", "process_post") @track_media_processing("media_group") async def process_post( self, message: types.Message, album: Union[list, None] = None ) -> None: """ Запускает обработку поста в фоне. Не блокирует выполнение - сразу возвращает управление. """ first_name = get_first_name(message) # Определяем тип контента content_type = ( "media_group" if message.media_group_id is not None else message.content_type ) # Запускаем фоновую обработку asyncio.create_task( self._process_post_background(message, first_name, content_type, album) ) class StickerService: """Service for sticker-related operations""" def __init__(self, settings: BotSettings) -> None: self.settings = settings @track_time("send_random_hello_sticker", "sticker_service") @track_errors("sticker_service", "send_random_hello_sticker") @track_file_operations("sticker") async def send_random_hello_sticker(self, message: types.Message) -> None: """Send random hello sticker with metrics tracking""" name_stick_hello = list(Path("Stick").rglob("Hello_*")) if not name_stick_hello: return random_stick_hello = random.choice(name_stick_hello) random_stick_hello = FSInputFile(path=random_stick_hello) await message.answer_sticker(random_stick_hello) await asyncio.sleep(0.3) @track_time("send_random_goodbye_sticker", "sticker_service") @track_errors("sticker_service", "send_random_goodbye_sticker") @track_file_operations("sticker") async def send_random_goodbye_sticker(self, message: types.Message) -> None: """Send random goodbye sticker with metrics tracking""" name_stick_bye = list(Path("Stick").rglob("Universal_*")) if not name_stick_bye: return random_stick_bye = random.choice(name_stick_bye) random_stick_bye = FSInputFile(path=random_stick_bye) await message.answer_sticker(random_stick_bye) class AutoModerationService: """ Сервис автоматической модерации постов на основе RAG score. Автоматически публикует посты с высоким скором и отклоняет с низким. """ def __init__( self, db: DatabaseProtocol, settings: BotSettings, scoring_manager=None, s3_storage=None, ) -> None: self.db = db self.settings = settings self.scoring_manager = scoring_manager self.s3_storage = s3_storage @track_time("check_auto_action", "auto_moderation_service") async def check_auto_action(self, rag_score: float) -> str: """ Проверяет, требуется ли автоматическое действие. Args: rag_score: Скор от RAG модели (0.0 - 1.0) Returns: 'publish' - автопубликация 'decline' - автоотклонение 'manual' - ручная модерация """ if rag_score is None: return "manual" settings = await self.db.get_auto_moderation_settings() auto_publish_enabled = settings.get("auto_publish_enabled", False) auto_decline_enabled = settings.get("auto_decline_enabled", False) auto_publish_threshold = settings.get("auto_publish_threshold", 0.8) auto_decline_threshold = settings.get("auto_decline_threshold", 0.4) logger.info( f"AutoModeration: Настройки из БД - " f"publish_enabled={auto_publish_enabled}, decline_enabled={auto_decline_enabled}, " f"publish_threshold={auto_publish_threshold}, decline_threshold={auto_decline_threshold}, " f"rag_score={rag_score:.2f}" ) if auto_publish_enabled and rag_score >= auto_publish_threshold: logger.info( f"AutoModeration: score {rag_score:.2f} >= {auto_publish_threshold} → auto_publish" ) return "publish" if auto_decline_enabled and rag_score <= auto_decline_threshold: logger.info( f"AutoModeration: score {rag_score:.2f} <= {auto_decline_threshold} → auto_decline" ) return "decline" return "manual" @track_time("log_auto_action", "auto_moderation_service") async def log_auto_action( self, bot, action: str, author_id: int, author_name: str, author_username: str, rag_score: float, post_text: str, ) -> None: """ Отправляет лог автоматического действия в IMPORTANT_LOGS. Args: bot: Экземпляр бота для отправки сообщений action: Тип действия ('publish' или 'decline') author_id: ID автора поста author_name: Имя автора author_username: Username автора rag_score: Скор модели post_text: Текст поста """ try: safe_name = html.escape(author_name or "Без имени") safe_username = html.escape(author_username or "нет") truncated_text = post_text[:200] if post_text else "" if len(post_text or "") > 200: truncated_text += "..." safe_text = html.escape(truncated_text) if action == "publish": emoji = "🤖" action_title = "АВТО-ПУБЛИКАЦИЯ" action_result = "✅ Пост автоматически опубликован" else: emoji = "🚫" action_title = "АВТО-ОТКЛОНЕНИЕ" action_result = "❌ Пост автоматически отклонён" message_text = ( f"{emoji} {action_title}\n\n" f"👤 Автор: {safe_name} (@{safe_username}) | ID: {author_id}\n" f"📊 RAG Score: {rag_score:.2f}\n\n" f"📝 Текст поста:\n" f'"{safe_text}"\n\n' f"{action_result}" ) await bot.send_message( chat_id=self.settings.important_logs, text=message_text, parse_mode="HTML", ) logger.info(f"AutoModeration: Лог отправлен в IMPORTANT_LOGS ({action})") except Exception as e: logger.error(f"AutoModeration: Ошибка отправки лога: {e}")