import asyncio from typing import Any, Dict, Union, List from aiogram import BaseMiddleware from aiogram.types import Message class AlbumMiddleware(BaseMiddleware): """ Middleware для обработки медиа групп в Telegram. Собирает все сообщения одной медиа группы и передает их как album в data. """ def __init__(self, latency: Union[int, float] = 5.0): """ Инициализация middleware. Args: latency: Задержка в секундах для сбора всех сообщений медиа группы """ super().__init__() self.latency = latency self.album_data: Dict[str, Dict[str, List[Message]]] = {} def collect_album_messages(self, event: Message) -> int: """ Собирает сообщения одной медиа группы. Args: event: Сообщение для обработки Returns: Количество сообщений в текущей медиа группе """ if not event.media_group_id: return 0 if event.media_group_id not in self.album_data: self.album_data[event.media_group_id] = {"messages": []} self.album_data[event.media_group_id]["messages"].append(event) return len(self.album_data[event.media_group_id]["messages"]) async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any: """ Основная логика middleware. Собирает все сообщения медиагруппы и обрабатывает только последнее сообщение после завершения сбора всех сообщений. Args: handler: Обработчик события event: Событие (сообщение) data: Данные для передачи в обработчик Returns: Результат выполнения обработчика """ if not event.media_group_id: return await handler(event, data) media_group_id = event.media_group_id message_id = event.message_id if media_group_id not in self.album_data: self.album_data[media_group_id] = {"messages": []} self.album_data[media_group_id]["messages"].append(event) count_before = len(self.album_data[media_group_id]["messages"]) await asyncio.sleep(self.latency) count_after = len(self.album_data[media_group_id]["messages"]) if count_before != count_after: return album_messages = self.album_data[media_group_id]["messages"] album_messages.sort(key=lambda x: x.message_id) last_message_id = album_messages[-1].message_id if message_id != last_message_id: return data["album"] = album_messages del self.album_data[media_group_id] return await handler(event, data)