Dev 11 #13
@@ -157,6 +157,34 @@ class PrivateHandlers:
|
||||
@track_time("suggest_router", "private_handlers")
|
||||
async def suggest_router(self, message: types.Message, state: FSMContext, album: list = None, **kwargs):
|
||||
"""Handle post submission in suggest state"""
|
||||
# Проверяем, есть ли механизм для получения полной медиагруппы (для медиагрупп)
|
||||
album_getter = kwargs.get("album_getter")
|
||||
|
||||
if album_getter and message.media_group_id:
|
||||
# Это медиагруппа - сразу отвечаем пользователю, обработку делаем в фоне
|
||||
markup_for_user = await get_reply_keyboard(self.db, message.from_user.id)
|
||||
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
|
||||
await message.answer(success_send_message, reply_markup=markup_for_user)
|
||||
await state.set_state(FSM_STATES["START"])
|
||||
|
||||
# В фоне ждем полную медиагруппу и обрабатываем пост
|
||||
async def process_media_group_background():
|
||||
try:
|
||||
# Ждем полную медиагруппу
|
||||
full_album = await album_getter.get_album(timeout=10.0)
|
||||
if not full_album:
|
||||
return
|
||||
|
||||
# Обрабатываем пост с полной медиагруппой
|
||||
await self.user_service.update_user_activity(message.from_user.id)
|
||||
await self.post_service.process_post(message, full_album)
|
||||
except Exception as e:
|
||||
from logs.custom_logger import logger
|
||||
logger.error(f"Ошибка при фоновой обработке медиагруппы: {e}")
|
||||
|
||||
asyncio.create_task(process_media_group_background())
|
||||
else:
|
||||
# Обычное сообщение или медиагруппа уже собрана - обрабатываем синхронно
|
||||
await self.user_service.update_user_activity(message.from_user.id)
|
||||
if message.media_group_id is None:
|
||||
await self.user_service.log_user_message(message)
|
||||
|
||||
@@ -1,14 +1,42 @@
|
||||
import asyncio
|
||||
from typing import Any, Dict, Union, List
|
||||
from typing import Any, Dict, Union, List, Optional
|
||||
|
||||
from aiogram import BaseMiddleware
|
||||
from aiogram.types import Message
|
||||
|
||||
|
||||
class AlbumGetter:
|
||||
"""Вспомогательный класс для получения полной медиагруппы из middleware"""
|
||||
|
||||
def __init__(self, album_data: Dict[str, Any], media_group_id: str, event: asyncio.Event):
|
||||
self.album_data = album_data
|
||||
self.media_group_id = media_group_id
|
||||
self.event = event
|
||||
|
||||
async def get_album(self, timeout: float = 10.0) -> Optional[List[Message]]:
|
||||
"""
|
||||
Ждет полную медиагруппу и возвращает ее.
|
||||
|
||||
Args:
|
||||
timeout: Максимальное время ожидания в секундах
|
||||
|
||||
Returns:
|
||||
Список сообщений медиагруппы или None при таймауте
|
||||
"""
|
||||
try:
|
||||
await asyncio.wait_for(self.event.wait(), timeout=timeout)
|
||||
if self.media_group_id in self.album_data:
|
||||
return self.album_data[self.media_group_id].get("collected_album")
|
||||
return None
|
||||
except asyncio.TimeoutError:
|
||||
return None
|
||||
|
||||
|
||||
class AlbumMiddleware(BaseMiddleware):
|
||||
"""
|
||||
Middleware для обработки медиа групп в Telegram.
|
||||
Собирает все сообщения одной медиа группы и передает их как album в data.
|
||||
Не блокирует handler - сразу вызывает его, а полную медиагруппу передает через Event.
|
||||
"""
|
||||
|
||||
def __init__(self, latency: Union[int, float] = 5.0):
|
||||
@@ -20,7 +48,8 @@ class AlbumMiddleware(BaseMiddleware):
|
||||
"""
|
||||
super().__init__()
|
||||
self.latency = latency
|
||||
self.album_data: Dict[str, Dict[str, List[Message]]] = {}
|
||||
# Храним данные медиагруппы: messages, event для уведомления, task для сбора
|
||||
self.album_data: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def collect_album_messages(self, event: Message) -> int:
|
||||
"""
|
||||
@@ -41,12 +70,53 @@ class AlbumMiddleware(BaseMiddleware):
|
||||
self.album_data[event.media_group_id]["messages"].append(event)
|
||||
return len(self.album_data[event.media_group_id]["messages"])
|
||||
|
||||
async def _collect_album_background(self, media_group_id: str) -> None:
|
||||
"""
|
||||
Фоновая задача для сбора всех сообщений медиагруппы.
|
||||
|
||||
Args:
|
||||
media_group_id: ID медиагруппы для сбора
|
||||
"""
|
||||
try:
|
||||
await asyncio.sleep(self.latency)
|
||||
|
||||
if media_group_id not in self.album_data:
|
||||
return
|
||||
|
||||
# Получаем текущий список сообщений
|
||||
album_messages = self.album_data[media_group_id]["messages"].copy()
|
||||
album_messages.sort(key=lambda x: x.message_id)
|
||||
|
||||
# Сохраняем собранную медиагруппу и уведомляем через Event
|
||||
self.album_data[media_group_id]["collected_album"] = album_messages
|
||||
self.album_data[media_group_id]["event"].set()
|
||||
|
||||
# Очищаем данные после небольшой задержки (чтобы handler успел получить album)
|
||||
await asyncio.sleep(1.0)
|
||||
if media_group_id in self.album_data:
|
||||
task = self.album_data[media_group_id].get("task")
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
del self.album_data[media_group_id]
|
||||
except Exception:
|
||||
# В случае ошибки все равно уведомляем, чтобы handler не завис
|
||||
if media_group_id in self.album_data:
|
||||
self.album_data[media_group_id]["event"].set()
|
||||
# Очищаем данные даже при ошибке
|
||||
try:
|
||||
task = self.album_data[media_group_id].get("task")
|
||||
if task and not task.done():
|
||||
task.cancel()
|
||||
del self.album_data[media_group_id]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any:
|
||||
"""
|
||||
Основная логика middleware.
|
||||
|
||||
Собирает все сообщения медиагруппы и обрабатывает только последнее сообщение
|
||||
после завершения сбора всех сообщений.
|
||||
Для медиагрупп: сразу вызывает handler, передавая Event для получения полной медиагруппы.
|
||||
Для обычных сообщений: сразу вызывает handler.
|
||||
|
||||
Args:
|
||||
handler: Обработчик события
|
||||
@@ -62,26 +132,36 @@ class AlbumMiddleware(BaseMiddleware):
|
||||
media_group_id = event.media_group_id
|
||||
message_id = event.message_id
|
||||
|
||||
# Если это первое сообщение медиагруппы - создаем структуру данных
|
||||
is_first_message = False
|
||||
if media_group_id not in self.album_data:
|
||||
self.album_data[media_group_id] = {"messages": []}
|
||||
is_first_message = True
|
||||
album_event = asyncio.Event()
|
||||
self.album_data[media_group_id] = {
|
||||
"messages": [],
|
||||
"event": album_event,
|
||||
"task": None,
|
||||
"first_message_id": message_id
|
||||
}
|
||||
# Запускаем фоновую задачу для сбора медиагруппы
|
||||
task = asyncio.create_task(self._collect_album_background(media_group_id))
|
||||
self.album_data[media_group_id]["task"] = task
|
||||
|
||||
# Добавляем сообщение в медиагруппу
|
||||
self.album_data[media_group_id]["messages"].append(event)
|
||||
count_before = len(self.album_data[media_group_id]["messages"])
|
||||
|
||||
await asyncio.sleep(self.latency)
|
||||
|
||||
count_after = len(self.album_data[media_group_id]["messages"])
|
||||
if count_before != count_after:
|
||||
# Обрабатываем только первое сообщение медиагруппы
|
||||
if not is_first_message:
|
||||
# Для остальных сообщений просто возвращаемся, не вызывая handler
|
||||
return
|
||||
|
||||
album_messages = self.album_data[media_group_id]["messages"]
|
||||
album_messages.sort(key=lambda x: x.message_id)
|
||||
last_message_id = album_messages[-1].message_id
|
||||
|
||||
if message_id != last_message_id:
|
||||
return
|
||||
|
||||
data["album"] = album_messages
|
||||
del self.album_data[media_group_id]
|
||||
# Передаем объект-геттер в data, чтобы handler мог получить полную медиагруппу
|
||||
album_getter = AlbumGetter(
|
||||
self.album_data,
|
||||
media_group_id,
|
||||
self.album_data[media_group_id]["event"]
|
||||
)
|
||||
data["album_getter"] = album_getter
|
||||
|
||||
# Сразу вызываем handler только для первого сообщения (не блокируем)
|
||||
return await handler(event, data)
|
||||
|
||||
Reference in New Issue
Block a user