From 5a90591564952c457d3142b5325508529c5cf168 Mon Sep 17 00:00:00 2001 From: Andrey Date: Sat, 24 Jan 2026 01:35:36 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=20=D0=B0=D1=81=D0=B8=D0=BD=D1=85=D1=80=D0=BE=D0=BD=D0=BD?= =?UTF-8?q?=D1=8B=D0=B9=20=D0=BC=D0=B5=D1=85=D0=B0=D0=BD=D0=B8=D0=B7=D0=BC?= =?UTF-8?q?=20=D0=BE=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D0=BA=D0=B8=20?= =?UTF-8?q?=D0=BC=D0=B5=D0=B4=D0=B8=D0=B0=D0=B3=D1=80=D1=83=D0=BF=D0=BF=20?= =?UTF-8?q?=D0=B2=20`PrivateHandlers`=20=D0=B8=20=D1=83=D0=BB=D1=83=D1=87?= =?UTF-8?q?=D1=88=D0=B5=D0=BD=20`AlbumMiddleware`=20=D0=B4=D0=BB=D1=8F=20?= =?UTF-8?q?=D0=B1=D0=BE=D0=BB=D0=B5=D0=B5=20=D1=8D=D1=84=D1=84=D0=B5=D0=BA?= =?UTF-8?q?=D1=82=D0=B8=D0=B2=D0=BD=D0=BE=D0=B3=D0=BE=20=D1=81=D0=B1=D0=BE?= =?UTF-8?q?=D1=80=D0=B0=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD=D0=B8?= =?UTF-8?q?=D0=B9.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Реализована фоновая обработка медиагрупп, позволяющая пользователю получать ответ сразу, пока происходит сбор сообщений. - Введен класс `AlbumGetter` для получения полной медиагруппы с использованием событий. - Обновлены методы в `AlbumMiddleware` для поддержки нового функционала и улучшения логики обработки сообщений. --- .../handlers/private/private_handlers.py | 44 +++++-- helper_bot/middlewares/album_middleware.py | 118 +++++++++++++++--- 2 files changed, 135 insertions(+), 27 deletions(-) diff --git a/helper_bot/handlers/private/private_handlers.py b/helper_bot/handlers/private/private_handlers.py index 1dace7d..1974cee 100644 --- a/helper_bot/handlers/private/private_handlers.py +++ b/helper_bot/handlers/private/private_handlers.py @@ -157,14 +157,42 @@ class PrivateHandlers: @track_time("suggest_router", "private_handlers") async def suggest_router(self, message: types.Message, state: FSMContext, album: list = None, **kwargs): """Handle post submission in suggest state""" - await self.user_service.update_user_activity(message.from_user.id) - if message.media_group_id is None: - await self.user_service.log_user_message(message) - await self.post_service.process_post(message, album) - markup_for_user = await get_reply_keyboard(self.db, message.from_user.id) - success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') - await message.answer(success_send_message, reply_markup=markup_for_user) - await state.set_state(FSM_STATES["START"]) + # Проверяем, есть ли механизм для получения полной медиагруппы (для медиагрупп) + album_getter = kwargs.get("album_getter") + + if album_getter and message.media_group_id: + # Это медиагруппа - сразу отвечаем пользователю, обработку делаем в фоне + markup_for_user = await get_reply_keyboard(self.db, message.from_user.id) + success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') + await message.answer(success_send_message, reply_markup=markup_for_user) + await state.set_state(FSM_STATES["START"]) + + # В фоне ждем полную медиагруппу и обрабатываем пост + async def process_media_group_background(): + try: + # Ждем полную медиагруппу + full_album = await album_getter.get_album(timeout=10.0) + if not full_album: + return + + # Обрабатываем пост с полной медиагруппой + await self.user_service.update_user_activity(message.from_user.id) + await self.post_service.process_post(message, full_album) + except Exception as e: + from logs.custom_logger import logger + logger.error(f"Ошибка при фоновой обработке медиагруппы: {e}") + + asyncio.create_task(process_media_group_background()) + else: + # Обычное сообщение или медиагруппа уже собрана - обрабатываем синхронно + await self.user_service.update_user_activity(message.from_user.id) + if message.media_group_id is None: + await self.user_service.log_user_message(message) + await self.post_service.process_post(message, album) + markup_for_user = await get_reply_keyboard(self.db, message.from_user.id) + success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') + await message.answer(success_send_message, reply_markup=markup_for_user) + await state.set_state(FSM_STATES["START"]) @error_handler @track_errors("private_handlers", "stickers") diff --git a/helper_bot/middlewares/album_middleware.py b/helper_bot/middlewares/album_middleware.py index a94022a..e190955 100644 --- a/helper_bot/middlewares/album_middleware.py +++ b/helper_bot/middlewares/album_middleware.py @@ -1,14 +1,42 @@ import asyncio -from typing import Any, Dict, Union, List +from typing import Any, Dict, Union, List, Optional from aiogram import BaseMiddleware from aiogram.types import Message +class AlbumGetter: + """Вспомогательный класс для получения полной медиагруппы из middleware""" + + def __init__(self, album_data: Dict[str, Any], media_group_id: str, event: asyncio.Event): + self.album_data = album_data + self.media_group_id = media_group_id + self.event = event + + async def get_album(self, timeout: float = 10.0) -> Optional[List[Message]]: + """ + Ждет полную медиагруппу и возвращает ее. + + Args: + timeout: Максимальное время ожидания в секундах + + Returns: + Список сообщений медиагруппы или None при таймауте + """ + try: + await asyncio.wait_for(self.event.wait(), timeout=timeout) + if self.media_group_id in self.album_data: + return self.album_data[self.media_group_id].get("collected_album") + return None + except asyncio.TimeoutError: + return None + + class AlbumMiddleware(BaseMiddleware): """ Middleware для обработки медиа групп в Telegram. Собирает все сообщения одной медиа группы и передает их как album в data. + Не блокирует handler - сразу вызывает его, а полную медиагруппу передает через Event. """ def __init__(self, latency: Union[int, float] = 5.0): @@ -20,7 +48,8 @@ class AlbumMiddleware(BaseMiddleware): """ super().__init__() self.latency = latency - self.album_data: Dict[str, Dict[str, List[Message]]] = {} + # Храним данные медиагруппы: messages, event для уведомления, task для сбора + self.album_data: Dict[str, Dict[str, Any]] = {} def collect_album_messages(self, event: Message) -> int: """ @@ -41,12 +70,53 @@ class AlbumMiddleware(BaseMiddleware): self.album_data[event.media_group_id]["messages"].append(event) return len(self.album_data[event.media_group_id]["messages"]) + async def _collect_album_background(self, media_group_id: str) -> None: + """ + Фоновая задача для сбора всех сообщений медиагруппы. + + Args: + media_group_id: ID медиагруппы для сбора + """ + try: + await asyncio.sleep(self.latency) + + if media_group_id not in self.album_data: + return + + # Получаем текущий список сообщений + album_messages = self.album_data[media_group_id]["messages"].copy() + album_messages.sort(key=lambda x: x.message_id) + + # Сохраняем собранную медиагруппу и уведомляем через Event + self.album_data[media_group_id]["collected_album"] = album_messages + self.album_data[media_group_id]["event"].set() + + # Очищаем данные после небольшой задержки (чтобы handler успел получить album) + await asyncio.sleep(1.0) + if media_group_id in self.album_data: + task = self.album_data[media_group_id].get("task") + if task and not task.done(): + task.cancel() + del self.album_data[media_group_id] + except Exception: + # В случае ошибки все равно уведомляем, чтобы handler не завис + if media_group_id in self.album_data: + self.album_data[media_group_id]["event"].set() + # Очищаем данные даже при ошибке + try: + task = self.album_data[media_group_id].get("task") + if task and not task.done(): + task.cancel() + del self.album_data[media_group_id] + except Exception: + pass + async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any: """ Основная логика middleware. - Собирает все сообщения медиагруппы и обрабатывает только последнее сообщение - после завершения сбора всех сообщений. + Для медиагрупп: сразу вызывает handler, передавая Event для получения полной медиагруппы. + Для обычных сообщений: сразу вызывает handler. Args: handler: Обработчик события @@ -62,26 +132,36 @@ class AlbumMiddleware(BaseMiddleware): media_group_id = event.media_group_id message_id = event.message_id + # Если это первое сообщение медиагруппы - создаем структуру данных + is_first_message = False if media_group_id not in self.album_data: - self.album_data[media_group_id] = {"messages": []} + is_first_message = True + album_event = asyncio.Event() + self.album_data[media_group_id] = { + "messages": [], + "event": album_event, + "task": None, + "first_message_id": message_id + } + # Запускаем фоновую задачу для сбора медиагруппы + task = asyncio.create_task(self._collect_album_background(media_group_id)) + self.album_data[media_group_id]["task"] = task + # Добавляем сообщение в медиагруппу self.album_data[media_group_id]["messages"].append(event) - count_before = len(self.album_data[media_group_id]["messages"]) - await asyncio.sleep(self.latency) - - count_after = len(self.album_data[media_group_id]["messages"]) - if count_before != count_after: + # Обрабатываем только первое сообщение медиагруппы + if not is_first_message: + # Для остальных сообщений просто возвращаемся, не вызывая handler return - album_messages = self.album_data[media_group_id]["messages"] - album_messages.sort(key=lambda x: x.message_id) - last_message_id = album_messages[-1].message_id - - if message_id != last_message_id: - return - - data["album"] = album_messages - del self.album_data[media_group_id] + # Передаем объект-геттер в data, чтобы handler мог получить полную медиагруппу + album_getter = AlbumGetter( + self.album_data, + media_group_id, + self.album_data[media_group_id]["event"] + ) + data["album_getter"] = album_getter + # Сразу вызываем handler только для первого сообщения (не блокируем) return await handler(event, data)