Files
telegram-helper-bot/helper_bot/middlewares/album_middleware.py
2026-02-01 23:03:23 +03:00

168 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from typing import Any, Dict, List, Optional, Union
from aiogram import BaseMiddleware
from aiogram.types import Message
class AlbumGetter:
"""Вспомогательный класс для получения полной медиагруппы из middleware"""
def __init__(
self, album_data: Dict[str, Any], media_group_id: str, event: asyncio.Event
):
self.album_data = album_data
self.media_group_id = media_group_id
self.event = event
async def get_album(self, timeout: float = 10.0) -> Optional[List[Message]]:
"""
Ждет полную медиагруппу и возвращает ее.
Args:
timeout: Максимальное время ожидания в секундах
Returns:
Список сообщений медиагруппы или None при таймауте
"""
try:
await asyncio.wait_for(self.event.wait(), timeout=timeout)
if self.media_group_id in self.album_data:
return self.album_data[self.media_group_id].get("collected_album")
return None
except asyncio.TimeoutError:
return None
class AlbumMiddleware(BaseMiddleware):
"""
Middleware для обработки медиа групп в Telegram.
Собирает все сообщения одной медиа группы и передает их как album в data.
Не блокирует handler - сразу вызывает его, а полную медиагруппу передает через Event.
"""
def __init__(self, latency: Union[int, float] = 5.0):
"""
Инициализация middleware.
Args:
latency: Задержка в секундах для сбора всех сообщений медиа группы
"""
super().__init__()
self.latency = latency
# Храним данные медиагруппы: messages, event для уведомления, task для сбора
self.album_data: Dict[str, Dict[str, Any]] = {}
def collect_album_messages(self, event: Message) -> int:
"""
Собирает сообщения одной медиа группы.
Args:
event: Сообщение для обработки
Returns:
Количество сообщений в текущей медиа группе
"""
if not event.media_group_id:
return 0
if event.media_group_id not in self.album_data:
self.album_data[event.media_group_id] = {"messages": []}
self.album_data[event.media_group_id]["messages"].append(event)
return len(self.album_data[event.media_group_id]["messages"])
async def _collect_album_background(self, media_group_id: str) -> None:
"""
Фоновая задача для сбора всех сообщений медиагруппы.
Args:
media_group_id: ID медиагруппы для сбора
"""
try:
await asyncio.sleep(self.latency)
if media_group_id not in self.album_data:
return
# Получаем текущий список сообщений
album_messages = self.album_data[media_group_id]["messages"].copy()
album_messages.sort(key=lambda x: x.message_id)
# Сохраняем собранную медиагруппу и уведомляем через Event
self.album_data[media_group_id]["collected_album"] = album_messages
self.album_data[media_group_id]["event"].set()
# Очищаем данные после небольшой задержки (чтобы handler успел получить album)
await asyncio.sleep(1.0)
if media_group_id in self.album_data:
task = self.album_data[media_group_id].get("task")
if task and not task.done():
task.cancel()
del self.album_data[media_group_id]
except Exception:
# В случае ошибки все равно уведомляем, чтобы handler не завис
if media_group_id in self.album_data:
self.album_data[media_group_id]["event"].set()
# Очищаем данные даже при ошибке
try:
task = self.album_data[media_group_id].get("task")
if task and not task.done():
task.cancel()
del self.album_data[media_group_id]
except Exception:
pass
async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any:
"""
Основная логика middleware.
Для медиагрупп: сразу вызывает handler, передавая Event для получения полной медиагруппы.
Для обычных сообщений: сразу вызывает handler.
Args:
handler: Обработчик события
event: Событие (сообщение)
data: Данные для передачи в обработчик
Returns:
Результат выполнения обработчика
"""
if not event.media_group_id:
return await handler(event, data)
media_group_id = event.media_group_id
message_id = event.message_id
# Если это первое сообщение медиагруппы - создаем структуру данных
is_first_message = False
if media_group_id not in self.album_data:
is_first_message = True
album_event = asyncio.Event()
self.album_data[media_group_id] = {
"messages": [],
"event": album_event,
"task": None,
"first_message_id": message_id,
}
# Запускаем фоновую задачу для сбора медиагруппы
task = asyncio.create_task(self._collect_album_background(media_group_id))
self.album_data[media_group_id]["task"] = task
# Добавляем сообщение в медиагруппу
self.album_data[media_group_id]["messages"].append(event)
# Обрабатываем только первое сообщение медиагруппы
if not is_first_message:
# Для остальных сообщений просто возвращаемся, не вызывая handler
return
# Передаем объект-геттер в data, чтобы handler мог получить полную медиагруппу
album_getter = AlbumGetter(
self.album_data, media_group_id, self.album_data[media_group_id]["event"]
)
data["album_getter"] = album_getter
# Сразу вызываем handler только для первого сообщения (не блокируем)
return await handler(event, data)