import asyncio from typing import Any, Dict, Union, List from aiogram import BaseMiddleware from aiogram.types import Message class AlbumMiddleware(BaseMiddleware): """ Middleware для обработки медиа групп в Telegram. Собирает все сообщения одной медиа группы и передает их как album в data. """ def __init__(self, latency: Union[int, float] = 0.01): """ Инициализация middleware. Args: latency: Задержка в секундах для сбора всех сообщений медиа группы """ super().__init__() self.latency = latency self.album_data: Dict[str, Dict[str, List[Message]]] = {} def collect_album_messages(self, event: Message) -> int: """ Собирает сообщения одной медиа группы. Args: event: Сообщение для обработки Returns: Количество сообщений в текущей медиа группе """ if not event.media_group_id: return 0 if event.media_group_id not in self.album_data: self.album_data[event.media_group_id] = {"messages": []} self.album_data[event.media_group_id]["messages"].append(event) return len(self.album_data[event.media_group_id]["messages"]) async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any: """ Основная логика middleware. Args: handler: Обработчик события event: Событие (сообщение) data: Данные для передачи в обработчик Returns: Результат выполнения обработчика """ # Если у события нет media_group_id, передаем его обработчику сразу if not event.media_group_id: return await handler(event, data) # Собираем сообщения одной медиа группы total_before = self.collect_album_messages(event) # Ждем указанный период для сбора всех сообщений await asyncio.sleep(self.latency) # Проверяем количество сообщений после задержки total_after = len(self.album_data[event.media_group_id]["messages"]) # Если за время задержки добавились новые сообщения, выходим if total_before != total_after: return # Сортируем сообщения по message_id и добавляем в data album_messages = self.album_data[event.media_group_id]["messages"] album_messages.sort(key=lambda x: x.message_id) data["album"] = album_messages # Удаляем медиа группу из отслеживания для освобождения памяти del self.album_data[event.media_group_id] # Вызываем оригинальный обработчик события return await handler(event, data)