Merge remote-tracking branch 'origin/master' into dev-13

This commit is contained in:
2026-02-02 00:12:09 +03:00
105 changed files with 8845 additions and 8665 deletions

View File

@@ -4,25 +4,28 @@
# Local imports - constants and utilities
from .constants import BUTTON_TEXTS, ERROR_MESSAGES, FSM_STATES
from .decorators import error_handler
from .private_handlers import PrivateHandlers, create_private_handlers, private_router
from .private_handlers import (PrivateHandlers, create_private_handlers,
private_router)
# Local imports - services
from .services import BotSettings, PostService, StickerService, UserService
__all__ = [
# Main components
"private_router",
"create_private_handlers",
"PrivateHandlers",
'private_router',
'create_private_handlers',
'PrivateHandlers',
# Services
"BotSettings",
"UserService",
"PostService",
"StickerService",
'BotSettings',
'UserService',
'PostService',
'StickerService',
# Constants
"FSM_STATES",
"BUTTON_TEXTS",
"ERROR_MESSAGES",
'FSM_STATES',
'BUTTON_TEXTS',
'ERROR_MESSAGES',
# Utilities
"error_handler",
'error_handler'
]

View File

@@ -6,14 +6,12 @@ from typing import Any, Callable
# Third-party imports
from aiogram import types
# Local imports
from logs.custom_logger import logger
def error_handler(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator for centralized error handling"""
async def wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return await func(*args, **kwargs)
@@ -21,23 +19,18 @@ def error_handler(func: Callable[..., Any]) -> Callable[..., Any]:
logger.error(f"Error in {func.__name__}: {str(e)}")
# Try to send error to logs if possible
try:
message = next(
(arg for arg in args if isinstance(arg, types.Message)), None
)
if message and hasattr(message, "bot"):
from helper_bot.utils.base_dependency_factory import (
get_global_instance,
)
message = next((arg for arg in args if isinstance(arg, types.Message)), None)
if message and hasattr(message, 'bot'):
from helper_bot.utils.base_dependency_factory import \
get_global_instance
bdf = get_global_instance()
important_logs = bdf.settings["Telegram"]["important_logs"]
important_logs = bdf.settings['Telegram']['important_logs']
await message.bot.send_message(
chat_id=important_logs,
text=f"Произошла ошибка в {func.__name__}: {str(e)}\n\nTraceback:\n{traceback.format_exc()}",
text=f"Произошла ошибка в {func.__name__}: {str(e)}\n\nTraceback:\n{traceback.format_exc()}"
)
except Exception:
# If we can't log the error, at least it was logged to logger
pass
raise
return wrapper

View File

@@ -8,23 +8,18 @@ from datetime import datetime
from aiogram import F, Router, types
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
# Local imports - filters and middlewares
from database.async_db import AsyncBotDB
from helper_bot.filters.main import ChatTypeFilter
# Local imports - utilities
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
from helper_bot.keyboards import (get_reply_keyboard,
get_reply_keyboard_for_post)
from helper_bot.keyboards.keyboards import get_reply_keyboard_leave_chat
from helper_bot.middlewares.album_middleware import AlbumMiddleware
from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
from helper_bot.utils import messages
from helper_bot.utils.helper_func import (
check_user_emoji,
get_first_name,
update_user_info,
)
from helper_bot.utils.helper_func import (check_user_emoji, get_first_name,
update_user_info)
# Local imports - metrics
from helper_bot.utils.metrics import db_query_time, track_errors, track_time
@@ -39,138 +34,83 @@ sleep = asyncio.sleep
class PrivateHandlers:
"""Main handler class for private messages"""
def __init__(self, db: AsyncBotDB, settings: BotSettings, s3_storage=None):
def __init__(self, db: AsyncBotDB, settings: BotSettings, s3_storage=None, scoring_manager=None):
self.db = db
self.settings = settings
self.user_service = UserService(db, settings)
self.post_service = PostService(db, settings, s3_storage)
self.post_service = PostService(db, settings, s3_storage, scoring_manager)
self.sticker_service = StickerService(settings)
self.router = Router()
self.router.message.middleware(AlbumMiddleware(latency=5.0))
self.router.message.middleware(BlacklistMiddleware())
# Register handlers
self._register_handlers()
def _register_handlers(self):
"""Register all message handlers"""
# Command handlers
self.router.message.register(
self.handle_emoji_message,
ChatTypeFilter(chat_type=["private"]),
Command("emoji"),
)
self.router.message.register(
self.handle_restart_message,
ChatTypeFilter(chat_type=["private"]),
Command("restart"),
)
self.router.message.register(
self.handle_start_message,
ChatTypeFilter(chat_type=["private"]),
Command("start"),
)
self.router.message.register(
self.handle_start_message,
ChatTypeFilter(chat_type=["private"]),
F.text == BUTTON_TEXTS["RETURN_TO_BOT"],
)
self.router.message.register(self.handle_emoji_message, ChatTypeFilter(chat_type=["private"]), Command("emoji"))
self.router.message.register(self.handle_restart_message, ChatTypeFilter(chat_type=["private"]), Command("restart"))
self.router.message.register(self.handle_start_message, ChatTypeFilter(chat_type=["private"]), Command("start"))
self.router.message.register(self.handle_start_message, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["RETURN_TO_BOT"])
# Button handlers
self.router.message.register(
self.suggest_post,
StateFilter(FSM_STATES["START"]),
ChatTypeFilter(chat_type=["private"]),
F.text == BUTTON_TEXTS["SUGGEST_POST"],
)
self.router.message.register(
self.end_message,
ChatTypeFilter(chat_type=["private"]),
F.text == BUTTON_TEXTS["SAY_GOODBYE"],
)
self.router.message.register(
self.end_message,
ChatTypeFilter(chat_type=["private"]),
F.text == BUTTON_TEXTS["LEAVE_CHAT"],
)
self.router.message.register(
self.stickers,
ChatTypeFilter(chat_type=["private"]),
F.text == BUTTON_TEXTS["WANT_STICKERS"],
)
self.router.message.register(
self.connect_with_admin,
StateFilter(FSM_STATES["START"]),
ChatTypeFilter(chat_type=["private"]),
F.text == BUTTON_TEXTS["CONNECT_ADMIN"],
)
self.router.message.register(self.suggest_post, StateFilter(FSM_STATES["START"]), ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["SUGGEST_POST"])
self.router.message.register(self.end_message, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["SAY_GOODBYE"])
self.router.message.register(self.end_message, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["LEAVE_CHAT"])
self.router.message.register(self.stickers, ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["WANT_STICKERS"])
self.router.message.register(self.connect_with_admin, StateFilter(FSM_STATES["START"]), ChatTypeFilter(chat_type=["private"]), F.text == BUTTON_TEXTS["CONNECT_ADMIN"])
# State handlers
self.router.message.register(
self.suggest_router,
StateFilter(FSM_STATES["SUGGEST"]),
ChatTypeFilter(chat_type=["private"]),
)
self.router.message.register(
self.resend_message_in_group_for_message,
StateFilter(FSM_STATES["PRE_CHAT"]),
ChatTypeFilter(chat_type=["private"]),
)
self.router.message.register(
self.resend_message_in_group_for_message,
StateFilter(FSM_STATES["CHAT"]),
ChatTypeFilter(chat_type=["private"]),
)
self.router.message.register(self.suggest_router, StateFilter(FSM_STATES["SUGGEST"]), ChatTypeFilter(chat_type=["private"]))
self.router.message.register(self.resend_message_in_group_for_message, StateFilter(FSM_STATES["PRE_CHAT"]), ChatTypeFilter(chat_type=["private"]))
self.router.message.register(self.resend_message_in_group_for_message, StateFilter(FSM_STATES["CHAT"]), ChatTypeFilter(chat_type=["private"]))
@error_handler
@track_errors("private_handlers", "handle_emoji_message")
@track_time("handle_emoji_message", "private_handlers")
async def handle_emoji_message(
self, message: types.Message, state: FSMContext, **kwargs
):
async def handle_emoji_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle emoji command"""
await self.user_service.log_user_message(message)
user_emoji = await check_user_emoji(message)
await state.set_state(FSM_STATES["START"])
if user_emoji is not None:
await message.answer(f"Твоя эмодзя - {user_emoji}", parse_mode="HTML")
await message.answer(f'Твоя эмодзя - {user_emoji}', parse_mode='HTML')
@error_handler
@track_errors("private_handlers", "handle_restart_message")
@track_time("handle_restart_message", "private_handlers")
async def handle_restart_message(
self, message: types.Message, state: FSMContext, **kwargs
):
async def handle_restart_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle restart command"""
markup = await get_reply_keyboard(self.db, message.from_user.id)
await self.user_service.log_user_message(message)
await state.set_state(FSM_STATES["START"])
await update_user_info("love", message)
await update_user_info('love', message)
await check_user_emoji(message)
await message.answer("Я перезапущен!", reply_markup=markup, parse_mode="HTML")
await message.answer('Я перезапущен!', reply_markup=markup, parse_mode='HTML')
@error_handler
@track_errors("private_handlers", "handle_start_message")
@track_time("handle_start_message", "private_handlers")
async def handle_start_message(
self, message: types.Message, state: FSMContext, **kwargs
):
async def handle_start_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle start command and return to bot button with metrics tracking"""
# User service operations with metrics
await self.user_service.log_user_message(message)
await self.user_service.ensure_user_exists(message)
await state.set_state(FSM_STATES["START"])
# Send sticker with metrics
await self.sticker_service.send_random_hello_sticker(message)
# Send welcome message with metrics
markup = await get_reply_keyboard(self.db, message.from_user.id)
hello_message = messages.get_message(get_first_name(message), "HELLO_MESSAGE")
await message.answer(hello_message, reply_markup=markup, parse_mode="HTML")
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
await message.answer(hello_message, reply_markup=markup, parse_mode='HTML')
@error_handler
@track_errors("private_handlers", "suggest_post")
@track_time("suggest_post", "private_handlers")
@@ -180,11 +120,11 @@ class PrivateHandlers:
await self.user_service.update_user_activity(message.from_user.id)
await self.user_service.log_user_message(message)
await state.set_state(FSM_STATES["SUGGEST"])
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(get_first_name(message), "SUGGEST_NEWS")
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
await message.answer(suggest_news, reply_markup=markup)
@error_handler
@track_errors("private_handlers", "end_message")
@track_time("end_message", "private_handlers")
@@ -193,65 +133,54 @@ class PrivateHandlers:
# User service operations with metrics
await self.user_service.update_user_activity(message.from_user.id)
await self.user_service.log_user_message(message)
# Send sticker
await self.sticker_service.send_random_goodbye_sticker(message)
# Send goodbye message
markup = types.ReplyKeyboardRemove()
bye_message = messages.get_message(get_first_name(message), "BYE_MESSAGE")
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
await message.answer(bye_message, reply_markup=markup)
await state.set_state(FSM_STATES["START"])
@error_handler
@track_errors("private_handlers", "suggest_router")
@track_time("suggest_router", "private_handlers")
async def suggest_router(
self, message: types.Message, state: FSMContext, album: list = None, **kwargs
):
"""Handle post submission in suggest state"""
async def suggest_router(self, message: types.Message, state: FSMContext, album: list = None, **kwargs):
"""Handle post submission in suggest state - сразу отвечает пользователю, обработка в фоне"""
# Сразу отвечаем пользователю
markup_for_user = await get_reply_keyboard(self.db, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state(FSM_STATES["START"])
# Проверяем, есть ли механизм для получения полной медиагруппы (для медиагрупп)
album_getter = kwargs.get("album_getter")
if album_getter and message.media_group_id:
# Это медиагруппа - сразу отвечаем пользователю, обработку делаем в фоне
markup_for_user = await get_reply_keyboard(self.db, message.from_user.id)
success_send_message = messages.get_message(
get_first_name(message), "SUCCESS_SEND_MESSAGE"
)
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state(FSM_STATES["START"])
# В фоне ждем полную медиагруппу и обрабатываем пост
async def process_media_group_background():
try:
# Ждем полную медиагруппу
# В фоне обрабатываем пост
async def process_post_background():
try:
# Обновляем активность пользователя
await self.user_service.update_user_activity(message.from_user.id)
# Логируем сообщение (только для одиночных сообщений, не медиагрупп)
if message.media_group_id is None:
await self.user_service.log_user_message(message)
# Для медиагрупп ждем полную медиагруппу
if album_getter and message.media_group_id:
full_album = await album_getter.get_album(timeout=10.0)
if not full_album:
return
# Обрабатываем пост с полной медиагруппой
await self.user_service.update_user_activity(message.from_user.id)
await self.post_service.process_post(message, full_album)
except Exception as e:
from logs.custom_logger import logger
logger.error(f"Ошибка при фоновой обработке медиагруппы: {e}")
asyncio.create_task(process_media_group_background())
else:
# Обычное сообщение или медиагруппа уже собрана - обрабатываем синхронно
await self.user_service.update_user_activity(message.from_user.id)
if message.media_group_id is None:
await self.user_service.log_user_message(message)
await self.post_service.process_post(message, album)
markup_for_user = await get_reply_keyboard(self.db, message.from_user.id)
success_send_message = messages.get_message(
get_first_name(message), "SUCCESS_SEND_MESSAGE"
)
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state(FSM_STATES["START"])
if full_album:
await self.post_service.process_post(message, full_album)
else:
# Обычное сообщение или медиагруппа уже собрана
await self.post_service.process_post(message, album)
except Exception as e:
from logs.custom_logger import logger
logger.error(f"Ошибка при фоновой обработке поста: {e}")
asyncio.create_task(process_post_background())
@error_handler
@track_errors("private_handlers", "stickers")
@track_time("stickers", "private_handlers")
@@ -262,46 +191,41 @@ class PrivateHandlers:
markup = await get_reply_keyboard(self.db, message.from_user.id)
await self.db.update_stickers_info(message.from_user.id)
await self.user_service.log_user_message(message)
await message.answer(text=ERROR_MESSAGES["STICKERS_LINK"], reply_markup=markup)
await message.answer(
text=ERROR_MESSAGES["STICKERS_LINK"],
reply_markup=markup
)
await state.set_state(FSM_STATES["START"])
@error_handler
@track_errors("private_handlers", "connect_with_admin")
@track_time("connect_with_admin", "private_handlers")
async def connect_with_admin(
self, message: types.Message, state: FSMContext, **kwargs
):
async def connect_with_admin(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle connect with admin button"""
# User service operations with metrics
await self.user_service.update_user_activity(message.from_user.id)
admin_message = messages.get_message(
get_first_name(message), "CONNECT_WITH_ADMIN"
)
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
await message.answer(admin_message, parse_mode="html")
await self.user_service.log_user_message(message)
await state.set_state(FSM_STATES["PRE_CHAT"])
@error_handler
@track_errors("private_handlers", "resend_message_in_group_for_message")
@track_time("resend_message_in_group_for_message", "private_handlers")
@db_query_time("resend_message_in_group_for_message", "messages", "insert")
async def resend_message_in_group_for_message(
self, message: types.Message, state: FSMContext, **kwargs
):
async def resend_message_in_group_for_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle messages in admin chat states"""
# User service operations with metrics
await self.user_service.update_user_activity(message.from_user.id)
await message.forward(chat_id=self.settings.group_for_message)
current_date = datetime.now()
date = int(current_date.timestamp())
await self.db.add_message(
message.text, message.from_user.id, message.message_id + 1, date
)
question = messages.get_message(get_first_name(message), "QUESTION")
await self.db.add_message(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(get_first_name(message), 'QUESTION')
user_state = await state.get_state()
if user_state == FSM_STATES["PRE_CHAT"]:
markup = await get_reply_keyboard(self.db, message.from_user.id)
await message.answer(question, reply_markup=markup)
@@ -312,44 +236,48 @@ class PrivateHandlers:
# Factory function to create handlers with dependencies
def create_private_handlers(
db: AsyncBotDB, settings: BotSettings, s3_storage=None
) -> PrivateHandlers:
def create_private_handlers(db: AsyncBotDB, settings: BotSettings, s3_storage=None, scoring_manager=None) -> PrivateHandlers:
"""Create private handlers instance with dependencies"""
return PrivateHandlers(db, settings, s3_storage)
return PrivateHandlers(db, settings, s3_storage, scoring_manager)
# Legacy router for backward compatibility
private_router = Router()
# Флаг инициализации для защиты от повторного вызова
_legacy_router_initialized = False
# Initialize with global dependencies (for backward compatibility)
def init_legacy_router():
"""Initialize legacy router with global dependencies"""
global private_router
global private_router, _legacy_router_initialized
if _legacy_router_initialized:
return
from helper_bot.utils.base_dependency_factory import get_global_instance
bdf = get_global_instance()
settings = BotSettings(
group_for_posts=bdf.settings["Telegram"]["group_for_posts"],
group_for_message=bdf.settings["Telegram"]["group_for_message"],
main_public=bdf.settings["Telegram"]["main_public"],
group_for_logs=bdf.settings["Telegram"]["group_for_logs"],
important_logs=bdf.settings["Telegram"]["important_logs"],
preview_link=bdf.settings["Telegram"]["preview_link"],
logs=bdf.settings["Settings"]["logs"],
test=bdf.settings["Settings"]["test"],
group_for_posts=bdf.settings['Telegram']['group_for_posts'],
group_for_message=bdf.settings['Telegram']['group_for_message'],
main_public=bdf.settings['Telegram']['main_public'],
group_for_logs=bdf.settings['Telegram']['group_for_logs'],
important_logs=bdf.settings['Telegram']['important_logs'],
preview_link=bdf.settings['Telegram']['preview_link'],
logs=bdf.settings['Settings']['logs'],
test=bdf.settings['Settings']['test']
)
db = bdf.get_db()
s3_storage = bdf.get_s3_storage()
handlers = create_private_handlers(db, settings, s3_storage)
scoring_manager = bdf.get_scoring_manager()
handlers = create_private_handlers(db, settings, s3_storage, scoring_manager)
# Instead of trying to copy handlers, we'll use the new router directly
# This maintains backward compatibility while using the new architecture
private_router = handlers.router
_legacy_router_initialized = True
# Initialize legacy router
init_legacy_router()

View File

@@ -12,61 +12,37 @@ from typing import Any, Callable, Dict, Protocol, Union
# Third-party imports
from aiogram import types
from aiogram.types import FSInputFile
from database.models import TelegramPost, User
from helper_bot.keyboards import get_reply_keyboard_for_post
# Local imports - utilities
from helper_bot.utils.helper_func import (
add_in_db_media,
check_username_and_full_name,
determine_anonymity,
get_first_name,
get_text_message,
prepare_media_group_from_middlewares,
send_audio_message,
send_media_group_message_to_private_chat,
send_photo_message,
send_text_message,
send_video_message,
send_video_note_message,
send_voice_message,
)
add_in_db_media, check_username_and_full_name, determine_anonymity,
get_first_name, get_text_message, prepare_media_group_from_middlewares,
send_audio_message, send_media_group_message_to_private_chat,
send_photo_message, send_text_message, send_video_message,
send_video_note_message, send_voice_message)
# Local imports - metrics
from helper_bot.utils.metrics import (
db_query_time,
track_errors,
track_file_operations,
track_media_processing,
track_time,
)
from helper_bot.utils.metrics import (db_query_time, track_errors,
track_file_operations,
track_media_processing, track_time)
from logs.custom_logger import logger
class DatabaseProtocol(Protocol):
"""Protocol for database operations"""
async def user_exists(self, user_id: int) -> bool: ...
async def add_user(self, user: User) -> None: ...
async def update_user_info(
self, user_id: int, username: str = None, full_name: str = None
) -> None: ...
async def update_user_info(self, user_id: int, username: str = None, full_name: str = None) -> None: ...
async def update_user_date(self, user_id: int) -> None: ...
async def add_post(self, post: TelegramPost) -> None: ...
async def update_stickers_info(self, user_id: int) -> None: ...
async def add_message(
self, message_text: str, user_id: int, message_id: int, date: int = None
) -> None: ...
async def update_helper_message(
self, message_id: int, helper_message_id: int
) -> None: ...
async def add_message(self, message_text: str, user_id: int, message_id: int, date: int = None) -> None: ...
async def update_helper_message(self, message_id: int, helper_message_id: int) -> None: ...
@dataclass
class BotSettings:
"""Bot configuration settings"""
group_for_posts: str
group_for_message: str
main_public: str
@@ -79,18 +55,18 @@ class BotSettings:
class UserService:
"""Service for user-related operations"""
def __init__(self, db: DatabaseProtocol, settings: BotSettings) -> None:
self.db = db
self.settings = settings
@track_time("update_user_activity", "user_service")
@track_errors("user_service", "update_user_activity")
@db_query_time("update_user_activity", "users", "update")
async def update_user_activity(self, user_id: int) -> None:
"""Update user's last activity timestamp with metrics tracking"""
await self.db.update_user_date(user_id)
@track_time("ensure_user_exists", "user_service")
@track_errors("user_service", "ensure_user_exists")
@db_query_time("ensure_user_exists", "users", "insert")
@@ -98,56 +74,52 @@ class UserService:
"""Ensure user exists in database, create if needed with metrics tracking"""
user_id = message.from_user.id
full_name = message.from_user.full_name
username = message.from_user.username or "private_username"
# Сохраняем только реальный username, если его нет - сохраняем None/пустую строку
username = message.from_user.username
first_name = get_first_name(message)
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
# Create User object with current timestamp
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username,
username=username, # Может быть None - это нормально
is_bot=is_bot,
language_code=language_code,
emoji="",
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False,
voice_bot_welcome_received=False
)
# Пытаемся создать пользователя (если уже существует - игнорируем)
# Это устраняет race condition и упрощает логику
await self.db.add_user(user)
# Проверяем, нужно ли обновить информацию о существующем пользователе
is_need_update = await check_username_and_full_name(
user_id, username, full_name, self.db
)
is_need_update = await check_username_and_full_name(user_id, username, full_name, self.db)
if is_need_update:
await self.db.update_user_info(user_id, username, full_name)
safe_full_name = (
html.escape(full_name) if full_name else "Неизвестный пользователь"
)
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
# Для отображения используем подстановочное значение, но в БД сохраняем только реальный username
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}"
)
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}")
await message.bot.send_message(
chat_id=self.settings.group_for_logs,
text=f"Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}",
)
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}')
await self.db.update_user_date(user_id)
async def log_user_message(self, message: types.Message) -> None:
"""Forward user message to logs group with metrics tracking"""
await message.forward(chat_id=self.settings.group_for_logs)
def get_safe_user_info(self, message: types.Message) -> tuple[str, str]:
"""Get safely escaped user information for logging"""
full_name = message.from_user.full_name or "Неизвестный пользователь"
@@ -157,130 +129,401 @@ class UserService:
class PostService:
"""Service for post-related operations"""
def __init__(
self, db: DatabaseProtocol, settings: BotSettings, s3_storage=None
) -> None:
def __init__(self, db: DatabaseProtocol, settings: BotSettings, s3_storage=None, scoring_manager=None) -> None:
self.db = db
self.settings = settings
self.s3_storage = s3_storage
async def _save_media_background(
self, sent_message: types.Message, bot_db: Any, s3_storage
) -> None:
self.scoring_manager = scoring_manager
async def _save_media_background(self, sent_message: types.Message, bot_db: Any, s3_storage) -> None:
"""Сохраняет медиа в фоне, чтобы не блокировать ответ пользователю"""
try:
success = await add_in_db_media(sent_message, bot_db, s3_storage)
if not success:
logger.warning(
f"_save_media_background: Не удалось сохранить медиа для поста {sent_message.message_id}"
)
logger.warning(f"_save_media_background: Не удалось сохранить медиа для поста {sent_message.message_id}")
except Exception as e:
logger.error(
f"_save_media_background: Ошибка при сохранении медиа для поста {sent_message.message_id}: {e}"
logger.error(f"_save_media_background: Ошибка при сохранении медиа для поста {sent_message.message_id}: {e}")
async def _get_scores(self, text: str) -> tuple:
"""
Получает скоры для текста поста.
Returns:
Tuple (deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json)
"""
if not self.scoring_manager or not text or not text.strip():
return None, None, None, None, None
try:
scores = await self.scoring_manager.score_post(text)
# Формируем JSON для сохранения в БД
import json
ml_scores_json = json.dumps(scores.to_json_dict()) if scores.has_any_score() else None
# Получаем данные от RAG
rag_confidence = scores.rag.confidence if scores.rag else None
rag_score_pos_only = scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None
return scores.deepseek_score, scores.rag_score, rag_confidence, rag_score_pos_only, ml_scores_json
except Exception as e:
logger.error(f"PostService: Ошибка получения скоров: {e}")
return None, None, None, None, None
async def _save_scores_background(self, message_id: int, ml_scores_json: str) -> None:
"""Сохраняет скоры в БД в фоне."""
if ml_scores_json:
try:
await self.db.update_ml_scores(message_id, ml_scores_json)
except Exception as e:
logger.error(f"PostService: Ошибка сохранения скоров для {message_id}: {e}")
async def _get_scores_with_error_handling(self, text: str) -> tuple:
"""
Получает скоры для текста поста с обработкой ошибок.
Returns:
Tuple (deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, error_message)
error_message будет None если все ок, или строка с описанием ошибки
"""
if not self.scoring_manager:
# Скоры выключены в .env - это нормально
return None, None, None, None, None, None
if not text or not text.strip():
return None, None, None, None, None, None
try:
scores = await self.scoring_manager.score_post(text)
# Формируем JSON для сохранения в БД
import json
ml_scores_json = json.dumps(scores.to_json_dict()) if scores.has_any_score() else None
# Получаем данные от RAG
rag_confidence = scores.rag.confidence if scores.rag else None
rag_score_pos_only = scores.rag.metadata.get("rag_score_pos_only") if scores.rag else None
return scores.deepseek_score, scores.rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, None
except Exception as e:
logger.error(f"PostService: Ошибка получения скоров: {e}")
# Возвращаем частичные скоры если есть, или сообщение об ошибке
error_message = "Не удалось рассчитать скоры"
return None, None, None, None, None, error_message
@track_time("_process_post_background", "post_service")
@track_errors("post_service", "_process_post_background")
async def _process_post_background(
self,
message: types.Message,
first_name: str,
content_type: str,
album: Union[list, None] = None
) -> None:
"""
Обрабатывает пост в фоне: получает скоры, отправляет в группу модерации, сохраняет в БД.
Args:
message: Сообщение от пользователя
first_name: Имя пользователя
content_type: Тип контента ('text', 'photo', 'video', 'audio', 'voice', 'video_note', 'media_group')
album: Список сообщений медиагруппы (только для media_group)
"""
try:
# Определяем исходный текст для скоринга и определения анонимности
original_raw_text = ""
if content_type == "text":
original_raw_text = message.text or ""
elif content_type == "media_group":
original_raw_text = album[0].caption or "" if album and album[0].caption else ""
else:
original_raw_text = message.caption or ""
# Получаем скоры с обработкой ошибок
deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json, error_message = \
await self._get_scores_with_error_handling(original_raw_text)
# Формируем текст для поста (с сообщением об ошибке если есть)
text_for_post = original_raw_text
if error_message:
# Для текстовых постов добавляем в конец текста
if content_type == "text":
text_for_post = f"{original_raw_text}\n\n⚠️ {error_message}"
# Для медиа добавляем в caption
elif content_type in ("photo", "video", "audio") and original_raw_text:
text_for_post = f"{original_raw_text}\n\n⚠️ {error_message}"
# Формируем текст/caption с учетом скоров
post_text = ""
if text_for_post or content_type == "text":
post_text = get_text_message(
text_for_post.lower() if text_for_post else "",
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
rag_confidence=rag_confidence,
rag_score_pos_only=rag_score_pos_only,
)
# Определяем анонимность по исходному тексту (без сообщения об ошибке)
is_anonymous = determine_anonymity(original_raw_text)
markup = get_reply_keyboard_for_post()
sent_message = None
# Отправляем пост в группу модерации в зависимости от типа
if content_type == "text":
sent_message = await send_text_message(
self.settings.group_for_posts, message, post_text, markup
)
elif content_type == "photo":
sent_message = await send_photo_message(
self.settings.group_for_posts, message, message.photo[-1].file_id, post_text, markup
)
elif content_type == "video":
sent_message = await send_video_message(
self.settings.group_for_posts, message, message.video.file_id, post_text, markup
)
elif content_type == "audio":
sent_message = await send_audio_message(
self.settings.group_for_posts, message, message.audio.file_id, post_text, markup
)
elif content_type == "voice":
sent_message = await send_voice_message(
self.settings.group_for_posts, message, message.voice.file_id, markup
)
elif content_type == "video_note":
sent_message = await send_video_note_message(
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
elif content_type == "media_group":
# Для медиагруппы используем специальную обработку
# Передаем ml_scores_json для сохранения в БД
await self._process_media_group_background(
message, album, first_name, post_text, is_anonymous, original_raw_text, ml_scores_json
)
return
else:
logger.error(f"PostService: Неподдерживаемый тип контента: {content_type}")
return
if not sent_message:
logger.error(f"PostService: Не удалось отправить пост типа {content_type}")
return
# Сохраняем пост в БД (сохраняем исходный текст, без сообщения об ошибке)
post = TelegramPost(
message_id=sent_message.message_id,
text=original_raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем медиа и скоры в фоне
if content_type in ("photo", "video", "audio", "voice", "video_note"):
asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage))
if ml_scores_json:
asyncio.create_task(self._save_scores_background(sent_message.message_id, ml_scores_json))
except Exception as e:
logger.error(f"PostService: Критическая ошибка в _process_post_background для {content_type}: {e}")
async def _process_media_group_background(
self,
message: types.Message,
album: list,
first_name: str,
post_caption: str,
is_anonymous: bool,
original_raw_text: str,
ml_scores_json: str = None
) -> None:
"""Обрабатывает медиагруппу в фоне"""
try:
media_group = await prepare_media_group_from_middlewares(album, post_caption)
media_group_message_ids = await send_media_group_message_to_private_chat(
self.settings.group_for_posts, message, media_group, self.db, None, self.s3_storage
)
main_post_id = media_group_message_ids[-1]
main_post = TelegramPost(
message_id=main_post_id,
text=original_raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous
)
await self.db.add_post(main_post)
# Сохраняем скоры в фоне (если они были получены)
if ml_scores_json:
asyncio.create_task(self._save_scores_background(main_post_id, ml_scores_json))
for msg_id in media_group_message_ids:
await self.db.add_message_link(main_post_id, msg_id)
await asyncio.sleep(0.2)
markup = get_reply_keyboard_for_post()
helper_message = await send_text_message(
self.settings.group_for_posts,
message,
"^",
markup
)
helper_message_id = helper_message.message_id
helper_post = TelegramPost(
message_id=helper_message_id,
text="^",
author_id=message.from_user.id,
helper_text_message_id=main_post_id,
created_at=int(datetime.now().timestamp())
)
await self.db.add_post(helper_post)
await self.db.update_helper_message(
message_id=main_post_id,
helper_message_id=helper_message_id
)
except Exception as e:
logger.error(f"PostService: Ошибка в _process_media_group_background: {e}")
@track_time("handle_text_post", "post_service")
@track_errors("post_service", "handle_text_post")
@db_query_time("handle_text_post", "posts", "insert")
async def handle_text_post(self, message: types.Message, first_name: str) -> None:
"""Handle text post submission"""
raw_text = message.text or ""
# Получаем скоры для текста
deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json = await self._get_scores(raw_text)
# Формируем текст с учетом скоров
post_text = get_text_message(
message.text.lower(), first_name, message.from_user.username
message.text.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
rag_confidence=rag_confidence,
rag_score_pos_only=rag_score_pos_only,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_text_message(
self.settings.group_for_posts, message, post_text, markup
)
# Сохраняем сырой текст и определяем анонимность
raw_text = message.text or ""
sent_message = await send_text_message(self.settings.group_for_posts, message, post_text, markup)
# Определяем анонимность
is_anonymous = determine_anonymity(raw_text)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_text,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем скоры в фоне
if ml_scores_json:
asyncio.create_task(self._save_scores_background(sent_message.message_id, ml_scores_json))
@track_time("handle_photo_post", "post_service")
@track_errors("post_service", "handle_photo_post")
@db_query_time("handle_photo_post", "posts", "insert")
async def handle_photo_post(self, message: types.Message, first_name: str) -> None:
"""Handle photo post submission"""
raw_caption = message.caption or ""
# Получаем скоры для текста
deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json = await self._get_scores(raw_caption)
post_caption = ""
if message.caption:
post_caption = get_text_message(
message.caption.lower(), first_name, message.from_user.username
message.caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
rag_confidence=rag_confidence,
rag_score_pos_only=rag_score_pos_only,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_photo_message(
self.settings.group_for_posts,
message,
message.photo[-1].file_id,
post_caption,
markup,
self.settings.group_for_posts, message, message.photo[-1].file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
# Определяем анонимность
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
# Сохраняем медиа и скоры в фоне
asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage))
if ml_scores_json:
asyncio.create_task(self._save_scores_background(sent_message.message_id, ml_scores_json))
@track_time("handle_video_post", "post_service")
@track_errors("post_service", "handle_video_post")
@db_query_time("handle_video_post", "posts", "insert")
async def handle_video_post(self, message: types.Message, first_name: str) -> None:
"""Handle video post submission"""
raw_caption = message.caption or ""
# Получаем скоры для текста
deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json = await self._get_scores(raw_caption)
post_caption = ""
if message.caption:
post_caption = get_text_message(
message.caption.lower(), first_name, message.from_user.username
message.caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
rag_confidence=rag_confidence,
rag_score_pos_only=rag_score_pos_only,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_video_message(
self.settings.group_for_posts,
message,
message.video.file_id,
post_caption,
markup,
self.settings.group_for_posts, message, message.video.file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
# Определяем анонимность
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
# Сохраняем медиа и скоры в фоне
asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage))
if ml_scores_json:
asyncio.create_task(self._save_scores_background(sent_message.message_id, ml_scores_json))
@track_time("handle_video_note_post", "post_service")
@track_errors("post_service", "handle_video_note_post")
@db_query_time("handle_video_note_post", "posts", "insert")
@@ -290,61 +533,66 @@ class PostService:
sent_message = await send_video_note_message(
self.settings.group_for_posts, message, message.video_note.file_id, markup
)
# Сохраняем пустую строку, так как video_note не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage))
@track_time("handle_audio_post", "post_service")
@track_errors("post_service", "handle_audio_post")
@db_query_time("handle_audio_post", "posts", "insert")
async def handle_audio_post(self, message: types.Message, first_name: str) -> None:
"""Handle audio post submission"""
raw_caption = message.caption or ""
# Получаем скоры для текста
deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json = await self._get_scores(raw_caption)
post_caption = ""
if message.caption:
post_caption = get_text_message(
message.caption.lower(), first_name, message.from_user.username
message.caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
rag_confidence=rag_confidence,
rag_score_pos_only=rag_score_pos_only,
)
markup = get_reply_keyboard_for_post()
sent_message = await send_audio_message(
self.settings.group_for_posts,
message,
message.audio.file_id,
post_caption,
markup,
self.settings.group_for_posts, message, message.audio.file_id, post_caption, markup
)
# Сохраняем сырой caption и определяем анонимность
raw_caption = message.caption or ""
# Определяем анонимность
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
# Сохраняем медиа и скоры в фоне
asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage))
if ml_scores_json:
asyncio.create_task(self._save_scores_background(sent_message.message_id, ml_scores_json))
@track_time("handle_voice_post", "post_service")
@track_errors("post_service", "handle_voice_post")
@db_query_time("handle_voice_post", "posts", "insert")
@@ -354,146 +602,142 @@ class PostService:
sent_message = await send_voice_message(
self.settings.group_for_posts, message, message.voice.file_id, markup
)
# Сохраняем пустую строку, так как voice не имеет caption
raw_caption = ""
is_anonymous = determine_anonymity(raw_caption)
post = TelegramPost(
message_id=sent_message.message_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(post)
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(
self._save_media_background(sent_message, self.db, self.s3_storage)
)
asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage))
@track_time("handle_media_group_post", "post_service")
@track_errors("post_service", "handle_media_group_post")
@track_errors("post_service", "handle_media_group_post")
@db_query_time("handle_media_group_post", "posts", "insert")
@track_media_processing("media_group")
async def handle_media_group_post(
self, message: types.Message, album: list, first_name: str
) -> None:
async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None:
"""Handle media group post submission"""
post_caption = " "
raw_caption = ""
ml_scores_json = None
if album and album[0].caption:
raw_caption = album[0].caption or ""
# Получаем скоры для текста
deepseek_score, rag_score, rag_confidence, rag_score_pos_only, ml_scores_json = await self._get_scores(raw_caption)
post_caption = get_text_message(
album[0].caption.lower(), first_name, message.from_user.username
album[0].caption.lower(),
first_name,
message.from_user.username,
deepseek_score=deepseek_score,
rag_score=rag_score,
rag_confidence=rag_confidence,
rag_score_pos_only=rag_score_pos_only,
)
is_anonymous = determine_anonymity(raw_caption)
media_group = await prepare_media_group_from_middlewares(album, post_caption)
media_group_message_ids = await send_media_group_message_to_private_chat(
self.settings.group_for_posts,
message,
media_group,
self.db,
None,
self.s3_storage,
self.settings.group_for_posts, message, media_group, self.db, None, self.s3_storage
)
main_post_id = media_group_message_ids[-1]
main_post = TelegramPost(
message_id=main_post_id,
text=raw_caption,
author_id=message.from_user.id,
created_at=int(datetime.now().timestamp()),
is_anonymous=is_anonymous,
is_anonymous=is_anonymous
)
await self.db.add_post(main_post)
# Сохраняем скоры в фоне
if ml_scores_json:
asyncio.create_task(self._save_scores_background(main_post_id, ml_scores_json))
for msg_id in media_group_message_ids:
await self.db.add_message_link(main_post_id, msg_id)
await asyncio.sleep(0.2)
markup = get_reply_keyboard_for_post()
helper_message = await send_text_message(
self.settings.group_for_posts, message, "^", markup
self.settings.group_for_posts,
message,
"^",
markup
)
helper_message_id = helper_message.message_id
helper_post = TelegramPost(
message_id=helper_message_id,
text="^",
author_id=message.from_user.id,
helper_text_message_id=main_post_id,
created_at=int(datetime.now().timestamp()),
created_at=int(datetime.now().timestamp())
)
await self.db.add_post(helper_post)
await self.db.update_helper_message(
message_id=main_post_id, helper_message_id=helper_message_id
message_id=main_post_id,
helper_message_id=helper_message_id
)
@track_time("process_post", "post_service")
@track_errors("post_service", "process_post")
@track_media_processing("media_group")
async def process_post(
self, message: types.Message, album: Union[list, None] = None
) -> None:
"""Process post based on content type"""
async def process_post(self, message: types.Message, album: Union[list, None] = None) -> None:
"""
Запускает обработку поста в фоне.
Не блокирует выполнение - сразу возвращает управление.
"""
first_name = get_first_name(message)
if message.media_group_id is not None:
await self.handle_media_group_post(message, album, first_name)
return
content_handlers: Dict[str, Callable] = {
"text": lambda: self.handle_text_post(message, first_name),
"photo": lambda: self.handle_photo_post(message, first_name),
"video": lambda: self.handle_video_post(message, first_name),
"video_note": lambda: self.handle_video_note_post(message),
"audio": lambda: self.handle_audio_post(message, first_name),
"voice": lambda: self.handle_voice_post(message),
}
handler = content_handlers.get(message.content_type)
if handler:
await handler()
else:
from .constants import ERROR_MESSAGES
await message.bot.send_message(
message.chat.id, ERROR_MESSAGES["UNSUPPORTED_CONTENT"]
)
# Определяем тип контента
content_type = "media_group" if message.media_group_id is not None else message.content_type
# Запускаем фоновую обработку
asyncio.create_task(
self._process_post_background(message, first_name, content_type, album)
)
class StickerService:
"""Service for sticker-related operations"""
def __init__(self, settings: BotSettings) -> None:
self.settings = settings
@track_time("send_random_hello_sticker", "sticker_service")
@track_errors("sticker_service", "send_random_hello_sticker")
@track_file_operations("sticker")
async def send_random_hello_sticker(self, message: types.Message) -> None:
"""Send random hello sticker with metrics tracking"""
name_stick_hello = list(Path("Stick").rglob("Hello_*"))
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
if not name_stick_hello:
return
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
await message.answer_sticker(random_stick_hello)
await asyncio.sleep(0.3)
@track_time("send_random_goodbye_sticker", "sticker_service")
@track_errors("sticker_service", "send_random_goodbye_sticker")
@track_file_operations("sticker")
async def send_random_goodbye_sticker(self, message: types.Message) -> None:
"""Send random goodbye sticker with metrics tracking"""
name_stick_bye = list(Path("Stick").rglob("Universal_*"))
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
if not name_stick_bye:
return
random_stick_bye = random.choice(name_stick_bye)