"""Service classes for private handlers"""
# Standard library imports
import asyncio
import html
import random
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Dict, Protocol, Union
# Third-party imports
from aiogram import types
from aiogram.types import FSInputFile
from database.models import TelegramPost, User
from helper_bot.keyboards import get_reply_keyboard_for_post
# Local imports - utilities
from helper_bot.utils.helper_func import (
add_in_db_media,
check_username_and_full_name,
determine_anonymity,
get_first_name,
get_publish_text,
get_text_message,
prepare_media_group_from_middlewares,
send_audio_message,
send_media_group_message_to_private_chat,
send_photo_message,
send_text_message,
send_video_message,
send_video_note_message,
send_voice_message,
)
# Local imports - metrics
from helper_bot.utils.metrics import (
db_query_time,
track_errors,
track_file_operations,
track_media_processing,
track_time,
)
from logs.custom_logger import logger
class DatabaseProtocol(Protocol):
"""Protocol for database operations"""
async def user_exists(self, user_id: int) -> bool: ...
async def add_user(self, user: User) -> None: ...
async def update_user_info(
self, user_id: int, username: str = None, full_name: str = None
) -> None: ...
async def update_user_date(self, user_id: int) -> None: ...
async def add_post(self, post: TelegramPost) -> None: ...
async def update_stickers_info(self, user_id: int) -> None: ...
async def add_message(
self, message_text: str, user_id: int, message_id: int, date: int = None
) -> None: ...
async def update_helper_message(
self, message_id: int, helper_message_id: int
) -> None: ...
@dataclass
class BotSettings:
"""Bot configuration settings"""
group_for_posts: str
group_for_message: str
main_public: str
group_for_logs: str
important_logs: str
preview_link: str
logs: str
test: str
class UserService:
"""Service for user-related operations"""
def __init__(self, db: DatabaseProtocol, settings: BotSettings) -> None:
self.db = db
self.settings = settings
@track_time("update_user_activity", "user_service")
@track_errors("user_service", "update_user_activity")
@db_query_time("update_user_activity", "users", "update")
async def update_user_activity(self, user_id: int) -> None:
"""Update user's last activity timestamp with metrics tracking"""
await self.db.update_user_date(user_id)
@track_time("ensure_user_exists", "user_service")
@track_errors("user_service", "ensure_user_exists")
@db_query_time("ensure_user_exists", "users", "insert")
async def ensure_user_exists(self, message: types.Message) -> None:
"""Ensure user exists in database, create if needed with metrics tracking"""
user_id = message.from_user.id
full_name = message.from_user.full_name
# Сохраняем только реальный username, если его нет - сохраняем None/пустую строку
username = message.from_user.username
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
# Create User object with current timestamp
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username, # Может быть None - это нормально
is_bot=is_bot,
language_code=language_code,
emoji="",
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False,
)
# Пытаемся создать пользователя (если уже существует - игнорируем)
# Это устраняет race condition и упрощает логику
await self.db.add_user(user)
# Проверяем, нужно ли обновить информацию о существующем пользователе
is_need_update = await check_username_and_full_name(
user_id, username, full_name, self.db
)
if is_need_update:
await self.db.update_user_info(user_id, username, full_name)
safe_full_name = (
html.escape(full_name) if full_name else "Неизвестный пользователь"
)
# Для отображения используем подстановочное значение, но в БД сохраняем только реальный username
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}"
)
await message.bot.send_message(
chat_id=self.settings.group_for_logs,
text=f"Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}",
)
await self.db.update_user_date(user_id)
async def log_user_message(self, message: types.Message) -> None:
"""Forward user message to logs group with metrics tracking"""
await message.forward(chat_id=self.settings.group_for_logs)
def get_safe_user_info(self, message: types.Message) -> tuple[str, str]:
"""Get safely escaped user information for logging"""
full_name = message.from_user.full_name or "Неизвестный пользователь"
username = message.from_user.username or "Без никнейма"
return html.escape(full_name), html.escape(username)
async def format_user_message_for_admins(
self, user_id: int, full_name: str, username: str, message_text: str
) -> str:
"""
Форматирует сообщение пользователя для отправки админам с обогащёнными данными.
Args:
user_id: ID пользователя
full_name: Полное имя пользователя
username: Username пользователя (может быть None)
message_text: Текст сообщения пользователя
Returns:
Отформатированное сообщение для админов
"""
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
safe_username = html.escape(username) if username else None
safe_message_text = html.escape(message_text) if message_text else ""
# Формируем строку с информацией об авторе
if safe_username:
author_info = f"{safe_full_name} (@{safe_username})"
else:
author_info = f"{safe_full_name} (Ник не указан)"
# Получаем статистику постов
approved, declined, suggest = await self.db.get_user_posts_stats(user_id)
total_posts = approved + declined + suggest
# Получаем последний пост
last_post = await self.db.get_last_post_by_author(user_id)
if last_post:
if len(last_post) > 80:
last_post_display = f'"{html.escape(last_post[:80])}..."'
else:
last_post_display = f'"{html.escape(last_post)}"'
else:
last_post_display = "Нет постов"
# Получаем дату регистрации
user_info = await self.db.get_user_by_id(user_id)
if user_info and user_info.date_added:
date_added = datetime.fromtimestamp(user_info.date_added).strftime("%d.%m.%Y")
else:
date_added = "Неизвестно"
# Получаем информацию о банах
ban_count = await self.db.get_user_ban_count(user_id)
ban_section = ""
if ban_count > 0:
last_ban = await self.db.get_last_ban_info(user_id)
if last_ban:
date_ban, reason, date_unban = last_ban
ban_date_str = datetime.fromtimestamp(date_ban).strftime("%d.%m.%Y")
reason_display = html.escape(reason) if reason else "Не указана"
if date_unban:
unban_date_str = datetime.fromtimestamp(date_unban).strftime(
"%d.%m.%Y %H:%M"
)
last_ban_info = (
f" Последний: {ban_date_str}, причина «{reason_display}», "
f"истёк {unban_date_str}"
)
else:
last_ban_info = (
f" Последний: {ban_date_str}, причина «{reason_display}», "
f"активен"
)
ban_section = f"\n\n🚫 Банов: {ban_count}\n{last_ban_info}"
# Формируем итоговое сообщение
formatted_message = (
f"👤 От: {author_info} | ID: {user_id}\n\n"
f"📊 Постов в базе: {total_posts}\n"
f"📝 Последний пост: {last_post_display}\n"
f"📅 В боте с: {date_added}"
f"{ban_section}\n\n"
f"---\n"
f"Сообщение пользователя:\n\n"
f"{safe_message_text}"
)
return formatted_message
class PostService:
"""Service for post-related operations"""
def __init__(
self,
db: DatabaseProtocol,
settings: BotSettings,
s3_storage=None,
scoring_manager=None,
auto_moderation_service: "AutoModerationService" = None,
) -> None:
self.db = db
self.settings = settings
self.s3_storage = s3_storage
self.scoring_manager = scoring_manager
self.auto_moderation = auto_moderation_service
async def _save_media_background(
self, sent_message: types.Message, bot_db: Any, s3_storage
) -> None:
"""Сохраняет медиа в фоне, чтобы не блокировать ответ пользователю"""
try:
success = await add_in_db_media(sent_message, bot_db, s3_storage)
if not success:
logger.warning(
f"_save_media_background: Не удалось сохранить медиа для поста {sent_message.message_id}"
)
except Exception as e:
logger.error(
f"_save_media_background: Ошибка при сохранении медиа для поста {sent_message.message_id}: {e}"
)
async def _get_scores(self, text: str) -> tuple:
"""
Получает скоры для текста поста.
Returns:
Tuple (deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json)
"""
if not self.scoring_manager or not text or not text.strip():
return None, None, None, None, None
try:
scores = await self.scoring_manager.score_post(text)
# Формируем JSON для сохранения в БД
import json
ml_scores_json = (
json.dumps(scores.to_json_dict()) if scores.has_any_score() else None
)
# Получаем данные от RAG
rag_confidence = scores.rag.confidence if scores.rag else None
rag_score_pos_only = (
scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None
)
return (
scores.deepseek_score,
scores.rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
)
except Exception as e:
logger.error(f"PostService: Ошибка получения скоров: {e}")
return None, None, None, None, None
async def _save_scores_background(
self, message_id: int, ml_scores_json: str
) -> None:
"""Сохраняет скоры в БД в фоне."""
if ml_scores_json:
try:
await self.db.update_ml_scores(message_id, ml_scores_json)
except Exception as e:
logger.error(
f"PostService: Ошибка сохранения скоров для {message_id}: {e}"
)
async def _add_submitted_post_background(
self, text: str, post_id: int, rag_score: float = None
) -> None:
"""Индексирует пост в RAG submitted collection в фоне."""
try:
if self.scoring_manager:
await self.scoring_manager.add_submitted_post(text, post_id, rag_score)
except Exception as e:
logger.warning(
f"PostService: Ошибка добавления поста в submitted: {e}"
)
async def _get_scores_with_error_handling(self, text: str) -> tuple:
"""
Получает скоры для текста поста с обработкой ошибок.
Returns:
Tuple (deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, error_message)
error_message будет None если все ок, или строка с описанием ошибки
"""
if not self.scoring_manager:
# Скоры выключены в .env - это нормально
return None, None, None, None, None, None
if not text or not text.strip():
return None, None, None, None, None, None
try:
scores = await self.scoring_manager.score_post(text)
# Формируем JSON для сохранения в БД
import json
ml_scores_json = (
json.dumps(scores.to_json_dict()) if scores.has_any_score() else None
)
# Получаем данные от RAG
rag_confidence = scores.rag.confidence if scores.rag else None
rag_score_pos_only = (
scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None
)
return (
scores.deepseek_score,
scores.rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
None,
)
except Exception as e:
logger.error(f"PostService: Ошибка получения скоров: {e}")
# Возвращаем частичные скоры если есть, или сообщение об ошибке
error_message = "Не удалось рассчитать скоры"
return None, None, None, None, None, error_message
@track_time("_handle_auto_action", "post_service")
@track_errors("post_service", "_handle_auto_action")
async def _handle_auto_action(
self,
auto_action: str,
message: types.Message,
content_type: str,
original_raw_text: str,
first_name: str,
is_anonymous: bool,
rag_score: float,
ml_scores_json: str = None,
album: Union[list, None] = None,
) -> None:
"""
Обрабатывает автоматическое действие (публикация или отклонение).
Args:
auto_action: 'publish' или 'decline'
message: Сообщение пользователя
content_type: Тип контента
original_raw_text: Оригинальный текст поста
first_name: Имя автора
is_anonymous: Флаг анонимности
rag_score: Скор RAG модели
ml_scores_json: JSON со скорами для БД
album: Медиагруппа (если есть)
"""
author_id = message.from_user.id
author_name = message.from_user.full_name or first_name
author_username = message.from_user.username or ""
try:
if auto_action == "publish":
await self._auto_publish(
message=message,
content_type=content_type,
original_raw_text=original_raw_text,
first_name=first_name,
is_anonymous=is_anonymous,
rag_score=rag_score,
ml_scores_json=ml_scores_json,
album=album,
)
else: # decline
await self._auto_decline(message=message, author_id=author_id)
# Логируем действие
if self.auto_moderation:
await self.auto_moderation.log_auto_action(
bot=message.bot,
action=auto_action,
author_id=author_id,
author_name=author_name,
author_username=author_username,
rag_score=rag_score,
post_text=original_raw_text,
)
except Exception as e:
logger.error(
f"PostService: Ошибка авто-{auto_action} для message_id={message.message_id}: {e}"
)
raise
@track_time("_auto_publish", "post_service")
@track_errors("post_service", "_auto_publish")
async def _auto_publish(
self,
message: types.Message,
content_type: str,
original_raw_text: str,
first_name: str,
is_anonymous: bool,
rag_score: float,
ml_scores_json: str = None,
album: Union[list, None] = None,
) -> None:
"""Автоматически публикует пост в канал."""
author_id = message.from_user.id
username = message.from_user.username
# Формируем текст для публикации (без скоров и разметки)
formatted_text = get_publish_text(
original_raw_text, first_name, username, is_anonymous
)
sent_message = None
# Публикуем в зависимости от типа контента
if content_type == "text":
sent_message = await message.bot.send_message(
chat_id=self.settings.main_public,
text=formatted_text,
)
elif content_type == "photo":
sent_message = await message.bot.send_photo(
chat_id=self.settings.main_public,
photo=message.photo[-1].file_id,
caption=formatted_text,
)
elif content_type == "video":
sent_message = await message.bot.send_video(
chat_id=self.settings.main_public,
video=message.video.file_id,
caption=formatted_text,
)
elif content_type == "audio":
sent_message = await message.bot.send_audio(
chat_id=self.settings.main_public,
audio=message.audio.file_id,
caption=formatted_text,
)
elif content_type == "voice":
sent_message = await message.bot.send_voice(
chat_id=self.settings.main_public,
voice=message.voice.file_id,
)
elif content_type == "video_note":
sent_message = await message.bot.send_video_note(
chat_id=self.settings.main_public,
video_note=message.video_note.file_id,
)
elif content_type == "media_group" and album:
# TODO: Реализовать авто-публикацию медиагрупп при необходимости
logger.warning(
"PostService: Авто-публикация медиагрупп пока не поддерживается"
)
return
if sent_message:
# Сохраняем пост в БД со статусом approved
post = TelegramPost(
message_id=sent_message.message_id,
text=original_raw_text,
author_id=author_id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
status="approved",
)
await self.db.add_post(post)
# Сохраняем скоры если есть
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(sent_message.message_id, ml_scores_json)
)
# Индексируем пост в RAG
if self.scoring_manager and original_raw_text and original_raw_text.strip():
asyncio.create_task(
self._add_submitted_post_background(
original_raw_text, sent_message.message_id, rag_score
)
)
# Уведомляем автора
try:
await message.bot.send_message(
chat_id=author_id,
text="Твой пост был выложен🥰",
)
except Exception as e:
logger.warning(f"PostService: Не удалось уведомить автора {author_id}: {e}")
logger.info(
f"PostService: Пост авто-опубликован в {self.settings.main_public}, "
f"author_id={author_id}, rag_score={rag_score:.2f}"
)
@track_time("_auto_decline", "post_service")
@track_errors("post_service", "_auto_decline")
async def _auto_decline(self, message: types.Message, author_id: int) -> None:
"""Автоматически отклоняет пост."""
# Обучаем RAG на отклоненном посте
if self.scoring_manager:
original_text = message.text or message.caption or ""
if original_text and original_text.strip():
try:
await self.scoring_manager.on_post_declined(original_text)
except Exception as e:
logger.warning(f"PostService: Ошибка обучения RAG на отклоненном посте: {e}")
# Уведомляем автора
try:
await message.bot.send_message(
chat_id=author_id,
text="Твой пост был отклонен😔",
)
except Exception as e:
logger.warning(f"PostService: Не удалось уведомить автора {author_id}: {e}")
logger.info(f"PostService: Пост авто-отклонен, author_id={author_id}")
@track_time("_process_post_background", "post_service")
@track_errors("post_service", "_process_post_background")
async def _process_post_background(
self,
message: types.Message,
first_name: str,
content_type: str,
album: Union[list, None] = None,
) -> None:
"""
Обрабатывает пост в фоне: получает скоры, отправляет в группу модерации, сохраняет в БД.
Args:
message: Сообщение от пользователя
first_name: Имя пользователя
content_type: Тип контента ('text', 'photo', 'video', 'audio', 'voice', 'video_note', 'media_group')
album: Список сообщений медиагруппы (только для media_group)
"""
try:
# Определяем исходный текст для скоринга и определения анонимности
original_raw_text = ""
if content_type == "text":
original_raw_text = message.text or ""
elif content_type == "media_group":
original_raw_text = (
album[0].caption or "" if album and album[0].caption else ""
)
else:
original_raw_text = message.caption or ""
# Получаем скоры с обработкой ошибок
(
deepseek_score,
rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
error_message,
) = await self._get_scores_with_error_handling(original_raw_text)
# Проверяем похожие посты (до добавления текущего в submitted)
similar_warning = ""
if self.scoring_manager and original_raw_text and original_raw_text.strip():
try:
similar_result = await self.scoring_manager.find_similar_posts(
original_raw_text, threshold=0.9, hours=24
)
if similar_result and similar_result.similar_count > 0:
# Формируем предупреждение с текстом похожего поста
similar_text = ""
if similar_result.similar_posts:
first_similar = similar_result.similar_posts[0]
if first_similar.text:
truncated_text = first_similar.text[:150]
if len(first_similar.text) > 150:
truncated_text += "..."
similar_text = f'\nТекст поста:\n"{html.escape(truncated_text)}"'
similar_warning = (
f"\n\n⚠️ Похожий пост за последние 24ч "
f"(совпадение {similar_result.max_similarity:.0%})"
f"{similar_text}"
)
logger.info(
f"PostService: Найден похожий пост для message_id={message.message_id}, "
f"similar_count={similar_result.similar_count}, "
f"max_similarity={similar_result.max_similarity:.2%}"
)
except Exception as e:
logger.warning(f"PostService: Ошибка поиска похожих постов: {e}")
# Формируем текст для поста (с сообщением об ошибке если есть)
text_for_post = original_raw_text
if error_message:
# Для текстовых постов добавляем в конец текста
if content_type == "text":
text_for_post = f"{original_raw_text}\n\n⚠️ {error_message}"
# Для медиа добавляем в caption
elif content_type in ("photo", "video", "audio") and original_raw_text:
text_for_post = f"{original_raw_text}\n\n⚠️ {error_message}"
# Формируем текст/caption с учетом скоров
post_text = ""
if text_for_post or content_type == "text":
logger.debug(
f"PostService._process_post_background: Передача скоров в get_text_message - "
f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), "
f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), "
f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), "
f"content_type={content_type}, message_id={message.message_id}"
)
post_text = get_text_message(
text_for_post.lower() if text_for_post else "",
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
user_id=message.from_user.id,
)
# Добавляем предупреждение о похожем посте
if similar_warning:
post_text += similar_warning
# Определяем анонимность по исходному тексту (без сообщения об ошибке)
is_anonymous = determine_anonymity(original_raw_text)
# Проверяем авто-модерацию
logger.debug(
f"PostService: Проверка авто-модерации - "
f"auto_moderation={self.auto_moderation is not None}, "
f"rag_score={rag_score}"
)
if self.auto_moderation and rag_score is not None:
auto_action = await self.auto_moderation.check_auto_action(rag_score)
logger.info(
f"PostService: Авто-модерация решение - "
f"rag_score={rag_score:.2f}, action={auto_action}"
)
if auto_action in ("publish", "decline"):
await self._handle_auto_action(
auto_action=auto_action,
message=message,
content_type=content_type,
original_raw_text=original_raw_text,
first_name=first_name,
is_anonymous=is_anonymous,
rag_score=rag_score,
ml_scores_json=ml_scores_json,
album=album,
)
return
markup = get_reply_keyboard_for_post()
sent_message = None
# Отправляем пост в группу модерации в зависимости от типа
if content_type == "text":
sent_message = await send_text_message(
self.settings.group_for_posts, message, post_text, markup
)
elif content_type == "photo":
sent_message = await send_photo_message(
self.settings.group_for_posts,
message,
message.photo[-1].file_id,
post_text,
markup,
)
elif content_type == "video":
sent_message = await send_video_message(
self.settings.group_for_posts,
message,
message.video.file_id,
post_text,
markup,
)
elif content_type == "audio":
sent_message = await send_audio_message(
self.settings.group_for_posts,
message,
message.audio.file_id,
post_text,
markup,
)
elif content_type == "voice":
sent_message = await send_voice_message(
self.settings.group_for_posts,
message,
message.voice.file_id,
markup,
)
elif content_type == "video_note":
sent_message = await send_video_note_message(
self.settings.group_for_posts,
message,
message.video_note.file_id,
markup,
)
elif content_type == "media_group":
# Добавляем предупреждение о похожем посте в caption медиагруппы
if similar_warning:
post_text += similar_warning
# Для медиагруппы используем специальную обработку
# Передаем ml_scores_json и rag_score для сохранения в БД
await self._process_media_group_background(
message,
album,
first_name,
post_text,
is_anonymous,
original_raw_text,
ml_scores_json,
rag_score,
)
return
else:
logger.error(
f"PostService: Неподдерживаемый тип контента: {content_type}"
)
return
if not sent_message:
logger.error(
f"PostService: Не удалось отправить пост типа {content_type}"
)
return
# Сохраняем пост в БД (сохраняем исходный текст, без сообщения об ошибке)
post = TelegramPost(
message_id=sent_message.message_id,
text=original_raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем медиа и скоры в фоне
if content_type in ("photo", "video", "audio", "voice", "video_note"):
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(
sent_message.message_id, ml_scores_json
)
)
# Индексируем пост в RAG submitted collection (после успешной отправки)
if self.scoring_manager and original_raw_text and original_raw_text.strip():
asyncio.create_task(
self._add_submitted_post_background(
original_raw_text, sent_message.message_id, rag_score
)
)
except Exception as e:
logger.error(
f"PostService: Критическая ошибка в _process_post_background для {content_type}: {e}"
)
async def _process_media_group_background(
self,
message: types.Message,
album: list,
first_name: str,
post_caption: str,
is_anonymous: bool,
original_raw_text: str,
ml_scores_json: str = None,
rag_score: float = None,
) -> None:
"""Обрабатывает медиагруппу в фоне"""
try:
media_group = await prepare_media_group_from_middlewares(
album, post_caption
)
media_group_message_ids = await send_media_group_message_to_private_chat(
self.settings.group_for_posts,
message,
media_group,
self.db,
None,
self.s3_storage,
)
main_post_id = media_group_message_ids[-1]
main_post = TelegramPost(
message_id=main_post_id,
text=original_raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(main_post)
# Сохраняем скоры в фоне (если они были получены)
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(main_post_id, ml_scores_json)
)
# Индексируем пост в RAG submitted collection
if self.scoring_manager and original_raw_text and original_raw_text.strip():
asyncio.create_task(
self._add_submitted_post_background(
original_raw_text, main_post_id, rag_score
)
)
for msg_id in media_group_message_ids:
await self.db.add_message_link(main_post_id, msg_id)
await asyncio.sleep(0.2)
markup = get_reply_keyboard_for_post()
helper_message = await send_text_message(
self.settings.group_for_posts, message, "^", markup
)
helper_message_id = helper_message.message_id
helper_post = TelegramPost(
message_id=helper_message_id,
text="^",
author_id=message.from_user.id,
helper_text_message_id=main_post_id,
created_at=int(datetime.now().timestamp()),
)
await self.db.add_post(helper_post)
await self.db.update_helper_message(
message_id=main_post_id, helper_message_id=helper_message_id
)
except Exception as e:
logger.error(f"PostService: Ошибка в _process_media_group_background: {e}")
@track_time("handle_text_post", "post_service")
@track_errors("post_service", "handle_text_post")
@db_query_time("handle_text_post", "posts", "insert")
async def handle_text_post(self, message: types.Message, first_name: str) -> None:
"""Handle text post submission"""
raw_text = message.text or ""
# Получаем скоры для текста
(
deepseek_score,
rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
) = await self._get_scores(raw_text)
logger.debug(
f"PostService.handle_text_post: Передача скоров в get_text_message - "
f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), "
f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), "
f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), "
f"message_id={message.message_id}"
)
# Формируем текст с учетом скоров
post_text = get_text_message(
message.text.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
user_id=message.from_user.id,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_text_message(
self.settings.group_for_posts, message, post_text, markup
)
# Определяем анонимность
is_anonymous = determine_anonymity(raw_text)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем скоры в фоне
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(sent_message.message_id, ml_scores_json)
)
@track_time("handle_photo_post", "post_service")
@track_errors("post_service", "handle_photo_post")
@db_query_time("handle_photo_post", "posts", "insert")
async def handle_photo_post(self, message: types.Message, first_name: str) -> None:
"""Handle photo post submission"""
raw_caption = message.caption or ""
# Получаем скоры для текста
(
deepseek_score,
rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
) = await self._get_scores(raw_caption)
logger.debug(
f"PostService.handle_photo_post: Передача скоров в get_text_message - "
f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), "
f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), "
f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), "
f"message_id={message.message_id}"
)
post_caption = ""
if message.caption:
post_caption = get_text_message(
message.caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
user_id=message.from_user.id,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_photo_message(
self.settings.group_for_posts,
message,
message.photo[-1].file_id,
post_caption,
markup,
)
# Определяем анонимность
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем медиа и скоры в фоне
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(sent_message.message_id, ml_scores_json)
)
@track_time("handle_video_post", "post_service")
@track_errors("post_service", "handle_video_post")
@db_query_time("handle_video_post", "posts", "insert")
async def handle_video_post(self, message: types.Message, first_name: str) -> None:
"""Handle video post submission"""
raw_caption = message.caption or ""
# Получаем скоры для текста
(
deepseek_score,
rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
) = await self._get_scores(raw_caption)
logger.debug(
f"PostService.handle_video_post: Передача скоров в get_text_message - "
f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), "
f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), "
f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), "
f"message_id={message.message_id}"
)
post_caption = ""
if message.caption:
post_caption = get_text_message(
message.caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
user_id=message.from_user.id,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_video_message(
self.settings.group_for_posts,
message,
message.video.file_id,
post_caption,
markup,
)
# Определяем анонимность
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем медиа и скоры в фоне
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(sent_message.message_id, ml_scores_json)
)
@track_time("handle_video_note_post", "post_service")
@track_errors("post_service", "handle_video_note_post")
@db_query_time("handle_video_note_post", "posts", "insert")
async def handle_video_note_post(self, message: types.Message) -> None:
"""Handle video note post submission"""
markup = get_reply_keyboard_for_post()
sent_message = await send_video_note_message(
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
# Сохраняем пустую строку, так как video_note не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
@track_time("handle_audio_post", "post_service")
@track_errors("post_service", "handle_audio_post")
@db_query_time("handle_audio_post", "posts", "insert")
async def handle_audio_post(self, message: types.Message, first_name: str) -> None:
"""Handle audio post submission"""
raw_caption = message.caption or ""
# Получаем скоры для текста
(
deepseek_score,
rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
) = await self._get_scores(raw_caption)
logger.debug(
f"PostService.handle_audio_post: Передача скоров в get_text_message - "
f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), "
f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), "
f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), "
f"message_id={message.message_id}"
)
post_caption = ""
if message.caption:
post_caption = get_text_message(
message.caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
user_id=message.from_user.id,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_audio_message(
self.settings.group_for_posts,
message,
message.audio.file_id,
post_caption,
markup,
)
# Определяем анонимность
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем медиа и скоры в фоне
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(sent_message.message_id, ml_scores_json)
)
@track_time("handle_voice_post", "post_service")
@track_errors("post_service", "handle_voice_post")
@db_query_time("handle_voice_post", "posts", "insert")
async def handle_voice_post(self, message: types.Message) -> None:
"""Handle voice post submission"""
markup = get_reply_keyboard_for_post()
sent_message = await send_voice_message(
self.settings.group_for_posts, message, message.voice.file_id, markup
)
# Сохраняем пустую строку, так как voice не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
@track_time("handle_media_group_post", "post_service")
@track_errors("post_service", "handle_media_group_post")
@db_query_time("handle_media_group_post", "posts", "insert")
@track_media_processing("media_group")
async def handle_media_group_post(
self, message: types.Message, album: list, first_name: str
) -> None:
"""Handle media group post submission"""
post_caption = " "
raw_caption = ""
ml_scores_json = None
if album and album[0].caption:
raw_caption = album[0].caption or ""
# Получаем скоры для текста
(
deepseek_score,
rag_score,
rag_confidence,
rag_score_pos_only,
ml_scores_json,
) = await self._get_scores(raw_caption)
logger.debug(
f"PostService.handle_media_group_post: Передача скоров в get_text_message - "
f"rag_score={rag_score} (type: {type(rag_score).__name__ if rag_score is not None else 'None'}), "
f"rag_score_pos_only={rag_score_pos_only} (type: {type(rag_score_pos_only).__name__ if rag_score_pos_only is not None else 'None'}), "
f"rag_confidence={rag_confidence} (type: {type(rag_confidence).__name__ if rag_confidence is not None else 'None'}), "
f"message_id={message.message_id}"
)
post_caption = get_text_message(
album[0].caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
user_id=message.from_user.id,
)
is_anonymous = determine_anonymity(raw_caption)
media_group = await prepare_media_group_from_middlewares(album, post_caption)
media_group_message_ids = await send_media_group_message_to_private_chat(
self.settings.group_for_posts,
message,
media_group,
self.db,
None,
self.s3_storage,
)
main_post_id = media_group_message_ids[-1]
main_post = TelegramPost(
message_id=main_post_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
)
await self.db.add_post(main_post)
# Сохраняем скоры в фоне
if ml_scores_json:
asyncio.create_task(
self._save_scores_background(main_post_id, ml_scores_json)
)
for msg_id in media_group_message_ids:
await self.db.add_message_link(main_post_id, msg_id)
await asyncio.sleep(0.2)
markup = get_reply_keyboard_for_post()
helper_message = await send_text_message(
self.settings.group_for_posts, message, "^", markup
)
helper_message_id = helper_message.message_id
helper_post = TelegramPost(
message_id=helper_message_id,
text="^",
author_id=message.from_user.id,
helper_text_message_id=main_post_id,
created_at=int(datetime.now().timestamp()),
)
await self.db.add_post(helper_post)
await self.db.update_helper_message(
message_id=main_post_id, helper_message_id=helper_message_id
)
@track_time("process_post", "post_service")
@track_errors("post_service", "process_post")
@track_media_processing("media_group")
async def process_post(
self, message: types.Message, album: Union[list, None] = None
) -> None:
"""
Запускает обработку поста в фоне.
Не блокирует выполнение - сразу возвращает управление.
"""
first_name = get_first_name(message)
# Определяем тип контента
content_type = (
"media_group"
if message.media_group_id is not None
else message.content_type
)
# Запускаем фоновую обработку
asyncio.create_task(
self._process_post_background(message, first_name, content_type, album)
)
class StickerService:
"""Service for sticker-related operations"""
def __init__(self, settings: BotSettings) -> None:
self.settings = settings
@track_time("send_random_hello_sticker", "sticker_service")
@track_errors("sticker_service", "send_random_hello_sticker")
@track_file_operations("sticker")
async def send_random_hello_sticker(self, message: types.Message) -> None:
"""Send random hello sticker with metrics tracking"""
name_stick_hello = list(Path("Stick").rglob("Hello_*"))
if not name_stick_hello:
return
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
await message.answer_sticker(random_stick_hello)
await asyncio.sleep(0.3)
@track_time("send_random_goodbye_sticker", "sticker_service")
@track_errors("sticker_service", "send_random_goodbye_sticker")
@track_file_operations("sticker")
async def send_random_goodbye_sticker(self, message: types.Message) -> None:
"""Send random goodbye sticker with metrics tracking"""
name_stick_bye = list(Path("Stick").rglob("Universal_*"))
if not name_stick_bye:
return
random_stick_bye = random.choice(name_stick_bye)
random_stick_bye = FSInputFile(path=random_stick_bye)
await message.answer_sticker(random_stick_bye)
class AutoModerationService:
"""
Сервис автоматической модерации постов на основе RAG score.
Автоматически публикует посты с высоким скором и отклоняет с низким.
"""
def __init__(
self,
db: DatabaseProtocol,
settings: BotSettings,
scoring_manager=None,
s3_storage=None,
) -> None:
self.db = db
self.settings = settings
self.scoring_manager = scoring_manager
self.s3_storage = s3_storage
@track_time("check_auto_action", "auto_moderation_service")
async def check_auto_action(self, rag_score: float) -> str:
"""
Проверяет, требуется ли автоматическое действие.
Args:
rag_score: Скор от RAG модели (0.0 - 1.0)
Returns:
'publish' - автопубликация
'decline' - автоотклонение
'manual' - ручная модерация
"""
if rag_score is None:
return "manual"
settings = await self.db.get_auto_moderation_settings()
auto_publish_enabled = settings.get("auto_publish_enabled", False)
auto_decline_enabled = settings.get("auto_decline_enabled", False)
auto_publish_threshold = settings.get("auto_publish_threshold", 0.8)
auto_decline_threshold = settings.get("auto_decline_threshold", 0.4)
logger.info(
f"AutoModeration: Настройки из БД - "
f"publish_enabled={auto_publish_enabled}, decline_enabled={auto_decline_enabled}, "
f"publish_threshold={auto_publish_threshold}, decline_threshold={auto_decline_threshold}, "
f"rag_score={rag_score:.2f}"
)
if auto_publish_enabled and rag_score >= auto_publish_threshold:
logger.info(
f"AutoModeration: score {rag_score:.2f} >= {auto_publish_threshold} → auto_publish"
)
return "publish"
if auto_decline_enabled and rag_score <= auto_decline_threshold:
logger.info(
f"AutoModeration: score {rag_score:.2f} <= {auto_decline_threshold} → auto_decline"
)
return "decline"
return "manual"
@track_time("log_auto_action", "auto_moderation_service")
async def log_auto_action(
self,
bot,
action: str,
author_id: int,
author_name: str,
author_username: str,
rag_score: float,
post_text: str,
) -> None:
"""
Отправляет лог автоматического действия в IMPORTANT_LOGS.
Args:
bot: Экземпляр бота для отправки сообщений
action: Тип действия ('publish' или 'decline')
author_id: ID автора поста
author_name: Имя автора
author_username: Username автора
rag_score: Скор модели
post_text: Текст поста
"""
try:
safe_name = html.escape(author_name or "Без имени")
safe_username = html.escape(author_username or "нет")
truncated_text = post_text[:200] if post_text else ""
if len(post_text or "") > 200:
truncated_text += "..."
safe_text = html.escape(truncated_text)
if action == "publish":
emoji = "🤖"
action_title = "АВТО-ПУБЛИКАЦИЯ"
action_result = "✅ Пост автоматически опубликован"
else:
emoji = "🚫"
action_title = "АВТО-ОТКЛОНЕНИЕ"
action_result = "❌ Пост автоматически отклонён"
message_text = (
f"{emoji} {action_title}\n\n"
f"👤 Автор: {safe_name} (@{safe_username}) | ID: {author_id}\n"
f"📊 RAG Score: {rag_score:.2f}\n\n"
f"📝 Текст поста:\n"
f'"{safe_text}"\n\n'
f"{action_result}"
)
await bot.send_message(
chat_id=self.settings.important_logs,
text=message_text,
parse_mode="HTML",
)
logger.info(
f"AutoModeration: Лог отправлен в IMPORTANT_LOGS ({action})"
)
except Exception as e:
logger.error(f"AutoModeration: Ошибка отправки лога: {e}")