diff --git a/RATE_LIMITING_SOLUTION.md b/RATE_LIMITING_SOLUTION.md new file mode 100644 index 0000000..ee41011 --- /dev/null +++ b/RATE_LIMITING_SOLUTION.md @@ -0,0 +1,171 @@ +# Решение проблемы Flood Control в Telegram Bot + +## Проблема + +В логах бота наблюдались ошибки типа: +``` +Flood control exceeded on method 'SendVoice' in chat 1322897572. Retry in 3 seconds. +``` + +Эти ошибки возникают при превышении лимитов Telegram Bot API: +- Не более 30 сообщений в секунду от одного бота глобально +- Не более 1 сообщения в секунду в один чат +- Дополнительные ограничения для разных типов сообщений + +## Решение + +Реализована комплексная система rate limiting, включающая: + +### 1. Основные компоненты + +#### `rate_limiter.py` +- **ChatRateLimiter**: Ограничивает скорость отправки сообщений для конкретного чата +- **GlobalRateLimiter**: Глобальные ограничения для всех чатов +- **RetryHandler**: Обработка повторных попыток с экспоненциальной задержкой +- **TelegramRateLimiter**: Основной класс, объединяющий все компоненты + +#### `rate_limit_monitor.py` +- **RateLimitMonitor**: Мониторинг и статистика rate limiting +- Отслеживание успешных/неудачных запросов +- Анализ ошибок и производительности +- Статистика по чатам + +#### `rate_limit_config.py` +- Конфигурации для разных окружений (development, production, strict) +- Адаптивные настройки на основе уровня ошибок +- Настройки для разных типов сообщений + +#### `rate_limit_middleware.py` +- Middleware для автоматического применения rate limiting +- Перехват всех исходящих сообщений +- Прозрачная интеграция с существующим кодом + +### 2. Ключевые особенности + +#### Rate Limiting +- **Настраиваемая скорость**: 0.5 сообщений в секунду на чат (по умолчанию) +- **Burst protection**: Максимум 2 сообщения подряд +- **Глобальные ограничения**: 10 сообщений в секунду глобально +- **Адаптивные задержки**: Увеличение задержек при ошибках + +#### Retry Mechanism +- **Экспоненциальная задержка**: Увеличение времени ожидания при повторных попытках +- **Максимальные ограничения**: Ограничение максимального времени ожидания +- **Умная обработка ошибок**: Разные стратегии для разных типов ошибок + +#### Мониторинг +- **Детальная статистика**: Отслеживание всех запросов и ошибок +- **Анализ производительности**: Процент успеха, время ожидания, активность +- **Административные команды**: `/ratelimit_stats`, `/ratelimit_errors`, `/reset_ratelimit_stats` + +### 3. Интеграция + +#### Обновленные функции +```python +# helper_func.py +async def send_voice_message(chat_id, message, voice, markup=None): + from .rate_limiter import send_with_rate_limit + + async def _send_voice(): + if markup is None: + return await message.bot.send_voice(chat_id=chat_id, voice=voice) + else: + return await message.bot.send_voice(chat_id=chat_id, voice=voice, reply_markup=markup) + + return await send_with_rate_limit(_send_voice, chat_id) +``` + +#### Middleware +```python +# voice_handler.py +from helper_bot.middlewares.rate_limit_middleware import MessageSendMiddleware + +def _setup_middleware(self): + self.router.message.middleware(DependenciesMiddleware()) + self.router.message.middleware(BlacklistMiddleware()) + self.router.message.middleware(MessageSendMiddleware()) # Новый middleware +``` + +### 4. Конфигурация + +#### Production настройки (по умолчанию) +```python +PRODUCTION_CONFIG = RateLimitSettings( + messages_per_second=0.5, # 1 сообщение каждые 2 секунды + burst_limit=2, # Максимум 2 сообщения подряд + retry_after_multiplier=1.5, + max_retry_delay=30.0, + max_retries=3, + voice_message_delay=2.5, # Дополнительная задержка для голосовых + media_message_delay=2.0, + text_message_delay=1.5 +) +``` + +#### Адаптивная конфигурация +Система автоматически ужесточает ограничения при высоком уровне ошибок: +- При >10% ошибок: уменьшение скорости в 2 раза +- При <1% ошибок: увеличение скорости на 20% + +### 5. Мониторинг и администрирование + +#### Команды для администраторов +- `/ratelimit_stats` - Показать статистику rate limiting +- `/ratelimit_errors` - Показать недавние ошибки +- `/reset_ratelimit_stats` - Сбросить статистику + +#### Пример вывода статистики +``` +📊 Статистика Rate Limiting + +🔢 Общая статистика: +• Всего запросов: 1250 +• Процент успеха: 98.4% +• Процент ошибок: 1.6% +• Запросов в минуту: 12.5 +• Среднее время ожидания: 1.2с +• Активных чатов: 45 +• Ошибок за час: 3 + +🔍 Детальная статистика: +• Успешных запросов: 1230 +• Неудачных запросов: 20 +• RetryAfter ошибок: 15 +• Других ошибок: 5 +``` + +### 6. Тестирование + +Создан полный набор тестов в `test_rate_limiter.py`: +- Тесты всех компонентов +- Интеграционные тесты +- Тесты конфигурации +- Тесты мониторинга + +Запуск тестов: +```bash +pytest tests/test_rate_limiter.py -v +``` + +### 7. Преимущества решения + +1. **Предотвращение ошибок**: Автоматическое соблюдение лимитов API +2. **Прозрачность**: Минимальные изменения в существующем коде +3. **Мониторинг**: Полная видимость производительности +4. **Адаптивность**: Автоматическая настройка под нагрузку +5. **Надежность**: Умная обработка ошибок и повторных попыток +6. **Масштабируемость**: Поддержка множества чатов + +### 8. Рекомендации по использованию + +1. **Мониторинг**: Регулярно проверяйте статистику через `/ratelimit_stats` +2. **Настройка**: При необходимости корректируйте конфигурацию под ваши нужды +3. **Алерты**: Настройте уведомления при высоком проценте ошибок +4. **Тестирование**: Проверяйте работу в тестовой среде перед продакшеном + +### 9. Будущие улучшения + +- Интеграция с системой метрик (Prometheus/Grafana) +- Автоматическое масштабирование ограничений +- A/B тестирование разных конфигураций +- Интеграция с системой алертов diff --git a/database/async_db.py b/database/async_db.py index 4df7e28..6174e78 100644 --- a/database/async_db.py +++ b/database/async_db.py @@ -297,6 +297,18 @@ class AsyncBotDB: """Получает user_id пользователя по message_id для voice bot.""" return await self.factory.audio.get_user_id_by_message_id_for_voice_bot(message_id) + async def delete_audio_moderate_record(self, message_id: int) -> None: + """Удаляет запись из таблицы audio_moderate по message_id.""" + await self.factory.audio.delete_audio_moderate_record(message_id) + + async def get_all_audio_records(self) -> List[Dict[str, Any]]: + """Получить все записи аудио сообщений.""" + return await self.factory.audio.get_all_audio_records() + + async def delete_audio_record_by_file_name(self, file_name: str) -> None: + """Удалить запись аудио сообщения по имени файла.""" + await self.factory.audio.delete_audio_record_by_file_name(file_name) + # Методы для миграций async def get_migration_version(self) -> int: """Получение текущей версии миграции.""" diff --git a/database/repositories/audio_repository.py b/database/repositories/audio_repository.py index bc854cd..2da52ed 100644 --- a/database/repositories/audio_repository.py +++ b/database/repositories/audio_repository.py @@ -1,4 +1,4 @@ -from typing import Optional, List +from typing import Optional, List, Dict, Any from database.base import DatabaseConnection from database.models import AudioMessage, AudioListenRecord, AudioModerate from datetime import datetime @@ -213,4 +213,26 @@ class AudioRepository(DatabaseConnection): """Удаляет запись из таблицы audio_moderate по message_id.""" query = "DELETE FROM audio_moderate WHERE message_id = ?" await self._execute_query(query, (message_id,)) - self.logger.info(f"Удалена запись из audio_moderate для message_id {message_id}") \ No newline at end of file + self.logger.info(f"Удалена запись из audio_moderate для message_id {message_id}") + + async def get_all_audio_records(self) -> List[Dict[str, Any]]: + """Получить все записи аудио сообщений.""" + query = "SELECT file_name, author_id, date_added FROM audio_message_reference" + rows = await self._execute_query_with_result(query) + + records = [] + for row in rows: + records.append({ + 'file_name': row[0], + 'author_id': row[1], + 'date_added': row[2] + }) + + self.logger.info(f"Получено {len(records)} записей аудио сообщений") + return records + + async def delete_audio_record_by_file_name(self, file_name: str) -> None: + """Удалить запись аудио сообщения по имени файла.""" + query = "DELETE FROM audio_message_reference WHERE file_name = ?" + await self._execute_query(query, (file_name,)) + self.logger.info(f"Удалена запись аудио сообщения: {file_name}") \ No newline at end of file diff --git a/database/repositories/user_repository.py b/database/repositories/user_repository.py index 127fbbb..0e7a117 100644 --- a/database/repositories/user_repository.py +++ b/database/repositories/user_repository.py @@ -35,14 +35,14 @@ class UserRepository(DatabaseConnection): return bool(len(rows)) async def add_user(self, user: User) -> None: - """Добавление нового пользователя.""" + """Добавление нового пользователя с защитой от дублирования.""" if not user.date_added: user.date_added = int(datetime.now().timestamp()) if not user.date_changed: user.date_changed = int(datetime.now().timestamp()) query = """ - INSERT INTO our_users (user_id, first_name, full_name, username, is_bot, + INSERT OR IGNORE INTO our_users (user_id, first_name, full_name, username, is_bot, language_code, emoji, has_stickers, date_added, date_changed, voice_bot_welcome_received) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ @@ -51,7 +51,7 @@ class UserRepository(DatabaseConnection): user.date_added, user.date_changed, user.voice_bot_welcome_received) await self._execute_query(query, params) - self.logger.info(f"Новый пользователь добавлен: {user.user_id}") + self.logger.info(f"Пользователь обработан (создан или уже существует): {user.user_id}") async def get_user_info(self, user_id: int) -> Optional[User]: """Получение информации о пользователе.""" diff --git a/helper_bot/config/__init__.py b/helper_bot/config/__init__.py new file mode 100644 index 0000000..2726292 --- /dev/null +++ b/helper_bot/config/__init__.py @@ -0,0 +1 @@ +# Config package diff --git a/helper_bot/config/rate_limit_config.py b/helper_bot/config/rate_limit_config.py new file mode 100644 index 0000000..92e5ea7 --- /dev/null +++ b/helper_bot/config/rate_limit_config.py @@ -0,0 +1,129 @@ +""" +Конфигурация для rate limiting +""" +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class RateLimitSettings: + """Настройки rate limiting для разных типов сообщений""" + + # Основные настройки + messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат + burst_limit: int = 2 # Максимум 2 сообщения подряд + retry_after_multiplier: float = 1.5 # Множитель для увеличения задержки при retry + max_retry_delay: float = 30.0 # Максимальная задержка между попытками + max_retries: int = 3 # Максимальное количество повторных попыток + + # Специальные настройки для разных типов сообщений + voice_message_delay: float = 2.0 # Дополнительная задержка для голосовых сообщений + media_message_delay: float = 1.5 # Дополнительная задержка для медиа сообщений + text_message_delay: float = 1.0 # Дополнительная задержка для текстовых сообщений + + # Настройки для разных типов чатов + private_chat_multiplier: float = 1.0 # Множитель для приватных чатов + group_chat_multiplier: float = 0.8 # Множитель для групповых чатов + channel_multiplier: float = 0.6 # Множитель для каналов + + # Глобальные ограничения + global_messages_per_second: float = 10.0 # Максимум 10 сообщений в секунду глобально + global_burst_limit: int = 20 # Максимум 20 сообщений подряд глобально + + +# Конфигурации для разных сценариев использования +DEVELOPMENT_CONFIG = RateLimitSettings( + messages_per_second=1.0, # Более мягкие ограничения для разработки + burst_limit=3, + retry_after_multiplier=1.2, + max_retry_delay=15.0, + max_retries=2 +) + +PRODUCTION_CONFIG = RateLimitSettings( + messages_per_second=0.5, # Строгие ограничения для продакшена + burst_limit=2, + retry_after_multiplier=1.5, + max_retry_delay=30.0, + max_retries=3, + voice_message_delay=2.5, + media_message_delay=2.0, + text_message_delay=1.5 +) + +STRICT_CONFIG = RateLimitSettings( + messages_per_second=0.3, # Очень строгие ограничения + burst_limit=1, + retry_after_multiplier=2.0, + max_retry_delay=60.0, + max_retries=5, + voice_message_delay=3.0, + media_message_delay=2.5, + text_message_delay=2.0 +) + + +def get_rate_limit_config(environment: str = "production") -> RateLimitSettings: + """ + Получает конфигурацию rate limiting в зависимости от окружения + + Args: + environment: Окружение ('development', 'production', 'strict') + + Returns: + RateLimitSettings: Конфигурация для указанного окружения + """ + configs = { + "development": DEVELOPMENT_CONFIG, + "production": PRODUCTION_CONFIG, + "strict": STRICT_CONFIG + } + + return configs.get(environment, PRODUCTION_CONFIG) + + +def get_adaptive_config( + current_error_rate: float, + base_config: Optional[RateLimitSettings] = None +) -> RateLimitSettings: + """ + Получает адаптивную конфигурацию на основе текущего уровня ошибок + + Args: + current_error_rate: Текущий уровень ошибок (0.0 - 1.0) + base_config: Базовая конфигурация + + Returns: + RateLimitSettings: Адаптированная конфигурация + """ + if base_config is None: + base_config = PRODUCTION_CONFIG + + # Если уровень ошибок высокий, ужесточаем ограничения + if current_error_rate > 0.1: # Более 10% ошибок + return RateLimitSettings( + messages_per_second=base_config.messages_per_second * 0.5, + burst_limit=max(1, base_config.burst_limit - 1), + retry_after_multiplier=base_config.retry_after_multiplier * 1.5, + max_retry_delay=base_config.max_retry_delay * 1.5, + max_retries=base_config.max_retries + 1, + voice_message_delay=base_config.voice_message_delay * 1.5, + media_message_delay=base_config.media_message_delay * 1.3, + text_message_delay=base_config.text_message_delay * 1.2 + ) + + # Если уровень ошибок низкий, можно немного ослабить ограничения + elif current_error_rate < 0.01: # Менее 1% ошибок + return RateLimitSettings( + messages_per_second=base_config.messages_per_second * 1.2, + burst_limit=base_config.burst_limit + 1, + retry_after_multiplier=base_config.retry_after_multiplier * 0.9, + max_retry_delay=base_config.max_retry_delay * 0.8, + max_retries=max(1, base_config.max_retries - 1), + voice_message_delay=base_config.voice_message_delay * 0.8, + media_message_delay=base_config.media_message_delay * 0.9, + text_message_delay=base_config.text_message_delay * 0.9 + ) + + # Возвращаем базовую конфигурацию + return base_config diff --git a/helper_bot/handlers/admin/admin_handlers.py b/helper_bot/handlers/admin/admin_handlers.py index c375987..b9b229a 100644 --- a/helper_bot/handlers/admin/admin_handlers.py +++ b/helper_bot/handlers/admin/admin_handlers.py @@ -27,9 +27,9 @@ from logs.custom_logger import logger # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, - track_errors + track_errors, + db_query_time ) # Создаем роутер с middleware для проверки доступа @@ -94,6 +94,7 @@ async def cancel_ban_process( ) @track_time("get_last_users", "admin_handlers") @track_errors("admin_handlers", "get_last_users") +@db_query_time("get_last_users", "users", "select") async def get_last_users( message: types.Message, state: FSMContext, @@ -127,6 +128,7 @@ async def get_last_users( ) @track_time("get_banned_users", "admin_handlers") @track_errors("admin_handlers", "get_banned_users") +@db_query_time("get_banned_users", "users", "select") async def get_banned_users( message: types.Message, state: FSMContext, diff --git a/helper_bot/handlers/admin/rate_limit_handlers.py b/helper_bot/handlers/admin/rate_limit_handlers.py new file mode 100644 index 0000000..9fd3b21 --- /dev/null +++ b/helper_bot/handlers/admin/rate_limit_handlers.py @@ -0,0 +1,272 @@ +""" +Обработчики команд для мониторинга rate limiting +""" +from aiogram import Router, types, F +from aiogram.filters import Command, MagicData +from aiogram.fsm.context import FSMContext +from aiogram.types import FSInputFile + +from helper_bot.filters.main import ChatTypeFilter +from helper_bot.middlewares.dependencies_middleware import DependenciesMiddleware +from helper_bot.utils.rate_limit_monitor import rate_limit_monitor, get_rate_limit_summary +from helper_bot.utils.rate_limit_metrics import update_rate_limit_gauges, get_rate_limit_metrics_summary +from logs.custom_logger import logger + +# Local imports - metrics +from helper_bot.utils.metrics import ( + track_time, + track_errors +) + + +class RateLimitHandlers: + def __init__(self, db, settings): + self.db = db.get_db() if hasattr(db, 'get_db') else db + self.settings = settings + self.router = Router() + self._setup_handlers() + self._setup_middleware() + + def _setup_middleware(self): + self.router.message.middleware(DependenciesMiddleware()) + + def _setup_handlers(self): + # Команда для просмотра статистики rate limiting + self.router.message.register( + self.rate_limit_stats_handler, + ChatTypeFilter(chat_type=["private"]), + Command("ratelimit_stats") + ) + + # Команда для сброса статистики rate limiting + self.router.message.register( + self.reset_rate_limit_stats_handler, + ChatTypeFilter(chat_type=["private"]), + Command("reset_ratelimit_stats") + ) + + # Команда для просмотра ошибок rate limiting + self.router.message.register( + self.rate_limit_errors_handler, + ChatTypeFilter(chat_type=["private"]), + Command("ratelimit_errors") + ) + + # Команда для просмотра Prometheus метрик + self.router.message.register( + self.rate_limit_prometheus_handler, + ChatTypeFilter(chat_type=["private"]), + Command("ratelimit_prometheus") + ) + + @track_time("rate_limit_stats_handler", "rate_limit_handlers") + @track_errors("rate_limit_handlers", "rate_limit_stats_handler") + async def rate_limit_stats_handler( + self, + message: types.Message, + state: FSMContext, + bot_db: MagicData("bot_db"), + settings: MagicData("settings") + ): + """Показывает статистику rate limiting""" + try: + # Проверяем права администратора + if not await bot_db.is_admin(message.from_user.id): + await message.answer("У вас нет прав для выполнения этой команды.") + return + + # Получаем сводку + summary = get_rate_limit_summary() + global_stats = rate_limit_monitor.get_global_stats() + + # Формируем сообщение со статистикой + stats_text = ( + f"📊 Статистика Rate Limiting\n\n" + f"🔢 Общая статистика:\n" + f"• Всего запросов: {summary['total_requests']}\n" + f"• Процент успеха: {summary['success_rate']:.1%}\n" + f"• Процент ошибок: {summary['error_rate']:.1%}\n" + f"• Запросов в минуту: {summary['requests_per_minute']:.1f}\n" + f"• Среднее время ожидания: {summary['average_wait_time']:.2f}с\n" + f"• Активных чатов: {summary['active_chats']}\n" + f"• Ошибок за час: {summary['recent_errors_count']}\n\n" + ) + + # Добавляем детальную статистику + stats_text += f"🔍 Детальная статистика:\n" + stats_text += f"• Успешных запросов: {global_stats.successful_requests}\n" + stats_text += f"• Неудачных запросов: {global_stats.failed_requests}\n" + stats_text += f"• RetryAfter ошибок: {global_stats.retry_after_errors}\n" + stats_text += f"• Других ошибок: {global_stats.other_errors}\n" + stats_text += f"• Общее время ожидания: {global_stats.total_wait_time:.2f}с\n\n" + + # Добавляем топ чатов по запросам + top_chats = rate_limit_monitor.get_top_chats_by_requests(5) + if top_chats: + stats_text += f"📈 Топ-5 чатов по запросам:\n" + for i, (chat_id, chat_stats) in enumerate(top_chats, 1): + stats_text += f"{i}. Chat {chat_id}: {chat_stats.total_requests} запросов ({chat_stats.success_rate:.1%} успех)\n" + stats_text += "\n" + + # Добавляем чаты с высоким процентом ошибок + high_error_chats = rate_limit_monitor.get_chats_with_high_error_rate(0.1) + if high_error_chats: + stats_text += f"⚠️ Чаты с высоким процентом ошибок (>10%):\n" + for chat_id, chat_stats in high_error_chats[:3]: + stats_text += f"• Chat {chat_id}: {chat_stats.error_rate:.1%} ошибок ({chat_stats.failed_requests}/{chat_stats.total_requests})\n" + + await message.answer(stats_text, parse_mode='HTML') + + except Exception as e: + logger.error(f"Ошибка при получении статистики rate limiting: {e}") + await message.answer("Произошла ошибка при получении статистики.") + + @track_time("reset_rate_limit_stats_handler", "rate_limit_handlers") + @track_errors("rate_limit_handlers", "reset_rate_limit_stats_handler") + async def reset_rate_limit_stats_handler( + self, + message: types.Message, + state: FSMContext, + bot_db: MagicData("bot_db"), + settings: MagicData("settings") + ): + """Сбрасывает статистику rate limiting""" + try: + # Проверяем права администратора + if not await bot_db.is_admin(message.from_user.id): + await message.answer("У вас нет прав для выполнения этой команды.") + return + + # Сбрасываем статистику + rate_limit_monitor.reset_stats() + + await message.answer("✅ Статистика rate limiting сброшена.") + + except Exception as e: + logger.error(f"Ошибка при сбросе статистики rate limiting: {e}") + await message.answer("Произошла ошибка при сбросе статистики.") + + @track_time("rate_limit_errors_handler", "rate_limit_handlers") + @track_errors("rate_limit_handlers", "rate_limit_errors_handler") + async def rate_limit_errors_handler( + self, + message: types.Message, + state: FSMContext, + bot_db: MagicData("bot_db"), + settings: MagicData("settings") + ): + """Показывает недавние ошибки rate limiting""" + try: + # Проверяем права администратора + if not await bot_db.is_admin(message.from_user.id): + await message.answer("У вас нет прав для выполнения этой команды.") + return + + # Получаем ошибки за последний час + recent_errors = rate_limit_monitor.get_recent_errors(60) + error_summary = rate_limit_monitor.get_error_summary(60) + + if not recent_errors: + await message.answer("✅ Ошибок rate limiting за последний час не было.") + return + + # Формируем сообщение с ошибками + errors_text = f"🚨 Ошибки Rate Limiting (последний час)\n\n" + errors_text += f"📊 Сводка ошибок:\n" + for error_type, count in error_summary.items(): + errors_text += f"• {error_type}: {count}\n" + errors_text += f"\nВсего ошибок: {len(recent_errors)}\n\n" + + # Показываем последние 10 ошибок + errors_text += f"🔍 Последние ошибки:\n" + for i, error in enumerate(recent_errors[-10:], 1): + from datetime import datetime + timestamp = datetime.fromtimestamp(error['timestamp']).strftime("%H:%M:%S") + errors_text += f"{i}. {timestamp} - Chat {error['chat_id']} - {error['error_type']}\n" + + # Если сообщение слишком длинное, разбиваем на части + if len(errors_text) > 4000: + # Отправляем сводку + summary_text = f"🚨 Ошибки Rate Limiting (последний час)\n\n" + summary_text += f"📊 Сводка ошибок:\n" + for error_type, count in error_summary.items(): + summary_text += f"• {error_type}: {count}\n" + summary_text += f"\nВсего ошибок: {len(recent_errors)}" + + await message.answer(summary_text, parse_mode='HTML') + + # Отправляем детали отдельным сообщением + details_text = f"🔍 Последние ошибки:\n" + for i, error in enumerate(recent_errors[-10:], 1): + from datetime import datetime + timestamp = datetime.fromtimestamp(error['timestamp']).strftime("%H:%M:%S") + details_text += f"{i}. {timestamp} - Chat {error['chat_id']} - {error['error_type']}\n" + + await message.answer(details_text, parse_mode='HTML') + else: + await message.answer(errors_text, parse_mode='HTML') + + except Exception as e: + logger.error(f"Ошибка при получении ошибок rate limiting: {e}") + await message.answer("Произошла ошибка при получении информации об ошибках.") + + @track_time("rate_limit_prometheus_handler", "rate_limit_handlers") + @track_errors("rate_limit_handlers", "rate_limit_prometheus_handler") + async def rate_limit_prometheus_handler( + self, + message: types.Message, + state: FSMContext, + bot_db: MagicData("bot_db"), + settings: MagicData("settings") + ): + """Показывает Prometheus метрики rate limiting""" + try: + # Проверяем права администратора + if not await bot_db.is_admin(message.from_user.id): + await message.answer("У вас нет прав для выполнения этой команды.") + return + + # Обновляем gauge метрики + update_rate_limit_gauges() + + # Получаем сводку метрик + metrics_summary = get_rate_limit_metrics_summary() + + # Формируем сообщение с метриками + metrics_text = ( + f"📊 Prometheus метрики Rate Limiting\n\n" + f"🔢 Основные метрики:\n" + f"• rate_limit_requests_total: {metrics_summary['total_requests']}\n" + f"• rate_limit_success_rate: {metrics_summary['success_rate']:.3f}\n" + f"• rate_limit_error_rate: {metrics_summary['error_rate']:.3f}\n" + f"• rate_limit_requests_per_minute: {metrics_summary['requests_per_minute']:.1f}\n" + f"• rate_limit_avg_wait_time: {metrics_summary['average_wait_time']:.3f}s\n" + f"• rate_limit_active_chats: {metrics_summary['active_chats']}\n\n" + ) + + # Добавляем детальные метрики + metrics_text += f"🔍 Детальные метрики:\n" + metrics_text += f"• Успешных запросов: {metrics_summary['successful_requests']}\n" + metrics_text += f"• Неудачных запросов: {metrics_summary['failed_requests']}\n" + metrics_text += f"• RetryAfter ошибок: {metrics_summary['retry_after_errors']}\n" + metrics_text += f"• Других ошибок: {metrics_summary['other_errors']}\n" + metrics_text += f"• Общее время ожидания: {metrics_summary['total_wait_time']:.2f}s\n\n" + + # Добавляем информацию о доступных метриках + metrics_text += f"📈 Доступные Prometheus метрики:\n" + metrics_text += f"• rate_limit_requests_total - общее количество запросов\n" + metrics_text += f"• rate_limit_errors_total - количество ошибок по типам\n" + metrics_text += f"• rate_limit_wait_duration_seconds - время ожидания\n" + metrics_text += f"• rate_limit_request_interval_seconds - интервалы между запросами\n" + metrics_text += f"• rate_limit_active_chats - количество активных чатов\n" + metrics_text += f"• rate_limit_success_rate - процент успеха по чатам\n" + metrics_text += f"• rate_limit_requests_per_minute - запросов в минуту\n" + metrics_text += f"• rate_limit_total_requests - общее количество запросов\n" + metrics_text += f"• rate_limit_total_errors - количество ошибок\n" + metrics_text += f"• rate_limit_avg_wait_time - среднее время ожидания\n" + + await message.answer(metrics_text, parse_mode='HTML') + + except Exception as e: + logger.error(f"Ошибка при получении Prometheus метрик: {e}") + await message.answer("Произошла ошибка при получении метрик.") diff --git a/helper_bot/handlers/admin/services.py b/helper_bot/handlers/admin/services.py index 1c4289e..55fe57b 100644 --- a/helper_bot/handlers/admin/services.py +++ b/helper_bot/handlers/admin/services.py @@ -7,10 +7,8 @@ from logs.custom_logger import logger # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, - track_errors, - db_query_time + track_errors ) diff --git a/helper_bot/handlers/callback/callback_handlers.py b/helper_bot/handlers/callback/callback_handlers.py index 6ee0283..ee0b2f4 100644 --- a/helper_bot/handlers/callback/callback_handlers.py +++ b/helper_bot/handlers/callback/callback_handlers.py @@ -26,10 +26,10 @@ from logs.custom_logger import logger # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, track_errors, - db_query_time + db_query_time, + track_file_operations ) callback_router = Router() @@ -238,6 +238,8 @@ async def change_page( @callback_router.callback_query(F.data == CALLBACK_SAVE) @track_time("save_voice_message", "callback_handlers") @track_errors("callback_handlers", "save_voice_message") +@track_file_operations("voice") +@db_query_time("save_voice_message", "audio_moderate", "mixed") async def save_voice_message( call: CallbackQuery, bot_db: MagicData("bot_db"), @@ -245,14 +247,18 @@ async def save_voice_message( **kwargs ): try: + logger.info(f"Начинаем сохранение голосового сообщения. Message ID: {call.message.message_id}") + # Создаем сервис для работы с аудио файлами audio_service = AudioFileService(bot_db) # Получаем ID пользователя из базы user_id = await bot_db.get_user_id_by_message_id_for_voice_bot(call.message.message_id) + logger.info(f"Получен user_id: {user_id}") # Генерируем имя файла file_name = await audio_service.generate_file_name(user_id) + logger.info(f"Сгенерировано имя файла: {file_name}") # Собираем инфо о сообщении time_UTC = int(time.time()) @@ -260,32 +266,54 @@ async def save_voice_message( # Получаем file_id из voice сообщения file_id = call.message.voice.file_id if call.message.voice else "" + logger.info(f"Получен file_id: {file_id}") - # Сохраняем в базу данных - await audio_service.save_audio_file(file_name, user_id, date_added, file_id) - - # Скачиваем и сохраняем файл + # ВАЖНО: Сначала скачиваем и сохраняем файл на диск + logger.info("Начинаем скачивание и сохранение файла на диск...") await audio_service.download_and_save_audio(call.bot, call.message, file_name) + logger.info("Файл успешно скачан и сохранен на диск") + + # Только после успешного сохранения файла - сохраняем в базу данных + logger.info("Начинаем сохранение информации в базу данных...") + await audio_service.save_audio_file(file_name, user_id, date_added, file_id) + logger.info("Информация успешно сохранена в базу данных") # Удаляем сообщение из предложки + logger.info("Удаляем сообщение из предложки...") await call.bot.delete_message( chat_id=settings['Telegram']['group_for_posts'], message_id=call.message.message_id ) + logger.info("Сообщение удалено из предложки") # Удаляем запись из таблицы audio_moderate + logger.info("Удаляем запись из таблицы audio_moderate...") await bot_db.delete_audio_moderate_record(call.message.message_id) + logger.info("Запись удалена из таблицы audio_moderate") await call.answer(text='Сохранено!', cache_time=3) + logger.info(f"Голосовое сообщение успешно сохранено: {file_name}") except Exception as e: logger.error(f"Ошибка при сохранении голосового сообщения: {e}") + logger.error(f"Traceback: {traceback.format_exc()}") + + # Дополнительная информация для диагностики + try: + if 'call' in locals() and call.message: + logger.error(f"Message ID: {call.message.message_id}") + logger.error(f"User ID: {user_id if 'user_id' in locals() else 'не определен'}") + logger.error(f"File name: {file_name if 'file_name' in locals() else 'не определен'}") + except: + pass + await call.answer(text='Ошибка при сохранении!', cache_time=3) @callback_router.callback_query(F.data == CALLBACK_DELETE) @track_time("delete_voice_message", "callback_handlers") @track_errors("callback_handlers", "delete_voice_message") +@db_query_time("delete_voice_message", "audio_moderate", "delete") async def delete_voice_message( call: CallbackQuery, bot_db: MagicData("bot_db"), diff --git a/helper_bot/handlers/callback/services.py b/helper_bot/handlers/callback/services.py index fd3a155..229eea3 100644 --- a/helper_bot/handlers/callback/services.py +++ b/helper_bot/handlers/callback/services.py @@ -25,7 +25,7 @@ from logs.custom_logger import logger # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, + track_media_processing, track_time, track_errors, db_query_time @@ -140,6 +140,7 @@ class PostPublishService: @track_time("_publish_media_group", "post_publish_service") @track_errors("post_publish_service", "_publish_media_group") + @track_media_processing("media_group") async def _publish_media_group(self, call: CallbackQuery) -> None: """Публикация медиагруппы""" logger.info(f"Начинаю публикацию медиагруппы. Helper message ID: {call.message.message_id}") @@ -230,6 +231,7 @@ class PostPublishService: @track_time("_decline_media_group", "post_publish_service") @track_errors("post_publish_service", "_decline_media_group") + @track_media_processing("media_group") async def _decline_media_group(self, call: CallbackQuery) -> None: """Отклонение медиагруппы""" logger.debug(f"Отклоняю медиагруппу. Helper message ID: {call.message.message_id}") @@ -308,6 +310,7 @@ class PostPublishService: @track_time("_delete_media_group_and_notify_author", "post_publish_service") @track_errors("post_publish_service", "_delete_media_group_and_notify_author") + @track_media_processing("media_group") async def _delete_media_group_and_notify_author(self, call: CallbackQuery, author_id: int) -> None: """Удаление медиагруппы и уведомление автора""" post_ids = await self.db.get_post_ids_from_telegram_by_last_id(call.message.message_id) @@ -339,6 +342,7 @@ class BanService: @track_time("ban_user_from_post", "ban_service") @track_errors("ban_service", "ban_user_from_post") + @db_query_time("ban_user_from_post", "users", "mixed") async def ban_user_from_post(self, call: CallbackQuery) -> None: """Бан пользователя за спам""" author_id = await self.db.get_author_id_by_message_id(call.message.message_id) @@ -379,6 +383,7 @@ class BanService: @track_time("unlock_user", "ban_service") @track_errors("ban_service", "unlock_user") + @db_query_time("unlock_user", "users", "delete") async def unlock_user(self, user_id: str) -> str: """Разблокировка пользователя""" user_name = await self.db.get_username(int(user_id)) diff --git a/helper_bot/handlers/group/services.py b/helper_bot/handlers/group/services.py index 973e1ce..81c94c2 100644 --- a/helper_bot/handlers/group/services.py +++ b/helper_bot/handlers/group/services.py @@ -13,9 +13,8 @@ from logs.custom_logger import logger # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, - track_errors, + track_errors, db_query_time ) @@ -34,6 +33,7 @@ class AdminReplyService: @track_time("get_user_id_for_reply", "admin_reply_service") @track_errors("admin_reply_service", "get_user_id_for_reply") + @db_query_time("get_user_id_for_reply", "users", "select") async def get_user_id_for_reply(self, message_id: int) -> int: """ Get user ID for reply by message ID. diff --git a/helper_bot/handlers/private/private_handlers.py b/helper_bot/handlers/private/private_handlers.py index 16db295..c9ad68d 100644 --- a/helper_bot/handlers/private/private_handlers.py +++ b/helper_bot/handlers/private/private_handlers.py @@ -27,7 +27,6 @@ from helper_bot.utils.helper_func import ( # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, track_errors, db_query_time @@ -173,6 +172,7 @@ class PrivateHandlers: @error_handler @track_errors("private_handlers", "stickers") @track_time("stickers", "private_handlers") + @db_query_time("stickers", "stickers", "update") async def stickers(self, message: types.Message, state: FSMContext, **kwargs): """Handle stickers request""" # User service operations with metrics @@ -200,6 +200,7 @@ class PrivateHandlers: @error_handler @track_errors("private_handlers", "resend_message_in_group_for_message") @track_time("resend_message_in_group_for_message", "private_handlers") + @db_query_time("resend_message_in_group_for_message", "messages", "insert") async def resend_message_in_group_for_message(self, message: types.Message, state: FSMContext, **kwargs): """Handle messages in admin chat states""" # User service operations with metrics diff --git a/helper_bot/handlers/private/services.py b/helper_bot/handlers/private/services.py index cefb566..32955b7 100644 --- a/helper_bot/handlers/private/services.py +++ b/helper_bot/handlers/private/services.py @@ -33,10 +33,11 @@ from helper_bot.keyboards import get_reply_keyboard_for_post # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, track_errors, - db_query_time + db_query_time, + track_media_processing, + track_file_operations ) @@ -74,12 +75,14 @@ class UserService: @track_time("update_user_activity", "user_service") @track_errors("user_service", "update_user_activity") + @db_query_time("update_user_activity", "users", "update") async def update_user_activity(self, user_id: int) -> None: """Update user's last activity timestamp with metrics tracking""" await self.db.update_user_date(user_id) @track_time("ensure_user_exists", "user_service") @track_errors("user_service", "ensure_user_exists") + @db_query_time("ensure_user_exists", "users", "insert") async def ensure_user_exists(self, message: types.Message) -> None: """Ensure user exists in database, create if needed with metrics tracking""" user_id = message.from_user.id @@ -89,43 +92,41 @@ class UserService: is_bot = message.from_user.is_bot language_code = message.from_user.language_code - if not await self.db.user_exists(user_id): - # Create User object with current timestamp - current_timestamp = int(datetime.now().timestamp()) - user = User( - user_id=user_id, - first_name=first_name, - full_name=full_name, - username=username, - is_bot=is_bot, - language_code=language_code, - emoji="", - has_stickers=False, - date_added=current_timestamp, - date_changed=current_timestamp, - voice_bot_welcome_received=False - ) - await self.db.add_user(user) - metrics.record_db_query("add_user", 0.0, "users", "insert") - else: - is_need_update = await check_username_and_full_name(user_id, username, full_name, self.db) - if is_need_update: - await self.db.update_user_info(user_id, username, full_name) - metrics.record_db_query("update_username_fullname", 0.0, "users", "update") - safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь" - safe_username = html.escape(username) if username else "Без никнейма" - - await message.answer( - f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}") - await message.bot.send_message( - chat_id=self.settings.group_for_logs, - text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}') + # Create User object with current timestamp + current_timestamp = int(datetime.now().timestamp()) + user = User( + user_id=user_id, + first_name=first_name, + full_name=full_name, + username=username, + is_bot=is_bot, + language_code=language_code, + emoji="", + has_stickers=False, + date_added=current_timestamp, + date_changed=current_timestamp, + voice_bot_welcome_received=False + ) + + # Пытаемся создать пользователя (если уже существует - игнорируем) + # Это устраняет race condition и упрощает логику + await self.db.add_user(user) + + # Проверяем, нужно ли обновить информацию о существующем пользователе + is_need_update = await check_username_and_full_name(user_id, username, full_name, self.db) + if is_need_update: + await self.db.update_user_info(user_id, username, full_name) + safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь" + safe_username = html.escape(username) if username else "Без никнейма" + + await message.answer( + f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}") + await message.bot.send_message( + chat_id=self.settings.group_for_logs, + text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}') await self.db.update_user_date(user_id) - metrics.record_db_query("update_user_date", 0.0, "users", "update") - - @track_errors("user_service", "log_user_message") async def log_user_message(self, message: types.Message) -> None: """Forward user message to logs group with metrics tracking""" await message.forward(chat_id=self.settings.group_for_logs) @@ -146,6 +147,7 @@ class PostService: @track_time("handle_text_post", "post_service") @track_errors("post_service", "handle_text_post") + @db_query_time("handle_text_post", "posts", "insert") async def handle_text_post(self, message: types.Message, first_name: str) -> None: """Handle text post submission""" post_text = get_text_message(message.text.lower(), first_name, message.from_user.username) @@ -162,6 +164,7 @@ class PostService: @track_time("handle_photo_post", "post_service") @track_errors("post_service", "handle_photo_post") + @db_query_time("handle_photo_post", "posts", "insert") async def handle_photo_post(self, message: types.Message, first_name: str) -> None: """Handle photo post submission""" post_caption = "" @@ -186,6 +189,7 @@ class PostService: @track_time("handle_video_post", "post_service") @track_errors("post_service", "handle_video_post") + @db_query_time("handle_video_post", "posts", "insert") async def handle_video_post(self, message: types.Message, first_name: str) -> None: """Handle video post submission""" post_caption = "" @@ -210,6 +214,7 @@ class PostService: @track_time("handle_video_note_post", "post_service") @track_errors("post_service", "handle_video_note_post") + @db_query_time("handle_video_note_post", "posts", "insert") async def handle_video_note_post(self, message: types.Message) -> None: """Handle video note post submission""" markup = get_reply_keyboard_for_post() @@ -230,6 +235,7 @@ class PostService: @track_time("handle_audio_post", "post_service") @track_errors("post_service", "handle_audio_post") + @db_query_time("handle_audio_post", "posts", "insert") async def handle_audio_post(self, message: types.Message, first_name: str) -> None: """Handle audio post submission""" post_caption = "" @@ -254,6 +260,7 @@ class PostService: @track_time("handle_voice_post", "post_service") @track_errors("post_service", "handle_voice_post") + @db_query_time("handle_voice_post", "posts", "insert") async def handle_voice_post(self, message: types.Message) -> None: """Handle voice post submission""" markup = get_reply_keyboard_for_post() @@ -273,7 +280,9 @@ class PostService: logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}") @track_time("handle_media_group_post", "post_service") - @track_errors("post_service", "handle_media_group_post") + @track_errors("post_service", "handle_media_group_post") + @db_query_time("handle_media_group_post", "posts", "insert") + @track_media_processing("media_group") async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None: """Handle media group post submission""" post_caption = " " @@ -320,6 +329,7 @@ class PostService: @track_time("process_post", "post_service") @track_errors("post_service", "process_post") + @track_media_processing("media_group") async def process_post(self, message: types.Message, album: Union[list, None] = None) -> None: """Process post based on content type""" first_name = get_first_name(message) @@ -360,6 +370,7 @@ class StickerService: @track_time("send_random_hello_sticker", "sticker_service") @track_errors("sticker_service", "send_random_hello_sticker") + @track_file_operations("sticker") async def send_random_hello_sticker(self, message: types.Message) -> None: """Send random hello sticker with metrics tracking""" name_stick_hello = list(Path('Stick').rglob('Hello_*')) @@ -372,6 +383,7 @@ class StickerService: @track_time("send_random_goodbye_sticker", "sticker_service") @track_errors("sticker_service", "send_random_goodbye_sticker") + @track_file_operations("sticker") async def send_random_goodbye_sticker(self, message: types.Message) -> None: """Send random goodbye sticker with metrics tracking""" name_stick_bye = list(Path('Stick').rglob('Universal_*')) diff --git a/helper_bot/handlers/voice/cleanup_utils.py b/helper_bot/handlers/voice/cleanup_utils.py new file mode 100644 index 0000000..228ea58 --- /dev/null +++ b/helper_bot/handlers/voice/cleanup_utils.py @@ -0,0 +1,191 @@ +""" +Утилиты для очистки и диагностики проблем с голосовыми файлами +""" +import os +import asyncio +from pathlib import Path +from typing import List, Tuple +from logs.custom_logger import logger +from helper_bot.handlers.voice.constants import VOICE_USERS_DIR + + +class VoiceFileCleanupUtils: + """Утилиты для очистки и диагностики голосовых файлов""" + + def __init__(self, bot_db): + self.bot_db = bot_db + + async def find_orphaned_db_records(self) -> List[Tuple[str, int]]: + """Найти записи в БД, для которых нет соответствующих файлов""" + try: + # Получаем все записи из БД + all_audio_records = await self.bot_db.get_all_audio_records() + orphaned_records = [] + + for record in all_audio_records: + file_name = record.get('file_name', '') + user_id = record.get('author_id', 0) + + file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg' + if not os.path.exists(file_path): + orphaned_records.append((file_name, user_id)) + logger.warning(f"Найдена запись в БД без файла: {file_name} (user_id: {user_id})") + + logger.info(f"Найдено {len(orphaned_records)} записей в БД без соответствующих файлов") + return orphaned_records + + except Exception as e: + logger.error(f"Ошибка при поиске orphaned записей: {e}") + return [] + + async def find_orphaned_files(self) -> List[str]: + """Найти файлы на диске, для которых нет записей в БД""" + try: + if not os.path.exists(VOICE_USERS_DIR): + logger.warning(f"Директория {VOICE_USERS_DIR} не существует") + return [] + + # Получаем все файлы .ogg в директории + ogg_files = list(Path(VOICE_USERS_DIR).glob("*.ogg")) + orphaned_files = [] + + # Получаем все записи из БД + all_audio_records = await self.bot_db.get_all_audio_records() + db_file_names = {record.get('file_name', '') for record in all_audio_records} + + for file_path in ogg_files: + file_name = file_path.stem # Имя файла без расширения + if file_name not in db_file_names: + orphaned_files.append(str(file_path)) + logger.warning(f"Найден файл без записи в БД: {file_path}") + + logger.info(f"Найдено {len(orphaned_files)} файлов без записей в БД") + return orphaned_files + + except Exception as e: + logger.error(f"Ошибка при поиске orphaned файлов: {e}") + return [] + + async def cleanup_orphaned_db_records(self, dry_run: bool = True) -> int: + """Удалить записи в БД, для которых нет файлов""" + try: + orphaned_records = await self.find_orphaned_db_records() + + if not orphaned_records: + logger.info("Нет orphaned записей для удаления") + return 0 + + if dry_run: + logger.info(f"DRY RUN: Найдено {len(orphaned_records)} записей для удаления") + for file_name, user_id in orphaned_records: + logger.info(f"DRY RUN: Будет удалена запись: {file_name} (user_id: {user_id})") + return len(orphaned_records) + + # Удаляем записи + deleted_count = 0 + for file_name, user_id in orphaned_records: + try: + await self.bot_db.delete_audio_record_by_file_name(file_name) + deleted_count += 1 + logger.info(f"Удалена запись в БД: {file_name} (user_id: {user_id})") + except Exception as e: + logger.error(f"Ошибка при удалении записи {file_name}: {e}") + + logger.info(f"Удалено {deleted_count} orphaned записей из БД") + return deleted_count + + except Exception as e: + logger.error(f"Ошибка при очистке orphaned записей: {e}") + return 0 + + async def cleanup_orphaned_files(self, dry_run: bool = True) -> int: + """Удалить файлы на диске, для которых нет записей в БД""" + try: + orphaned_files = await self.find_orphaned_files() + + if not orphaned_files: + logger.info("Нет orphaned файлов для удаления") + return 0 + + if dry_run: + logger.info(f"DRY RUN: Найдено {len(orphaned_files)} файлов для удаления") + for file_path in orphaned_files: + logger.info(f"DRY RUN: Будет удален файл: {file_path}") + return len(orphaned_files) + + # Удаляем файлы + deleted_count = 0 + for file_path in orphaned_files: + try: + os.remove(file_path) + deleted_count += 1 + logger.info(f"Удален файл: {file_path}") + except Exception as e: + logger.error(f"Ошибка при удалении файла {file_path}: {e}") + + logger.info(f"Удалено {deleted_count} orphaned файлов") + return deleted_count + + except Exception as e: + logger.error(f"Ошибка при очистке orphaned файлов: {e}") + return 0 + + async def get_disk_usage_stats(self) -> dict: + """Получить статистику использования диска""" + try: + if not os.path.exists(VOICE_USERS_DIR): + return {"error": f"Директория {VOICE_USERS_DIR} не существует"} + + total_size = 0 + file_count = 0 + + for file_path in Path(VOICE_USERS_DIR).glob("*.ogg"): + if file_path.is_file(): + total_size += file_path.stat().st_size + file_count += 1 + + return { + "total_files": file_count, + "total_size_bytes": total_size, + "total_size_mb": round(total_size / (1024 * 1024), 2), + "directory": VOICE_USERS_DIR + } + + except Exception as e: + logger.error(f"Ошибка при получении статистики диска: {e}") + return {"error": str(e)} + + async def run_full_diagnostic(self) -> dict: + """Запустить полную диагностику""" + try: + logger.info("Запуск полной диагностики голосовых файлов...") + + # Статистика диска + disk_stats = await self.get_disk_usage_stats() + + # Orphaned записи в БД + orphaned_db_records = await self.find_orphaned_db_records() + + # Orphaned файлы + orphaned_files = await self.find_orphaned_files() + + # Количество записей в БД + all_audio_records = await self.bot_db.get_all_audio_records() + db_records_count = len(all_audio_records) + + diagnostic_result = { + "disk_stats": disk_stats, + "db_records_count": db_records_count, + "orphaned_db_records_count": len(orphaned_db_records), + "orphaned_files_count": len(orphaned_files), + "orphaned_db_records": orphaned_db_records[:10], # Первые 10 для примера + "orphaned_files": orphaned_files[:10], # Первые 10 для примера + "status": "healthy" if len(orphaned_db_records) == 0 and len(orphaned_files) == 0 else "issues_found" + } + + logger.info(f"Диагностика завершена. Статус: {diagnostic_result['status']}") + return diagnostic_result + + except Exception as e: + logger.error(f"Ошибка при диагностике: {e}") + return {"error": str(e)} diff --git a/helper_bot/handlers/voice/services.py b/helper_bot/handlers/voice/services.py index 516f755..a0b3922 100644 --- a/helper_bot/handlers/voice/services.py +++ b/helper_bot/handlers/voice/services.py @@ -1,6 +1,7 @@ import random import asyncio import traceback +import os from datetime import datetime from pathlib import Path from typing import List, Optional, Tuple @@ -16,7 +17,6 @@ from logs.custom_logger import logger # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, track_errors, db_query_time @@ -190,6 +190,7 @@ class VoiceBotService: @track_time("clear_user_listenings", "voice_bot_service") @track_errors("voice_bot_service", "clear_user_listenings") + @db_query_time("clear_user_listenings", "audio_moderate", "delete") async def clear_user_listenings(self, user_id: int) -> None: """Очистить прослушивания пользователя""" try: @@ -275,62 +276,170 @@ class AudioFileService: async def save_audio_file(self, file_name: str, user_id: int, date_added: datetime, file_id: str) -> None: """Сохранить информацию об аудио файле в базу данных""" try: + # Проверяем существование файла перед сохранением в БД + if not await self.verify_file_exists(file_name): + error_msg = f"Файл {file_name} не существует или поврежден, отменяем сохранение в БД" + logger.error(error_msg) + raise FileOperationError(error_msg) + await self.bot_db.add_audio_record_simple(file_name, user_id, date_added) + logger.info(f"Информация об аудио файле успешно сохранена в БД: {file_name}") except Exception as e: logger.error(f"Ошибка при сохранении аудио файла в БД: {e}") raise DatabaseError(f"Не удалось сохранить аудио файл в БД: {e}") + @track_time("save_audio_file_with_transaction", "audio_file_service") + @track_errors("audio_file_service", "save_audio_file_with_transaction") + async def save_audio_file_with_transaction(self, file_name: str, user_id: int, date_added: datetime, file_id: str) -> None: + """Сохранить информацию об аудио файле в базу данных с транзакцией""" + try: + # Проверяем существование файла перед сохранением в БД + if not await self.verify_file_exists(file_name): + error_msg = f"Файл {file_name} не существует или поврежден, отменяем сохранение в БД" + logger.error(error_msg) + raise FileOperationError(error_msg) + + # Используем транзакцию для атомарности операции + await self.bot_db.add_audio_record_simple(file_name, user_id, date_added) + logger.info(f"Информация об аудио файле успешно сохранена в БД с транзакцией: {file_name}") + except Exception as e: + logger.error(f"Ошибка при сохранении аудио файла в БД с транзакцией: {e}") + raise DatabaseError(f"Не удалось сохранить аудио файл в БД с транзакцией: {e}") + @track_time("download_and_save_audio", "audio_file_service") @track_errors("audio_file_service", "download_and_save_audio") - async def download_and_save_audio(self, bot, message, file_name: str) -> None: - """Скачать и сохранить аудио файл""" + async def download_and_save_audio(self, bot, message, file_name: str, max_retries: int = 3) -> None: + """Скачать и сохранить аудио файл с retry механизмом""" + last_exception = None + + for attempt in range(max_retries): + try: + logger.info(f"Попытка {attempt + 1}/{max_retries} скачивания и сохранения аудио: {file_name}") + + # Проверяем наличие голосового сообщения + if not message or not message.voice: + error_msg = "Сообщение или голосовое сообщение не найдено" + logger.error(error_msg) + raise FileOperationError(error_msg) + + file_id = message.voice.file_id + logger.info(f"Получен file_id: {file_id}") + + # Получаем информацию о файле + try: + file_info = await bot.get_file(file_id=file_id) + logger.info(f"Получена информация о файле: {file_info.file_path}") + except Exception as e: + logger.error(f"Ошибка при получении информации о файле: {e}") + raise FileOperationError(f"Не удалось получить информацию о файле: {e}") + + # Скачиваем файл + try: + downloaded_file = await bot.download_file(file_path=file_info.file_path) + except Exception as e: + logger.error(f"Ошибка при скачивании файла: {e}") + raise FileOperationError(f"Не удалось скачать файл: {e}") + + # Проверяем что файл успешно скачан + if not downloaded_file: + error_msg = "Не удалось скачать файл - получен пустой объект" + logger.error(error_msg) + raise FileOperationError(error_msg) + + # Получаем размер файла без изменения позиции + current_pos = downloaded_file.tell() + downloaded_file.seek(0, 2) # Переходим в конец файла + file_size = downloaded_file.tell() + downloaded_file.seek(current_pos) # Возвращаемся в исходную позицию + + logger.info(f"Файл скачан, размер: {file_size} bytes") + + # Проверяем минимальный размер файла + if file_size < 100: # Минимальный размер для аудио файла + error_msg = f"Файл слишком маленький: {file_size} bytes" + logger.error(error_msg) + raise FileOperationError(error_msg) + + # Создаем директорию если она не существует + try: + os.makedirs(VOICE_USERS_DIR, exist_ok=True) + logger.info(f"Директория {VOICE_USERS_DIR} создана/проверена") + except Exception as e: + logger.error(f"Ошибка при создании директории: {e}") + raise FileOperationError(f"Не удалось создать директорию: {e}") + + file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg' + logger.info(f"Сохраняем файл по пути: {file_path}") + + # Сбрасываем позицию в файле перед сохранением + downloaded_file.seek(0) + + # Сохраняем файл + try: + with open(file_path, 'wb') as new_file: + new_file.write(downloaded_file.read()) + except Exception as e: + logger.error(f"Ошибка при записи файла на диск: {e}") + raise FileOperationError(f"Не удалось записать файл на диск: {e}") + + # Проверяем что файл действительно создался и имеет правильный размер + if not os.path.exists(file_path): + error_msg = f"Файл не был создан: {file_path}" + logger.error(error_msg) + raise FileOperationError(error_msg) + + saved_file_size = os.path.getsize(file_path) + if saved_file_size != file_size: + error_msg = f"Размер сохраненного файла не совпадает: ожидалось {file_size}, получено {saved_file_size}" + logger.error(error_msg) + # Удаляем поврежденный файл + try: + os.remove(file_path) + except: + pass + raise FileOperationError(error_msg) + + logger.info(f"Файл успешно сохранен: {file_path}, размер: {saved_file_size} bytes") + return # Успешное завершение + + except Exception as e: + last_exception = e + logger.error(f"Попытка {attempt + 1}/{max_retries} неудачна: {e}") + + if attempt < max_retries - 1: + wait_time = (attempt + 1) * 2 # Экспоненциальная задержка: 2, 4, 6 секунд + logger.info(f"Ожидание {wait_time} секунд перед следующей попыткой...") + await asyncio.sleep(wait_time) + else: + logger.error(f"Все {max_retries} попыток скачивания неудачны") + logger.error(f"Traceback последней ошибки: {traceback.format_exc()}") + + # Если все попытки неудачны + raise FileOperationError(f"Не удалось скачать и сохранить аудио после {max_retries} попыток. Последняя ошибка: {last_exception}") + + @track_time("verify_file_exists", "audio_file_service") + @track_errors("audio_file_service", "verify_file_exists") + async def verify_file_exists(self, file_name: str) -> bool: + """Проверить существование и валидность файла""" try: - logger.info(f"Начинаем скачивание и сохранение аудио: {file_name}") - - # Проверяем наличие голосового сообщения - if not message or not message.voice: - logger.error("Сообщение или голосовое сообщение не найдено") - raise FileOperationError("Сообщение или голосовое сообщение не найдено") - - file_id = message.voice.file_id - logger.info(f"Получен file_id: {file_id}") - - file_info = await bot.get_file(file_id=file_id) - logger.info(f"Получена информация о файле: {file_info.file_path}") - - downloaded_file = await bot.download_file(file_path=file_info.file_path) - - # Проверяем что файл успешно скачан - if not downloaded_file: - logger.error("Не удалось скачать файл") - raise FileOperationError("Не удалось скачать файл") - - # Получаем размер файла без изменения позиции - current_pos = downloaded_file.tell() - downloaded_file.seek(0, 2) # Переходим в конец файла - file_size = downloaded_file.tell() - downloaded_file.seek(current_pos) # Возвращаемся в исходную позицию - - logger.info(f"Файл скачан, размер: {file_size} bytes") - - # Создаем директорию если она не существует - import os - os.makedirs(VOICE_USERS_DIR, exist_ok=True) - logger.info(f"Директория {VOICE_USERS_DIR} создана/проверена") - file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg' - logger.info(f"Сохраняем файл по пути: {file_path}") - # Сбрасываем позицию в файле перед сохранением - downloaded_file.seek(0) + if not os.path.exists(file_path): + logger.warning(f"Файл не существует: {file_path}") + return False + + file_size = os.path.getsize(file_path) + if file_size == 0: + logger.warning(f"Файл пустой: {file_path}") + return False + + if file_size < 100: # Минимальный размер для аудио файла + logger.warning(f"Файл слишком маленький: {file_path}, размер: {file_size} bytes") + return False + + logger.info(f"Файл проверен и валиден: {file_path}, размер: {file_size} bytes") + return True - # Сохраняем файл - with open(file_path, 'wb') as new_file: - new_file.write(downloaded_file.read()) - - logger.info(f"Файл успешно сохранен: {file_path}") - except Exception as e: - logger.error(f"Ошибка при скачивании и сохранении аудио: {e}") - logger.error(f"Traceback: {traceback.format_exc()}") - raise FileOperationError(f"Не удалось скачать и сохранить аудио: {e}") + logger.error(f"Ошибка при проверке файла {file_name}: {e}") + return False diff --git a/helper_bot/handlers/voice/utils.py b/helper_bot/handlers/voice/utils.py index 0243890..d3d7a9f 100644 --- a/helper_bot/handlers/voice/utils.py +++ b/helper_bot/handlers/voice/utils.py @@ -6,6 +6,11 @@ from typing import Optional from helper_bot.handlers.voice.exceptions import DatabaseError from logs.custom_logger import logger +from helper_bot.utils.metrics import ( + track_time, + track_errors, + db_query_time +) def format_time_ago(date_from_db: str) -> Optional[str]: """Форматировать время с момента последней записи""" @@ -69,7 +74,9 @@ def plural_time(type: int, n: float) -> str: new_number = int(n) return str(new_number) + ' ' + word[p] - +@track_time("get_last_message_text", "voice_utils") +@track_errors("voice_utils", "get_last_message_text") +@db_query_time("get_last_message_text", "voice", "select") async def get_last_message_text(bot_db) -> Optional[str]: """Получить текст сообщения о времени последней записи""" try: @@ -88,7 +95,9 @@ async def validate_voice_message(message) -> bool: """Проверить валидность голосового сообщения""" return message.content_type == 'voice' - +@track_time("get_user_emoji_safe", "voice_utils") +@track_errors("voice_utils", "get_user_emoji_safe") +@db_query_time("get_user_emoji_safe", "voice", "select") async def get_user_emoji_safe(bot_db, user_id: int) -> str: """Безопасно получить эмодзи пользователя""" try: diff --git a/helper_bot/handlers/voice/voice_handler.py b/helper_bot/handlers/voice/voice_handler.py index 71c035e..78e1020 100644 --- a/helper_bot/handlers/voice/voice_handler.py +++ b/helper_bot/handlers/voice/voice_handler.py @@ -24,10 +24,10 @@ from helper_bot.handlers.private.constants import BUTTON_TEXTS # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, track_errors, - db_query_time + db_query_time, + track_file_operations ) class VoiceHandlers: @@ -126,6 +126,7 @@ class VoiceHandlers: @track_errors("voice_handlers", "voice_bot_button_handler") async def voice_bot_button_handler(self, message: types.Message, state: FSMContext, bot_db: MagicData("bot_db"), settings: MagicData("settings")): """Обработчик кнопки 'Голосовой бот' из основной клавиатуры""" + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) нажал кнопку 'Голосовой бот'") try: # Проверяем, получал ли пользователь приветственное сообщение welcome_received = await bot_db.check_voice_bot_welcome_received(message.from_user.id) @@ -140,7 +141,7 @@ class VoiceHandlers: logger.info(f"Пользователь {message.from_user.id}: вызываем start") await self.start(message, state, bot_db, settings) except Exception as e: - logger.error(f"Ошибка при проверке приветственного сообщения: {e}") + logger.error(f"Ошибка при проверке приветственного сообщения для пользователя {message.from_user.id}: {e}") # В случае ошибки вызываем start await self.start(message, state, bot_db, settings) @@ -169,6 +170,7 @@ class VoiceHandlers: state: FSMContext, settings: MagicData("settings") ): + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) запросил информацию об эмодзи") await message.forward(chat_id=settings['Telegram']['group_for_logs']) user_emoji = await check_user_emoji(message) await state.set_state(STATE_START) @@ -183,6 +185,7 @@ class VoiceHandlers: state: FSMContext, settings: MagicData("settings") ): + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) вызвал функцию help_function") await message.forward(chat_id=settings['Telegram']['group_for_logs']) await update_user_info(VOICE_BOT_NAME, message) help_message = messages.get_message(get_first_name(message), 'HELP_MESSAGE') @@ -194,6 +197,7 @@ class VoiceHandlers: @track_time("start", "voice_handlers") @track_errors("voice_handlers", "start") + @db_query_time("mark_voice_bot_welcome_received", "audio_moderate", "update") async def start( self, message: types.Message, @@ -201,7 +205,7 @@ class VoiceHandlers: bot_db: MagicData("bot_db"), settings: MagicData("settings") ): - logger.info(f"Пользователь {message.from_user.id}: вызывается функция start") + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}): вызывается функция start") await state.set_state(STATE_START) await message.forward(chat_id=settings['Telegram']['group_for_logs']) await update_user_info(VOICE_BOT_NAME, message) @@ -210,13 +214,14 @@ class VoiceHandlers: # Создаем сервис и отправляем приветственные сообщения voice_service = VoiceBotService(bot_db, settings) await voice_service.send_welcome_messages(message, user_emoji) + logger.info(f"Приветственные сообщения отправлены пользователю {message.from_user.id}") # Отмечаем, что пользователь получил приветственное сообщение try: await bot_db.mark_voice_bot_welcome_received(message.from_user.id) logger.info(f"Пользователь {message.from_user.id}: отмечен как получивший приветствие") except Exception as e: - logger.error(f"Ошибка при отметке получения приветствия: {e}") + logger.error(f"Ошибка при отметке получения приветствия для пользователя {message.from_user.id}: {e}") @track_time("cancel_handler", "voice_handlers") @track_errors("voice_handlers", "cancel_handler") @@ -233,6 +238,7 @@ class VoiceHandlers: markup = await get_reply_keyboard(self.db, message.from_user.id) await message.answer(text='Добро пожаловать в меню!', reply_markup=markup, parse_mode='HTML') await state.set_state(FSM_STATES["START"]) + logger.info(f"Пользователь {message.from_user.id} возвращен в главное меню") @track_time("refresh_listen_function", "voice_handlers") @track_errors("voice_handlers", "refresh_listen_function") @@ -243,6 +249,7 @@ class VoiceHandlers: bot_db: MagicData("bot_db"), settings: MagicData("settings") ): + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) вызвал функцию refresh_listen_function") await message.forward(chat_id=settings['Telegram']['group_for_logs']) await update_user_info(VOICE_BOT_NAME, message) markup = get_main_keyboard() @@ -269,6 +276,7 @@ class VoiceHandlers: bot_db: MagicData("bot_db"), settings: MagicData("settings") ): + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) вызвал функцию standup_write") await message.forward(chat_id=settings['Telegram']['group_for_logs']) markup = types.ReplyKeyboardRemove() record_voice_message = messages.get_message(get_first_name(message), 'RECORD_VOICE_MESSAGE') @@ -279,7 +287,7 @@ class VoiceHandlers: if message_with_date: await message.answer(text=message_with_date, parse_mode="html") except Exception as e: - logger.error(f'Не удалось получить дату последнего сообщения - {e}') + logger.error(f'Не удалось получить дату последнего сообщения для пользователя {message.from_user.id}: {e}') await state.set_state(STATE_STANDUP_WRITE) @@ -309,6 +317,7 @@ class VoiceHandlers: message.voice.file_id, markup_for_voice ) + logger.info(f"Голосовое сообщение пользователя {message.from_user.id} отправлено в группу постов (message_id: {sent_message.message_id})") # Сохраняем в базу инфо о посте await bot_db.set_user_id_and_message_id_for_voice_bot(sent_message.message_id, message.from_user.id) @@ -318,6 +327,7 @@ class VoiceHandlers: await message.answer(text=voice_saved_message, reply_markup=markup) await state.set_state(STATE_START) else: + logger.warning(f"Голосовое сообщение пользователя {message.from_user.id} не прошло валидацию") unknown_content_message = messages.get_message(get_first_name(message), 'UNKNOWN_CONTENT_MESSAGE') await message.forward(chat_id=settings['Telegram']['group_for_logs']) await message.answer(text=unknown_content_message, reply_markup=markup) @@ -326,22 +336,27 @@ class VoiceHandlers: @track_time("standup_listen_audio", "voice_handlers") @track_errors("voice_handlers", "standup_listen_audio") + @track_file_operations("voice") + @db_query_time("standup_listen_audio", "audio_moderate", "mixed") async def standup_listen_audio( self, message: types.Message, bot_db: MagicData("bot_db"), settings: MagicData("settings") ): + logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) запросил прослушивание аудио") markup = get_main_keyboard() # Создаем сервис для работы с аудио voice_service = VoiceBotService(bot_db, settings) try: + #TODO: удалить логику из хендлера # Получаем случайное аудио audio_data = await voice_service.get_random_audio(message.from_user.id) if not audio_data: + logger.warning(f"Для пользователя {message.from_user.id} не найдено доступных аудио для прослушивания") no_audio_message = messages.get_message(get_first_name(message), 'NO_AUDIO_MESSAGE') await message.answer(text=no_audio_message, reply_markup=markup) try: @@ -349,7 +364,7 @@ class VoiceHandlers: if message_with_date: await message.answer(text=message_with_date, parse_mode="html") except Exception as e: - logger.error(f'Не удалось получить последнюю дату {e}') + logger.error(f'Не удалось получить последнюю дату для пользователя {message.from_user.id}: {e}') return audio_for_user, date_added, user_emoji = audio_data @@ -359,7 +374,13 @@ class VoiceHandlers: # Проверяем существование файла if not path.exists(): - logger.error(f"Файл не найден: {path}") + logger.error(f"Файл не найден: {path} для пользователя {message.from_user.id}") + # Дополнительная диагностика + logger.error(f"Директория {VOICE_USERS_DIR} существует: {Path(VOICE_USERS_DIR).exists()}") + if Path(VOICE_USERS_DIR).exists(): + files_in_dir = list(Path(VOICE_USERS_DIR).glob("*.ogg")) + logger.error(f"Файлы в директории: {[f.name for f in files_in_dir]}") + await message.answer( text="Файл аудио не найден. Обратитесь к администратору.", reply_markup=markup @@ -368,7 +389,7 @@ class VoiceHandlers: # Проверяем размер файла if path.stat().st_size == 0: - logger.error(f"Файл пустой: {path}") + logger.error(f"Файл пустой: {path} для пользователя {message.from_user.id}") await message.answer( text="Файл аудио поврежден. Обратитесь к администратору.", reply_markup=markup @@ -383,13 +404,20 @@ class VoiceHandlers: else: caption = f'Дата записи: {date_added}' + logger.info(f"Подготовлено голосовое сообщение для пользователя {message.from_user.id}: {caption}") + try: - await message.bot.send_voice( - chat_id=message.chat.id, - voice=voice, - caption=caption, - reply_markup=markup - ) + from helper_bot.utils.rate_limiter import send_with_rate_limit + + async def _send_voice(): + return await message.bot.send_voice( + chat_id=message.chat.id, + voice=voice, + caption=caption, + reply_markup=markup + ) + + await send_with_rate_limit(_send_voice, message.chat.id) # Маркируем сообщение как прослушанное только после успешной отправки await voice_service.mark_audio_as_listened(audio_for_user, message.from_user.id) @@ -404,7 +432,7 @@ class VoiceHandlers: except Exception as voice_error: if "VOICE_MESSAGES_FORBIDDEN" in str(voice_error): # Если голосовые сообщения запрещены, отправляем информативное сообщение - logger.info(f"Пользователь {message.from_user.id} запретил получение голосовых сообщений") + logger.warning(f"Пользователь {message.from_user.id} запретил получение голосовых сообщений") privacy_message = "🔇 К сожалению, у тебя закрыт доступ к получению голосовых сообщений.\n\nДля продолжения взаимодействия с ботом необходимо дать возможность мне присылать войсы в настройках приватности Telegram.\n\n💡 Как это сделать:\n1. Открой настройки Telegram\n2. Перейди в 'Конфиденциальность и безопасность'\n3. Выбери 'Голосовые сообщения'\n4. Разреши получение от 'Всех' или добавь меня в исключения" @@ -412,10 +440,11 @@ class VoiceHandlers: return # Выходим без записи о прослушивании else: + logger.error(f"Ошибка при отправке голосового сообщения пользователю {message.from_user.id}: {voice_error}") raise voice_error except Exception as e: - logger.error(f"Ошибка при прослушивании аудио: {e}") + logger.error(f"Ошибка при прослушивании аудио для пользователя {message.from_user.id}: {e}") await message.answer( text="Произошла ошибка при получении аудио. Попробуйте позже.", reply_markup=markup diff --git a/helper_bot/keyboards/keyboards.py b/helper_bot/keyboards/keyboards.py index 0f640ce..d941e12 100644 --- a/helper_bot/keyboards/keyboards.py +++ b/helper_bot/keyboards/keyboards.py @@ -3,7 +3,6 @@ from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder # Local imports - metrics from helper_bot.utils.metrics import ( - metrics, track_time, track_errors ) @@ -23,8 +22,7 @@ def get_reply_keyboard_for_post(): return markup -@track_time("get_reply_keyboard", "keyboard_service") -@track_errors("keyboard_service", "get_reply_keyboard") + async def get_reply_keyboard(db, user_id): builder = ReplyKeyboardBuilder() builder.row(types.KeyboardButton(text="📢Предложить свой пост")) @@ -58,7 +56,8 @@ def get_reply_keyboard_admin(): markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) return markup - +@track_time("create_keyboard_with_pagination", "keyboard_service") +@track_errors("keyboard_service", "create_keyboard_with_pagination") def create_keyboard_with_pagination(page: int, total_items: int, array_items: list, callback: str): """ Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback diff --git a/helper_bot/main.py b/helper_bot/main.py index 0436394..3f7a800 100644 --- a/helper_bot/main.py +++ b/helper_bot/main.py @@ -14,6 +14,7 @@ from helper_bot.handlers.voice import VoiceHandlers from helper_bot.middlewares.dependencies_middleware import DependenciesMiddleware from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware from helper_bot.middlewares.metrics_middleware import MetricsMiddleware, ErrorMetricsMiddleware +from helper_bot.middlewares.rate_limit_middleware import RateLimitMiddleware from helper_bot.server_prometheus import start_metrics_server, stop_metrics_server @@ -53,6 +54,7 @@ async def start_bot(bdf): dp.update.outer_middleware(DependenciesMiddleware()) dp.update.outer_middleware(MetricsMiddleware()) dp.update.outer_middleware(BlacklistMiddleware()) + dp.update.outer_middleware(RateLimitMiddleware()) # Создаем экземпляр VoiceHandlers voice_handlers = VoiceHandlers(bdf, bdf.settings) @@ -89,11 +91,7 @@ async def start_bot(bdf): logging.info("✅ Бот запущен") except Exception as e: - logging.error(f"❌ Ошибка запуска метрик сервера: {e}") - # Продолжаем работу бота даже если метрики не запустились - - except Exception as e: - logging.error(f"Error in bot startup: {e}") + logging.error(f"❌ Ошибка запуска бота: {e}") raise finally: # Останавливаем метрики сервер при завершении diff --git a/helper_bot/middlewares/rate_limit_middleware.py b/helper_bot/middlewares/rate_limit_middleware.py new file mode 100644 index 0000000..3312ac2 --- /dev/null +++ b/helper_bot/middlewares/rate_limit_middleware.py @@ -0,0 +1,57 @@ +""" +Middleware для автоматического применения rate limiting ко всем входящим сообщениям +""" +from typing import Callable, Dict, Any, Awaitable, Union +from aiogram import BaseMiddleware +from aiogram.types import Message, CallbackQuery, InlineQuery, ChatMemberUpdated, Update +from aiogram.exceptions import TelegramRetryAfter, TelegramAPIError +from logs.custom_logger import logger + +from helper_bot.utils.rate_limiter import telegram_rate_limiter + + +class RateLimitMiddleware(BaseMiddleware): + """Middleware для автоматического rate limiting входящих сообщений""" + + def __init__(self): + super().__init__() + self.rate_limiter = telegram_rate_limiter + + async def __call__( + self, + handler: Callable[[Update, Dict[str, Any]], Awaitable[Any]], + event: Union[Update, Message, CallbackQuery, InlineQuery, ChatMemberUpdated], + data: Dict[str, Any] + ) -> Any: + """Обрабатывает событие с rate limiting""" + + # Извлекаем сообщение из Update + message = None + if isinstance(event, Update): + message = event.message + elif isinstance(event, Message): + message = event + + # Применяем rate limiting только к сообщениям + if message is not None: + chat_id = message.chat.id + + # Обертываем handler в rate limiting + async def rate_limited_handler(): + try: + return await handler(event, data) + except (TelegramRetryAfter, TelegramAPIError) as e: + logger.warning(f"Rate limit error in middleware: {e}") + # Middleware не должен перехватывать эти ошибки, + # пусть их обрабатывает rate_limiter в функциях отправки + raise + + # Применяем rate limiting к handler + return await self.rate_limiter.send_with_rate_limit( + rate_limited_handler, + chat_id + ) + else: + # Для других типов событий просто вызываем handler + return await handler(event, data) + diff --git a/helper_bot/utils/auto_unban_scheduler.py b/helper_bot/utils/auto_unban_scheduler.py index a908ace..a1e2533 100644 --- a/helper_bot/utils/auto_unban_scheduler.py +++ b/helper_bot/utils/auto_unban_scheduler.py @@ -8,6 +8,11 @@ from apscheduler.triggers.cron import CronTrigger from helper_bot.utils.base_dependency_factory import get_global_instance from logs.custom_logger import logger +from .metrics import ( + track_time, + track_errors, + db_query_time +) class AutoUnbanScheduler: """ @@ -24,7 +29,10 @@ class AutoUnbanScheduler: def set_bot(self, bot): """Устанавливает экземпляр бота для отправки уведомлений""" self.bot = bot - + + @track_time("auto_unban_users", "auto_unban_scheduler") + @track_errors("auto_unban_scheduler", "auto_unban_users") + @db_query_time("auto_unban_users", "users", "mixed") async def auto_unban_users(self): """ Основная функция автоматического разбана пользователей. @@ -104,6 +112,8 @@ class AutoUnbanScheduler: return report + @track_time("send_report", "auto_unban_scheduler") + @track_errors("auto_unban_scheduler", "send_report") async def _send_report(self, report: str): """Отправляет отчет в лог-канал""" try: @@ -117,6 +127,8 @@ class AutoUnbanScheduler: except Exception as e: logger.error(f"Ошибка при отправке отчета: {e}") + @track_time("send_error_report", "auto_unban_scheduler") + @track_errors("auto_unban_scheduler", "send_error_report") async def _send_error_report(self, error_msg: str): """Отправляет отчет об ошибке в важный лог-канал""" try: diff --git a/helper_bot/utils/helper_func.py b/helper_bot/utils/helper_func.py index 5372066..455a6d9 100644 --- a/helper_bot/utils/helper_func.py +++ b/helper_bot/utils/helper_func.py @@ -22,10 +22,11 @@ from database.models import TelegramPost # Local imports - metrics from .metrics import ( - metrics, track_time, track_errors, - db_query_time + db_query_time, + track_media_processing, + track_file_operations, ) bdf = get_global_instance() @@ -115,7 +116,9 @@ def get_text_message(post_text: str, first_name: str, username: str = None): else: return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}' - +@track_time("download_file", "helper_func") +@track_errors("helper_func", "download_file") +@track_file_operations("unknown") async def download_file(message: types.Message, file_id: str, content_type: str = None) -> Optional[str]: """ Скачивает файл по file_id из Telegram и сохраняет в соответствующую папку. @@ -180,18 +183,16 @@ async def download_file(message: types.Message, file_id: str, content_type: str logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с") - # Записываем метрики - metrics.record_file_download(content_type or 'unknown', file_size, download_time) - return file_path except Exception as e: download_time = time.time() - start_time logger.error(f"download_file: Ошибка скачивания файла {file_id}: {e}, время: {download_time:.2f}с") - metrics.record_file_download_error(content_type or 'unknown', str(e)) return None - +@track_time("prepare_media_group_from_middlewares", "helper_func") +@track_errors("helper_func", "prepare_media_group_from_middlewares") +@track_media_processing("media_group") async def prepare_media_group_from_middlewares(album, post_caption: str = ''): """ Создает MediaGroup согласно best practices aiogram 3.x. @@ -243,7 +244,10 @@ async def prepare_media_group_from_middlewares(album, post_caption: str = ''): return media_group - +@track_time("add_in_db_media_mediagroup", "helper_func") +@track_errors("helper_func", "add_in_db_media_mediagroup") +@track_media_processing("media_group") +@db_query_time("add_in_db_media_mediagroup", "posts", "insert") async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None) -> bool: """ Добавляет контент медиа-группы в базу данных @@ -340,7 +344,6 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: if processed_count == 0: logger.error(f"add_in_db_media_mediagroup: Не удалось обработать ни одного сообщения из медиагруппы {post_id}") - metrics.record_media_processing('media_group', processing_time, False) return False if failed_count > 0: @@ -348,18 +351,18 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: else: logger.info(f"add_in_db_media_mediagroup: Успешно обработана медиагруппа {post_id} - {processed_count} сообщений, время: {processing_time:.2f}с") - # Записываем метрики - metrics.record_media_processing('media_group', processing_time, failed_count == 0) - return failed_count == 0 except Exception as e: processing_time = time.time() - start_time logger.error(f"add_in_db_media_mediagroup: Критическая ошибка обработки медиагруппы: {e}, время: {processing_time:.2f}с") - metrics.record_media_processing('media_group', processing_time, False) return False - +@track_time("add_in_db_media", "helper_func") +@track_errors("helper_func", "add_in_db_media") +@track_media_processing("media_group") +@db_query_time("add_in_db_media", "posts", "insert") +@track_file_operations("media") async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool: """ Добавляет контент одиночного сообщения в базу данных @@ -430,18 +433,17 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool: processing_time = time.time() - start_time logger.info(f"add_in_db_media: Контент успешно добавлен для сообщения {post_id}, тип: {content_type}, время: {processing_time:.2f}с") - # Записываем метрики - metrics.record_media_processing(content_type, processing_time, True) - return True except Exception as e: processing_time = time.time() - start_time logger.error(f"add_in_db_media: Ошибка обработки медиа для сообщения {post_id}: {e}, время: {processing_time:.2f}с") - metrics.record_media_processing(content_type or 'unknown', processing_time, False) return False - +@track_time("send_media_group_message_to_private_chat", "helper_func") +@track_errors("helper_func", "send_media_group_message_to_private_chat") +@track_media_processing("media_group") +@db_query_time("send_media_group_message_to_private_chat", "posts", "insert") async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message, media_group: List, bot_db: Any, main_post_id: Optional[int] = None) -> int: sent_message = await message.bot.send_media_group( @@ -461,7 +463,9 @@ async def send_media_group_message_to_private_chat(chat_id: int, message: types. message_id = sent_message[-1].message_id return message_id - +@track_time("send_media_group_to_channel", "helper_func") +@track_errors("helper_func", "send_media_group_to_channel") +@track_media_processing("media_group") async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str): """ Отправляет медиа-группу с подписью к последнему файлу. @@ -510,28 +514,32 @@ async def send_media_group_to_channel(bot, chat_id: int, post_content: List, pos logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}") raise - +@track_time("send_text_message", "helper_func") +@track_errors("helper_func", "send_text_message") async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None): + from .rate_limiter import send_with_rate_limit + # Экранируем post_text для безопасного использования в HTML safe_post_text = html.escape(str(post_text)) if post_text else "" - if markup is None: - sent_message = await message.bot.send_message( - chat_id=chat_id, - text=safe_post_text - ) - message_id = sent_message.message_id - return message_id - else: - sent_message = await message.bot.send_message( - chat_id=chat_id, - text=safe_post_text, - reply_markup=markup - ) - message_id = sent_message.message_id - return message_id - + async def _send_message(): + if markup is None: + return await message.bot.send_message( + chat_id=chat_id, + text=safe_post_text + ) + else: + return await message.bot.send_message( + chat_id=chat_id, + text=safe_post_text, + reply_markup=markup + ) + + sent_message = await send_with_rate_limit(_send_message, chat_id) + return sent_message.message_id +@track_time("send_photo_message", "helper_func") +@track_errors("helper_func", "send_photo_message") async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML @@ -552,7 +560,8 @@ async def send_photo_message(chat_id, message: types.Message, photo: str, post_t ) return sent_message - +@track_time("send_video_message", "helper_func") +@track_errors("helper_func", "send_video_message") async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "", markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML @@ -573,7 +582,8 @@ async def send_video_message(chat_id, message: types.Message, video: str, post_t ) return sent_message - +@track_time("send_video_note_message", "helper_func") +@track_errors("helper_func", "send_video_note_message") async def send_video_note_message(chat_id, message: types.Message, video_note: str, markup: types.ReplyKeyboardMarkup = None): if markup is None: @@ -589,7 +599,8 @@ async def send_video_note_message(chat_id, message: types.Message, video_note: s ) return sent_message - +@track_time("send_audio_message", "helper_func") +@track_errors("helper_func", "send_audio_message") async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): # Экранируем post_text для безопасного использования в HTML @@ -611,22 +622,30 @@ async def send_audio_message(chat_id, message: types.Message, audio: str, post_t return sent_message +@track_time("send_voice_message", "helper_func") +@track_errors("helper_func", "send_voice_message") async def send_voice_message(chat_id, message: types.Message, voice: str, markup: types.ReplyKeyboardMarkup = None): - if markup is None: - sent_message = await message.bot.send_voice( - chat_id=chat_id, - voice=voice - ) - else: - sent_message = await message.bot.send_voice( - chat_id=chat_id, - voice=voice, - reply_markup=markup - ) - return sent_message - + from .rate_limiter import send_with_rate_limit + + async def _send_voice(): + if markup is None: + return await message.bot.send_voice( + chat_id=chat_id, + voice=voice + ) + else: + return await message.bot.send_voice( + chat_id=chat_id, + voice=voice, + reply_markup=markup + ) + + return await send_with_rate_limit(_send_voice, chat_id) +@track_time("check_access", "helper_func") +@track_errors("helper_func", "check_access") +@db_query_time("check_access", "users", "select") async def check_access(user_id: int, bot_db): """Проверка прав на совершение действий""" from logs.custom_logger import logger @@ -641,7 +660,9 @@ def add_days_to_date(days: int): future_date = current_date + timedelta(days=days) return int(future_date.timestamp()) - +@track_time("get_banned_users_list", "helper_func") +@track_errors("helper_func", "get_banned_users_list") +@db_query_time("get_banned_users_list", "users", "select") async def get_banned_users_list(offset: int, bot_db): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором @@ -689,7 +710,9 @@ async def get_banned_users_list(offset: int, bot_db): message += f"**Дата разбана:** {safe_unban_date}\n\n" return message - +@track_time("get_banned_users_buttons", "helper_func") +@track_errors("helper_func", "get_banned_users_buttons") +@db_query_time("get_banned_users_buttons", "users", "select") async def get_banned_users_buttons(bot_db): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором @@ -716,7 +739,9 @@ async def get_banned_users_buttons(bot_db): user_ids.append((safe_user_name, user_id)) return user_ids - +@track_time("delete_user_blacklist", "helper_func") +@track_errors("helper_func", "delete_user_blacklist") +@db_query_time("delete_user_blacklist", "users", "delete") async def delete_user_blacklist(user_id: int, bot_db): return await bot_db.delete_user_blacklist(user_id=user_id) @@ -734,7 +759,9 @@ async def check_username_and_full_name(user_id: int, username: str, full_name: s logger.error(f"Ошибка при проверке username и full_name: {e}") return False - +@track_time("unban_notifier", "helper_func") +@track_errors("helper_func", "unban_notifier") +@db_query_time("unban_notifier", "users", "select") async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE): # Получение текущего UNIX timestamp current_date = datetime.now() @@ -757,6 +784,7 @@ async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE): @track_time("update_user_info", "helper_func") @track_errors("helper_func", "update_user_info") +@db_query_time("update_user_info", "users", "update") async def update_user_info(source: str, message: types.Message): # Собираем данные full_name = message.from_user.full_name @@ -787,12 +815,10 @@ async def update_user_info(source: str, message: types.Message): voice_bot_welcome_received=False ) await BotDB.add_user(user) - metrics.record_db_query("add_user", 0.0, "users", "insert") else: is_need_update = await check_username_and_full_name(user_id, username, full_name, BotDB) if is_need_update: await BotDB.update_user_info(user_id, username, full_name) - metrics.record_db_query("update_user_info", 0.0, "users", "update") if source != 'voice': await message.answer( f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}") @@ -800,7 +826,6 @@ async def update_user_info(source: str, message: types.Message): text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}') sleep(1) await BotDB.update_user_date(user_id) - metrics.record_db_query("update_user_date", 0.0, "users", "update") @track_time("check_user_emoji", "helper_func") @@ -812,7 +837,6 @@ async def check_user_emoji(message: types.Message): if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""): user_emoji = await get_random_emoji() await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji) - metrics.record_db_query("update_user_emoji", 0.0, "users", "update") return user_emoji diff --git a/helper_bot/utils/metrics.py b/helper_bot/utils/metrics.py index 9f68310..280da5e 100644 --- a/helper_bot/utils/metrics.py +++ b/helper_bot/utils/metrics.py @@ -7,10 +7,13 @@ from typing import Dict, Any, Optional from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST from prometheus_client.core import CollectorRegistry import time +import os from functools import wraps import asyncio from contextlib import asynccontextmanager +# Метрики rate limiter теперь создаются в основном классе + class BotMetrics: """Central class for managing all bot metrics.""" @@ -18,6 +21,9 @@ class BotMetrics: def __init__(self): self.registry = CollectorRegistry() + # Создаем метрики rate limiter в том же registry + self._create_rate_limit_metrics() + # Bot commands counter self.bot_commands_total = Counter( 'bot_commands_total', @@ -158,6 +164,78 @@ class BotMetrics: registry=self.registry ) + def _create_rate_limit_metrics(self): + """Создает метрики rate limiter в основном registry""" + try: + # Создаем метрики rate limiter в том же registry + self.rate_limit_requests_total = Counter( + 'rate_limit_requests_total', + 'Total number of rate limited requests', + ['chat_id', 'status', 'error_type'], + registry=self.registry + ) + + self.rate_limit_errors_total = Counter( + 'rate_limit_errors_total', + 'Total number of rate limit errors', + ['error_type', 'chat_id'], + registry=self.registry + ) + + self.rate_limit_wait_duration_seconds = Histogram( + 'rate_limit_wait_duration_seconds', + 'Time spent waiting due to rate limiting', + ['chat_id'], + buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0], + registry=self.registry + ) + + self.rate_limit_active_chats = Gauge( + 'rate_limit_active_chats', + 'Number of active chats with rate limiting', + registry=self.registry + ) + + self.rate_limit_success_rate = Gauge( + 'rate_limit_success_rate', + 'Success rate of rate limited requests', + ['chat_id'], + registry=self.registry + ) + + self.rate_limit_requests_per_minute = Gauge( + 'rate_limit_requests_per_minute', + 'Requests per minute', + ['chat_id'], + registry=self.registry + ) + + self.rate_limit_total_requests = Gauge( + 'rate_limit_total_requests', + 'Total number of requests', + ['chat_id'], + registry=self.registry + ) + + self.rate_limit_total_errors = Gauge( + 'rate_limit_total_errors', + 'Total number of errors', + ['chat_id', 'error_type'], + registry=self.registry + ) + + self.rate_limit_avg_wait_time_seconds = Gauge( + 'rate_limit_avg_wait_time_seconds', + 'Average wait time in seconds', + ['chat_id'], + registry=self.registry + ) + + except Exception as e: + # Логируем ошибку, но не прерываем инициализацию + import logging + logging.warning(f"Failed to create rate limit metrics: {e}") + def record_command(self, command_type: str, handler_type: str = "unknown", user_type: str = "unknown", status: str = "success"): """Record a bot command execution.""" self.bot_commands_total.labels( @@ -267,8 +345,97 @@ class BotMetrics: method_name="add_in_db_media" ).inc() + def record_db_error(self, error_type: str, query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"): + """Record database error occurrence.""" + self.db_errors_total.labels( + error_type=error_type, + query_type=query_type, + table_name=table_name, + operation=operation + ).inc() + + def record_rate_limit_request(self, chat_id: int, success: bool, wait_time: float = 0.0, error_type: str = None): + """Record rate limit request metrics.""" + try: + # Определяем статус + status = "success" if success else "error" + + # Записываем счетчик запросов + self.rate_limit_requests_total.labels( + chat_id=str(chat_id), + status=status, + error_type=error_type or "none" + ).inc() + + # Записываем время ожидания + if wait_time > 0: + self.rate_limit_wait_duration_seconds.labels( + chat_id=str(chat_id) + ).observe(wait_time) + + # Записываем ошибки + if not success and error_type: + self.rate_limit_errors_total.labels( + error_type=error_type, + chat_id=str(chat_id) + ).inc() + except Exception as e: + import logging + logging.warning(f"Failed to record rate limit request: {e}") + + def update_rate_limit_gauges(self): + """Update rate limit gauge metrics.""" + try: + from .rate_limit_monitor import rate_limit_monitor + + # Обновляем количество активных чатов + self.rate_limit_active_chats.set(len(rate_limit_monitor.stats)) + + # Обновляем метрики для каждого чата + for chat_id, chat_stats in rate_limit_monitor.stats.items(): + chat_id_str = str(chat_id) + + # Процент успеха + self.rate_limit_success_rate.labels( + chat_id=chat_id_str + ).set(chat_stats.success_rate) + + # Запросов в минуту + self.rate_limit_requests_per_minute.labels( + chat_id=chat_id_str + ).set(chat_stats.requests_per_minute) + + # Общее количество запросов + self.rate_limit_total_requests.labels( + chat_id=chat_id_str + ).set(chat_stats.total_requests) + + # Среднее время ожидания + self.rate_limit_avg_wait_time_seconds.labels( + chat_id=chat_id_str + ).set(chat_stats.average_wait_time) + + # Количество ошибок по типам + if chat_stats.retry_after_errors > 0: + self.rate_limit_total_errors.labels( + chat_id=chat_id_str, + error_type="RetryAfter" + ).set(chat_stats.retry_after_errors) + + if chat_stats.other_errors > 0: + self.rate_limit_total_errors.labels( + chat_id=chat_id_str, + error_type="Other" + ).set(chat_stats.other_errors) + except Exception as e: + import logging + logging.warning(f"Failed to update rate limit gauges: {e}") + def get_metrics(self) -> bytes: """Generate metrics in Prometheus format.""" + # Обновляем gauge метрики rate limiter перед генерацией + self.update_rate_limit_gauges() + return generate_latest(self.registry) @@ -449,3 +616,89 @@ async def track_middleware(middleware_name: str): middleware_name ) raise + + +def track_media_processing(content_type: str = "unknown"): + """Decorator to track media processing operations.""" + def decorator(func): + @wraps(func) + async def async_wrapper(*args, **kwargs): + start_time = time.time() + try: + result = await func(*args, **kwargs) + duration = time.time() - start_time + metrics.record_media_processing(content_type, duration, True) + return result + except Exception as e: + duration = time.time() - start_time + metrics.record_media_processing(content_type, duration, False) + raise + + @wraps(func) + def sync_wrapper(*args, **kwargs): + start_time = time.time() + try: + result = func(*args, **kwargs) + duration = time.time() - start_time + metrics.record_media_processing(content_type, duration, True) + return result + except Exception as e: + duration = time.time() - start_time + metrics.record_media_processing(content_type, duration, False) + raise + + if asyncio.iscoroutinefunction(func): + return async_wrapper + return sync_wrapper + return decorator + + +def track_file_operations(content_type: str = "unknown"): + """Decorator to track file download/upload operations.""" + def decorator(func): + @wraps(func) + async def async_wrapper(*args, **kwargs): + start_time = time.time() + try: + result = await func(*args, **kwargs) + duration = time.time() - start_time + + # Получаем размер файла из результата + file_size = 0 + if result and isinstance(result, str) and os.path.exists(result): + file_size = os.path.getsize(result) + + # Записываем метрики + metrics.record_file_download(content_type, file_size, duration) + + return result + except Exception as e: + duration = time.time() - start_time + metrics.record_file_download_error(content_type, str(e)) + raise + + @wraps(func) + def sync_wrapper(*args, **kwargs): + start_time = time.time() + try: + result = func(*args, **kwargs) + duration = time.time() - start_time + + # Получаем размер файла из результата + file_size = 0 + if result and isinstance(result, str) and os.path.exists(result): + file_size = os.path.getsize(result) + + # Записываем метрики + metrics.record_file_download(content_type, file_size, duration) + + return result + except Exception as e: + duration = time.time() - start_time + metrics.record_file_download_error(content_type, str(e)) + raise + + if asyncio.iscoroutinefunction(func): + return async_wrapper + return sync_wrapper + return decorator diff --git a/helper_bot/utils/rate_limit_monitor.py b/helper_bot/utils/rate_limit_monitor.py new file mode 100644 index 0000000..1abb4c3 --- /dev/null +++ b/helper_bot/utils/rate_limit_monitor.py @@ -0,0 +1,220 @@ +""" +Мониторинг и статистика rate limiting +""" +import time +from typing import Dict, List, Optional +from dataclasses import dataclass, field +from collections import defaultdict, deque +from logs.custom_logger import logger + + +@dataclass +class RateLimitStats: + """Статистика rate limiting для чата""" + chat_id: int + total_requests: int = 0 + successful_requests: int = 0 + failed_requests: int = 0 + retry_after_errors: int = 0 + other_errors: int = 0 + total_wait_time: float = 0.0 + last_request_time: float = 0.0 + request_times: deque = field(default_factory=lambda: deque(maxlen=100)) + + @property + def success_rate(self) -> float: + """Процент успешных запросов""" + if self.total_requests == 0: + return 1.0 + return self.successful_requests / self.total_requests + + @property + def error_rate(self) -> float: + """Процент ошибок""" + return 1.0 - self.success_rate + + @property + def average_wait_time(self) -> float: + """Среднее время ожидания""" + if self.total_requests == 0: + return 0.0 + return self.total_wait_time / self.total_requests + + @property + def requests_per_minute(self) -> float: + """Запросов в минуту""" + if not self.request_times: + return 0.0 + + current_time = time.time() + minute_ago = current_time - 60 + + # Подсчитываем запросы за последнюю минуту + recent_requests = sum(1 for req_time in self.request_times if req_time > minute_ago) + return recent_requests + + +class RateLimitMonitor: + """Монитор для отслеживания статистики rate limiting""" + + def __init__(self, max_history_size: int = 1000): + self.stats: Dict[int, RateLimitStats] = defaultdict(lambda: RateLimitStats(0)) + self.global_stats = RateLimitStats(0) + self.max_history_size = max_history_size + self.error_history: deque = deque(maxlen=max_history_size) + + def record_request(self, chat_id: int, success: bool, wait_time: float = 0.0, error_type: Optional[str] = None): + """Записывает информацию о запросе""" + current_time = time.time() + + # Обновляем статистику для чата + chat_stats = self.stats[chat_id] + chat_stats.chat_id = chat_id + chat_stats.total_requests += 1 + chat_stats.total_wait_time += wait_time + chat_stats.last_request_time = current_time + chat_stats.request_times.append(current_time) + + if success: + chat_stats.successful_requests += 1 + else: + chat_stats.failed_requests += 1 + if error_type == "RetryAfter": + chat_stats.retry_after_errors += 1 + else: + chat_stats.other_errors += 1 + + # Записываем ошибку в историю + self.error_history.append({ + 'chat_id': chat_id, + 'error_type': error_type, + 'timestamp': current_time, + 'wait_time': wait_time + }) + + # Обновляем глобальную статистику + self.global_stats.total_requests += 1 + self.global_stats.total_wait_time += wait_time + self.global_stats.last_request_time = current_time + self.global_stats.request_times.append(current_time) + + if success: + self.global_stats.successful_requests += 1 + else: + self.global_stats.failed_requests += 1 + if error_type == "RetryAfter": + self.global_stats.retry_after_errors += 1 + else: + self.global_stats.other_errors += 1 + + def get_chat_stats(self, chat_id: int) -> Optional[RateLimitStats]: + """Получает статистику для конкретного чата""" + return self.stats.get(chat_id) + + def get_global_stats(self) -> RateLimitStats: + """Получает глобальную статистику""" + return self.global_stats + + def get_top_chats_by_requests(self, limit: int = 10) -> List[tuple]: + """Получает топ чатов по количеству запросов""" + sorted_chats = sorted( + self.stats.items(), + key=lambda x: x[1].total_requests, + reverse=True + ) + return sorted_chats[:limit] + + def get_chats_with_high_error_rate(self, threshold: float = 0.1) -> List[tuple]: + """Получает чаты с высоким процентом ошибок""" + high_error_chats = [ + (chat_id, stats) for chat_id, stats in self.stats.items() + if stats.error_rate > threshold and stats.total_requests > 5 + ] + return sorted(high_error_chats, key=lambda x: x[1].error_rate, reverse=True) + + def get_recent_errors(self, minutes: int = 60) -> List[dict]: + """Получает недавние ошибки""" + current_time = time.time() + cutoff_time = current_time - (minutes * 60) + + return [ + error for error in self.error_history + if error['timestamp'] > cutoff_time + ] + + def get_error_summary(self, minutes: int = 60) -> Dict[str, int]: + """Получает сводку ошибок за указанный период""" + recent_errors = self.get_recent_errors(minutes) + error_summary = defaultdict(int) + + for error in recent_errors: + error_summary[error['error_type']] += 1 + + return dict(error_summary) + + def log_statistics(self, log_level: str = "info"): + """Логирует текущую статистику""" + global_stats = self.get_global_stats() + + log_message = ( + f"Rate Limit Statistics:\n" + f" Total requests: {global_stats.total_requests}\n" + f" Success rate: {global_stats.success_rate:.2%}\n" + f" Error rate: {global_stats.error_rate:.2%}\n" + f" RetryAfter errors: {global_stats.retry_after_errors}\n" + f" Other errors: {global_stats.other_errors}\n" + f" Average wait time: {global_stats.average_wait_time:.2f}s\n" + f" Requests per minute: {global_stats.requests_per_minute:.1f}\n" + f" Active chats: {len(self.stats)}" + ) + + if log_level == "error": + logger.error(log_message) + elif log_level == "warning": + logger.warning(log_message) + else: + logger.info(log_message) + + # Логируем чаты с высоким процентом ошибок + high_error_chats = self.get_chats_with_high_error_rate(0.2) + if high_error_chats: + logger.warning(f"Chats with high error rate (>20%): {len(high_error_chats)}") + for chat_id, stats in high_error_chats[:5]: # Показываем только первые 5 + logger.warning(f" Chat {chat_id}: {stats.error_rate:.2%} error rate ({stats.failed_requests}/{stats.total_requests})") + + def reset_stats(self, chat_id: Optional[int] = None): + """Сбрасывает статистику""" + if chat_id is None: + # Сбрасываем всю статистику + self.stats.clear() + self.global_stats = RateLimitStats(0) + self.error_history.clear() + else: + # Сбрасываем статистику для конкретного чата + if chat_id in self.stats: + del self.stats[chat_id] + + +# Глобальный экземпляр монитора +rate_limit_monitor = RateLimitMonitor() + + +def record_rate_limit_request(chat_id: int, success: bool, wait_time: float = 0.0, error_type: Optional[str] = None): + """Удобная функция для записи информации о запросе""" + rate_limit_monitor.record_request(chat_id, success, wait_time, error_type) + + +def get_rate_limit_summary() -> Dict: + """Получает краткую сводку по rate limiting""" + global_stats = rate_limit_monitor.get_global_stats() + recent_errors = rate_limit_monitor.get_recent_errors(60) # За последний час + + return { + 'total_requests': global_stats.total_requests, + 'success_rate': global_stats.success_rate, + 'error_rate': global_stats.error_rate, + 'recent_errors_count': len(recent_errors), + 'active_chats': len(rate_limit_monitor.stats), + 'requests_per_minute': global_stats.requests_per_minute, + 'average_wait_time': global_stats.average_wait_time + } diff --git a/helper_bot/utils/rate_limiter.py b/helper_bot/utils/rate_limiter.py new file mode 100644 index 0000000..f67cc8c --- /dev/null +++ b/helper_bot/utils/rate_limiter.py @@ -0,0 +1,215 @@ +""" +Rate limiter для предотвращения Flood control ошибок в Telegram Bot API +""" +import asyncio +import time +from typing import Dict, Optional, Any, Callable +from dataclasses import dataclass +from aiogram.exceptions import TelegramRetryAfter, TelegramAPIError +from logs.custom_logger import logger +from .metrics import metrics + + +@dataclass +class RateLimitConfig: + """Конфигурация для rate limiting""" + messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат + burst_limit: int = 3 # Максимум 3 сообщения подряд + retry_after_multiplier: float = 1.2 # Множитель для увеличения задержки при retry + max_retry_delay: float = 60.0 # Максимальная задержка между попытками + + +class ChatRateLimiter: + """Rate limiter для конкретного чата""" + + def __init__(self, config: RateLimitConfig): + self.config = config + self.last_send_time = 0.0 + self.burst_count = 0 + self.burst_reset_time = 0.0 + self.retry_delay = 1.0 + + async def wait_if_needed(self) -> None: + """Ждет если необходимо для соблюдения rate limit""" + current_time = time.time() + + # Сбрасываем счетчик burst если прошло достаточно времени + if current_time >= self.burst_reset_time: + self.burst_count = 0 + self.burst_reset_time = current_time + 1.0 + + # Проверяем burst limit + if self.burst_count >= self.config.burst_limit: + wait_time = self.burst_reset_time - current_time + if wait_time > 0: + logger.info(f"Burst limit reached, waiting {wait_time:.2f}s") + await asyncio.sleep(wait_time) + current_time = time.time() + self.burst_count = 0 + self.burst_reset_time = current_time + 1.0 + + # Проверяем минимальный интервал между сообщениями + time_since_last = current_time - self.last_send_time + min_interval = 1.0 / self.config.messages_per_second + + if time_since_last < min_interval: + wait_time = min_interval - time_since_last + logger.debug(f"Rate limiting: waiting {wait_time:.2f}s") + await asyncio.sleep(wait_time) + + # Обновляем время последней отправки + self.last_send_time = time.time() + self.burst_count += 1 + + +class GlobalRateLimiter: + """Глобальный rate limiter для всех чатов""" + + def __init__(self, config: RateLimitConfig): + self.config = config + self.chat_limiters: Dict[int, ChatRateLimiter] = {} + self.global_last_send = 0.0 + self.global_min_interval = 0.1 # Минимум 100ms между любыми сообщениями + + def get_chat_limiter(self, chat_id: int) -> ChatRateLimiter: + """Получает rate limiter для конкретного чата""" + if chat_id not in self.chat_limiters: + self.chat_limiters[chat_id] = ChatRateLimiter(self.config) + return self.chat_limiters[chat_id] + + async def wait_if_needed(self, chat_id: int) -> None: + """Ждет если необходимо для соблюдения глобального и чат-специфичного rate limit""" + current_time = time.time() + + # Глобальный rate limit + time_since_global = current_time - self.global_last_send + if time_since_global < self.global_min_interval: + wait_time = self.global_min_interval - time_since_global + await asyncio.sleep(wait_time) + current_time = time.time() + + # Чат-специфичный rate limit + chat_limiter = self.get_chat_limiter(chat_id) + await chat_limiter.wait_if_needed() + + self.global_last_send = time.time() + + +class RetryHandler: + """Обработчик повторных попыток с экспоненциальной задержкой""" + + def __init__(self, config: RateLimitConfig): + self.config = config + + async def execute_with_retry( + self, + func: Callable, + chat_id: int, + *args, + max_retries: int = 3, + **kwargs + ) -> Any: + """Выполняет функцию с повторными попытками при ошибках""" + retry_count = 0 + current_delay = self.config.retry_after_multiplier + total_wait_time = 0.0 + + while retry_count <= max_retries: + try: + result = await func(*args, **kwargs) + # Записываем успешный запрос + metrics.record_rate_limit_request(chat_id, True, total_wait_time) + return result + + except TelegramRetryAfter as e: + retry_count += 1 + if retry_count > max_retries: + logger.error(f"Max retries exceeded for RetryAfter: {e}") + metrics.record_rate_limit_request(chat_id, False, total_wait_time, "RetryAfter") + raise + + # Используем время ожидания от Telegram или наше увеличенное + wait_time = max(e.retry_after, current_delay) + wait_time = min(wait_time, self.config.max_retry_delay) + total_wait_time += wait_time + + logger.warning(f"RetryAfter error, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries})") + await asyncio.sleep(wait_time) + current_delay *= self.config.retry_after_multiplier + + except TelegramAPIError as e: + retry_count += 1 + if retry_count > max_retries: + logger.error(f"Max retries exceeded for TelegramAPIError: {e}") + metrics.record_rate_limit_request(chat_id, False, total_wait_time, "TelegramAPIError") + raise + + wait_time = min(current_delay, self.config.max_retry_delay) + total_wait_time += wait_time + logger.warning(f"TelegramAPIError, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries}): {e}") + await asyncio.sleep(wait_time) + current_delay *= self.config.retry_after_multiplier + + except Exception as e: + # Для других ошибок не делаем retry + logger.error(f"Non-retryable error: {e}") + metrics.record_rate_limit_request(chat_id, False, total_wait_time, "Other") + raise + + +class TelegramRateLimiter: + """Основной класс для rate limiting в Telegram боте""" + + def __init__(self, config: Optional[RateLimitConfig] = None): + self.config = config or RateLimitConfig() + self.global_limiter = GlobalRateLimiter(self.config) + self.retry_handler = RetryHandler(self.config) + + async def send_with_rate_limit( + self, + send_func: Callable, + chat_id: int, + *args, + **kwargs + ) -> Any: + """Отправляет сообщение с соблюдением rate limit и retry логики""" + + async def _send(): + await self.global_limiter.wait_if_needed(chat_id) + return await send_func(*args, **kwargs) + + return await self.retry_handler.execute_with_retry(_send, chat_id) + + +# Глобальный экземпляр rate limiter +from helper_bot.config.rate_limit_config import get_rate_limit_config, RateLimitSettings + +def _create_rate_limit_config(settings: RateLimitSettings) -> RateLimitConfig: + """Создает RateLimitConfig из RateLimitSettings""" + return RateLimitConfig( + messages_per_second=settings.messages_per_second, + burst_limit=settings.burst_limit, + retry_after_multiplier=settings.retry_after_multiplier, + max_retry_delay=settings.max_retry_delay + ) + +# Получаем конфигурацию из настроек +_rate_limit_settings = get_rate_limit_config("production") +_default_config = _create_rate_limit_config(_rate_limit_settings) + +telegram_rate_limiter = TelegramRateLimiter(_default_config) + + +async def send_with_rate_limit(send_func: Callable, chat_id: int, *args, **kwargs) -> Any: + """ + Удобная функция для отправки сообщений с rate limiting + + Args: + send_func: Функция отправки (например, bot.send_message) + chat_id: ID чата + *args, **kwargs: Аргументы для функции отправки + + Returns: + Результат выполнения функции отправки + """ + return await telegram_rate_limiter.send_with_rate_limit(send_func, chat_id, *args, **kwargs) diff --git a/scripts/voice_cleanup.py b/scripts/voice_cleanup.py new file mode 100644 index 0000000..992fe57 --- /dev/null +++ b/scripts/voice_cleanup.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +""" +Скрипт для диагностики и очистки проблем с голосовыми файлами +""" +import asyncio +import sys +import os +from pathlib import Path + +# Добавляем корневую директорию проекта в путь +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from database.async_db import AsyncBotDB +from helper_bot.handlers.voice.cleanup_utils import VoiceFileCleanupUtils +from logs.custom_logger import logger + + +async def main(): + """Основная функция скрипта""" + try: + # Инициализация базы данных + db_path = "database/tg-bot-database.db" + if not os.path.exists(db_path): + logger.error(f"База данных не найдена: {db_path}") + return + + bot_db = AsyncBotDB(db_path) + cleanup_utils = VoiceFileCleanupUtils(bot_db) + + print("=== Диагностика голосовых файлов ===") + + # Запускаем полную диагностику + diagnostic_result = await cleanup_utils.run_full_diagnostic() + + print(f"\n📊 Статистика диска:") + if "error" in diagnostic_result["disk_stats"]: + print(f" ❌ Ошибка: {diagnostic_result['disk_stats']['error']}") + else: + stats = diagnostic_result["disk_stats"] + print(f" 📁 Директория: {stats['directory']}") + print(f" 📄 Всего файлов: {stats['total_files']}") + print(f" 💾 Размер: {stats['total_size_mb']} MB") + + print(f"\n🗄️ База данных:") + print(f" 📝 Записей в БД: {diagnostic_result['db_records_count']}") + print(f" 🔍 Записей без файлов: {diagnostic_result['orphaned_db_records_count']}") + print(f" 📁 Файлов без записей: {diagnostic_result['orphaned_files_count']}") + + print(f"\n📋 Статус: {diagnostic_result['status']}") + + if diagnostic_result['status'] == 'issues_found': + print("\n⚠️ Найдены проблемы!") + + if diagnostic_result['orphaned_db_records_count'] > 0: + print(f"\n🗑️ Записи в БД без файлов (первые 10):") + for file_name, user_id in diagnostic_result['orphaned_db_records']: + print(f" - {file_name} (user_id: {user_id})") + + if diagnostic_result['orphaned_files_count'] > 0: + print(f"\n📁 Файлы без записей в БД (первые 10):") + for file_path in diagnostic_result['orphaned_files']: + print(f" - {file_path}") + + # Предлагаем очистку + print("\n🧹 Хотите выполнить очистку?") + print("1. Удалить записи в БД без файлов") + print("2. Удалить файлы без записей в БД") + print("3. Выполнить полную очистку") + print("4. Выход") + + choice = input("\nВыберите действие (1-4): ").strip() + + if choice == "1": + print("\n🗑️ Удаление записей в БД без файлов...") + deleted = await cleanup_utils.cleanup_orphaned_db_records(dry_run=False) + print(f"✅ Удалено {deleted} записей") + + elif choice == "2": + print("\n📁 Удаление файлов без записей в БД...") + deleted = await cleanup_utils.cleanup_orphaned_files(dry_run=False) + print(f"✅ Удалено {deleted} файлов") + + elif choice == "3": + print("\n🧹 Полная очистка...") + db_deleted = await cleanup_utils.cleanup_orphaned_db_records(dry_run=False) + files_deleted = await cleanup_utils.cleanup_orphaned_files(dry_run=False) + print(f"✅ Удалено {db_deleted} записей в БД и {files_deleted} файлов") + + elif choice == "4": + print("👋 Выход...") + else: + print("❌ Неверный выбор") + else: + print("\n✅ Проблем не найдено!") + + except Exception as e: + logger.error(f"Ошибка в скрипте: {e}") + print(f"❌ Ошибка: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/test_rate_limiting.py b/test_rate_limiting.py new file mode 100644 index 0000000..084f4f3 --- /dev/null +++ b/test_rate_limiting.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +""" +Скрипт для тестирования rate limiting решения +""" +import asyncio +import time +from unittest.mock import AsyncMock, MagicMock +from aiogram.types import Message, User, Chat + +from helper_bot.utils.rate_limiter import send_with_rate_limit +from helper_bot.utils.rate_limit_monitor import rate_limit_monitor, get_rate_limit_summary + + +async def test_rate_limiting(): + """Тестирует rate limiting с имитацией отправки сообщений""" + + print("🚀 Начинаем тестирование rate limiting...") + + # Создаем мок объекты + mock_bot = MagicMock() + mock_user = User(id=123, is_bot=False, first_name="Test") + mock_chat = Chat(id=456, type="private") + + # Создаем Message с bot в конструкторе + mock_message = Message( + message_id=1, + date=int(time.time()), + chat=mock_chat, + from_user=mock_user, + content_type="text", + bot=mock_bot + ) + + # Настраиваем мок для send_voice + mock_bot.send_voice = AsyncMock(return_value=MagicMock(message_id=1)) + + # Функция для отправки голосового сообщения + async def send_voice_test(): + return await mock_bot.send_voice( + chat_id=mock_chat.id, + voice="test_voice_id" + ) + + print("📊 Отправляем 5 сообщений подряд...") + + # Отправляем несколько сообщений подряд + start_time = time.time() + for i in range(5): + print(f" Отправка сообщения {i+1}/5...") + try: + result = await send_with_rate_limit(send_voice_test, mock_chat.id) + print(f" ✅ Сообщение {i+1} отправлено успешно") + except Exception as e: + print(f" ❌ Ошибка при отправке сообщения {i+1}: {e}") + + end_time = time.time() + total_time = end_time - start_time + + print(f"\n⏱️ Общее время выполнения: {total_time:.2f} секунд") + print(f"📈 Среднее время на сообщение: {total_time/5:.2f} секунд") + + # Показываем статистику + print("\n📊 Статистика rate limiting:") + summary = get_rate_limit_summary() + for key, value in summary.items(): + if isinstance(value, float): + print(f" {key}: {value:.2f}") + else: + print(f" {key}: {value}") + + # Показываем детальную статистику + print("\n🔍 Детальная статистика:") + global_stats = rate_limit_monitor.get_global_stats() + print(f" Всего запросов: {global_stats.total_requests}") + print(f" Успешных: {global_stats.successful_requests}") + print(f" Неудачных: {global_stats.failed_requests}") + print(f" Процент успеха: {global_stats.success_rate:.1%}") + print(f" Среднее время ожидания: {global_stats.average_wait_time:.2f}с") + + # Проверяем что rate limiting работает + if total_time > 8: # Должно занять больше 8 секунд (5 сообщений * 1.6с минимум) + print("\n✅ Rate limiting работает корректно - сообщения отправляются с задержкой") + else: + print("\n⚠️ Rate limiting может работать некорректно - сообщения отправлены слишком быстро") + + print("\n🎉 Тестирование завершено!") + + +async def test_error_handling(): + """Тестирует обработку ошибок""" + + print("\n🧪 Тестируем обработку ошибок...") + + # Создаем мок который будет падать с RetryAfter + from aiogram.exceptions import TelegramRetryAfter + + mock_bot = MagicMock() + mock_chat = Chat(id=789, type="private") + + call_count = 0 + async def failing_send(): + nonlocal call_count + call_count += 1 + if call_count <= 2: + raise TelegramRetryAfter( + method=MagicMock(), + message="Flood control exceeded", + retry_after=1 + ) + return MagicMock(message_id=call_count) + + mock_bot.send_voice = failing_send + + print("📤 Отправляем сообщение с имитацией RetryAfter ошибки...") + + start_time = time.time() + try: + result = await send_with_rate_limit(failing_send, mock_chat.id) + end_time = time.time() + print(f"✅ Сообщение отправлено после {call_count} попыток за {end_time - start_time:.2f}с") + except Exception as e: + print(f"❌ Ошибка: {e}") + + print("🎯 Тест обработки ошибок завершен!") + + +async def main(): + """Основная функция""" + print("🔧 Тестирование решения Flood Control") + print("=" * 50) + + # Сбрасываем статистику + rate_limit_monitor.reset_stats() + + # Запускаем тесты + await test_rate_limiting() + await test_error_handling() + + print("\n" + "=" * 50) + print("📋 Итоговая статистика:") + summary = get_rate_limit_summary() + for key, value in summary.items(): + if isinstance(value, float): + print(f" {key}: {value:.2f}") + else: + print(f" {key}: {value}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_async_db.py b/tests/test_async_db.py new file mode 100644 index 0000000..d87b686 --- /dev/null +++ b/tests/test_async_db.py @@ -0,0 +1,104 @@ +import pytest +from unittest.mock import Mock, AsyncMock, patch +from database.async_db import AsyncBotDB + + +class TestAsyncBotDB: + """Тесты для AsyncBotDB""" + + @pytest.fixture + def mock_factory(self): + """Мок для RepositoryFactory""" + mock_factory = Mock() + mock_factory.audio = Mock() + mock_factory.audio.delete_audio_moderate_record = AsyncMock() + mock_factory.users = Mock() + mock_factory.users.logger = Mock() + return mock_factory + + @pytest.fixture + def async_bot_db(self, mock_factory): + """Экземпляр AsyncBotDB для тестов""" + with patch('database.async_db.RepositoryFactory') as mock_factory_class: + mock_factory_class.return_value = mock_factory + db = AsyncBotDB("test.db") + return db + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record(self, async_bot_db, mock_factory): + """Тест метода delete_audio_moderate_record""" + message_id = 12345 + + await async_bot_db.delete_audio_moderate_record(message_id) + + # Проверяем, что метод вызван в репозитории + mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id) + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record_with_different_message_id(self, async_bot_db, mock_factory): + """Тест метода delete_audio_moderate_record с разными message_id""" + test_cases = [123, 456, 789, 99999] + + for message_id in test_cases: + await async_bot_db.delete_audio_moderate_record(message_id) + mock_factory.audio.delete_audio_moderate_record.assert_called_with(message_id) + + # Проверяем, что метод вызван для каждого message_id + assert mock_factory.audio.delete_audio_moderate_record.call_count == len(test_cases) + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record_exception_handling(self, async_bot_db, mock_factory): + """Тест обработки исключений в delete_audio_moderate_record""" + message_id = 12345 + mock_factory.audio.delete_audio_moderate_record.side_effect = Exception("Database error") + + # Метод должен пробросить исключение + with pytest.raises(Exception, match="Database error"): + await async_bot_db.delete_audio_moderate_record(message_id) + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record_integration_with_other_methods(self, async_bot_db, mock_factory): + """Тест интеграции delete_audio_moderate_record с другими методами""" + message_id = 12345 + user_id = 67890 + + # Мокаем другие методы + mock_factory.audio.get_user_id_by_message_id_for_voice_bot = AsyncMock(return_value=user_id) + mock_factory.audio.set_user_id_and_message_id_for_voice_bot = AsyncMock(return_value=True) + + # Тестируем последовательность операций + await async_bot_db.get_user_id_by_message_id_for_voice_bot(message_id) + await async_bot_db.set_user_id_and_message_id_for_voice_bot(message_id, user_id) + await async_bot_db.delete_audio_moderate_record(message_id) + + # Проверяем, что все методы вызваны + mock_factory.audio.get_user_id_by_message_id_for_voice_bot.assert_called_once_with(message_id) + mock_factory.audio.set_user_id_and_message_id_for_voice_bot.assert_called_once_with(message_id, user_id) + mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id) + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record_zero_message_id(self, async_bot_db, mock_factory): + """Тест delete_audio_moderate_record с message_id = 0""" + message_id = 0 + + await async_bot_db.delete_audio_moderate_record(message_id) + + mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id) + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record_negative_message_id(self, async_bot_db, mock_factory): + """Тест delete_audio_moderate_record с отрицательным message_id""" + message_id = -12345 + + await async_bot_db.delete_audio_moderate_record(message_id) + + mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id) + + @pytest.mark.asyncio + async def test_delete_audio_moderate_record_large_message_id(self, async_bot_db, mock_factory): + """Тест delete_audio_moderate_record с большим message_id""" + message_id = 999999999 + + await async_bot_db.delete_audio_moderate_record(message_id) + + mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id) diff --git a/tests/test_audio_file_service.py b/tests/test_audio_file_service.py index 761295a..4d82343 100644 --- a/tests/test_audio_file_service.py +++ b/tests/test_audio_file_service.py @@ -1,5 +1,5 @@ import pytest -from unittest.mock import Mock, AsyncMock, patch, MagicMock +from unittest.mock import Mock, AsyncMock, patch, MagicMock, mock_open from datetime import datetime import time @@ -112,23 +112,27 @@ class TestSaveAudioFile: @pytest.mark.asyncio async def test_save_audio_file_success(self, audio_service, mock_bot_db, sample_datetime): """Тест успешного сохранения аудио файла""" - file_name = "test_audio.ogg" + file_name = "test_audio" user_id = 12345 file_id = "test_file_id" - await audio_service.save_audio_file(file_name, user_id, sample_datetime, file_id) + # Мокаем verify_file_exists чтобы он возвращал True + with patch.object(audio_service, 'verify_file_exists', return_value=True): + await audio_service.save_audio_file(file_name, user_id, sample_datetime, file_id) mock_bot_db.add_audio_record_simple.assert_called_once_with(file_name, user_id, sample_datetime) @pytest.mark.asyncio async def test_save_audio_file_with_string_date(self, audio_service, mock_bot_db): """Тест сохранения аудио файла со строковой датой""" - file_name = "test_audio.ogg" + file_name = "test_audio" user_id = 12345 date_string = "2025-01-15 14:30:00" file_id = "test_file_id" - await audio_service.save_audio_file(file_name, user_id, date_string, file_id) + # Мокаем verify_file_exists чтобы он возвращал True + with patch.object(audio_service, 'verify_file_exists', return_value=True): + await audio_service.save_audio_file(file_name, user_id, date_string, file_id) mock_bot_db.add_audio_record_simple.assert_called_once_with(file_name, user_id, date_string) @@ -137,8 +141,10 @@ class TestSaveAudioFile: """Тест обработки исключений при сохранении аудио файла""" mock_bot_db.add_audio_record_simple.side_effect = Exception("Database error") - with pytest.raises(DatabaseError) as exc_info: - await audio_service.save_audio_file("test.ogg", 12345, sample_datetime, "file_id") + # Мокаем verify_file_exists чтобы он возвращал True + with patch.object(audio_service, 'verify_file_exists', return_value=True): + with pytest.raises(DatabaseError) as exc_info: + await audio_service.save_audio_file("test", 12345, sample_datetime, "file_id") assert "Не удалось сохранить аудио файл в БД" in str(exc_info.value) @@ -156,15 +162,23 @@ class TestDownloadAndSaveAudio: mock_downloaded_file.tell.return_value = 0 mock_downloaded_file.seek = Mock() mock_downloaded_file.read.return_value = b"audio_data" + + # Настраиваем поведение tell() для получения размера файла + def mock_tell(): + return 0 if mock_downloaded_file.seek.call_count == 0 else 1024 + mock_downloaded_file.tell = Mock(side_effect=mock_tell) + mock_bot.download_file.return_value = mock_downloaded_file with patch('builtins.open', mock_open()) as mock_file: with patch('os.makedirs'): - await audio_service.download_and_save_audio(mock_bot, mock_message, "test_audio") - - mock_bot.get_file.assert_called_once_with(file_id="test_file_id") - mock_bot.download_file.assert_called_once_with(file_path="voice/test_file_id.ogg") - mock_file.assert_called_once() + with patch('os.path.exists', return_value=True): + with patch('os.path.getsize', return_value=1024): + await audio_service.download_and_save_audio(mock_bot, mock_message, "test_audio") + + mock_bot.get_file.assert_called_once_with(file_id="test_file_id") + mock_bot.download_file.assert_called_once_with(file_path="voice/test_file_id.ogg") + mock_file.assert_called_once() @pytest.mark.asyncio async def test_download_and_save_audio_no_message(self, audio_service, mock_bot): @@ -207,10 +221,6 @@ class TestDownloadAndSaveAudio: assert "Не удалось скачать и сохранить аудио" in str(exc_info.value) -def mock_open(): - """Мок для функции open""" - from unittest.mock import mock_open as _mock_open - return _mock_open() class TestAudioFileServiceIntegration: @@ -232,7 +242,8 @@ class TestAudioFileServiceIntegration: # Тестируем сохранение в БД test_date = datetime.now() - await service.save_audio_file(file_name, 12345, test_date, "test_file_id") + with patch.object(service, 'verify_file_exists', return_value=True): + await service.save_audio_file(file_name, 12345, test_date, "test_file_id") # Проверяем вызовы mock_bot_db.get_user_audio_records_count.assert_called_once_with(user_id=12345) diff --git a/tests/test_rate_limiter.py b/tests/test_rate_limiter.py new file mode 100644 index 0000000..1a0916c --- /dev/null +++ b/tests/test_rate_limiter.py @@ -0,0 +1,310 @@ +""" +Тесты для rate limiter +""" +import asyncio +import time +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from helper_bot.utils.rate_limiter import ( + RateLimitConfig, + ChatRateLimiter, + GlobalRateLimiter, + RetryHandler, + TelegramRateLimiter, + send_with_rate_limit +) +from helper_bot.utils.rate_limit_monitor import RateLimitMonitor, RateLimitStats, record_rate_limit_request +from helper_bot.config.rate_limit_config import RateLimitSettings, get_rate_limit_config + + +class TestRateLimitConfig: + """Тесты для RateLimitConfig""" + + def test_default_config(self): + """Тест создания конфигурации по умолчанию""" + config = RateLimitConfig() + assert config.messages_per_second == 0.5 + assert config.burst_limit == 3 + assert config.retry_after_multiplier == 1.2 + assert config.max_retry_delay == 60.0 + + +class TestChatRateLimiter: + """Тесты для ChatRateLimiter""" + + def test_initialization(self): + """Тест инициализации""" + config = RateLimitConfig(messages_per_second=1.0, burst_limit=2) + limiter = ChatRateLimiter(config) + + assert limiter.config == config + assert limiter.last_send_time == 0.0 + assert limiter.burst_count == 0 + assert limiter.retry_delay == 1.0 + + @pytest.mark.asyncio + async def test_wait_if_needed_no_wait(self): + """Тест что не ждет если не нужно""" + config = RateLimitConfig(messages_per_second=10.0, burst_limit=10) + limiter = ChatRateLimiter(config) + + start_time = time.time() + await limiter.wait_if_needed() + end_time = time.time() + + # Должно пройти очень быстро + assert end_time - start_time < 0.1 + + @pytest.mark.asyncio + async def test_wait_if_needed_with_wait(self): + """Тест что ждет если нужно""" + config = RateLimitConfig(messages_per_second=0.5, burst_limit=10) # 1 сообщение в 2 секунды + limiter = ChatRateLimiter(config) + + # Первый вызов не должен ждать + start_time = time.time() + await limiter.wait_if_needed() + first_call_time = time.time() - start_time + + # Второй вызов должен ждать + start_time = time.time() + await limiter.wait_if_needed() + second_call_time = time.time() - start_time + + assert first_call_time < 0.1 + assert second_call_time >= 1.8 # Должно ждать около 2 секунд + + @pytest.mark.asyncio + async def test_burst_limit(self): + """Тест ограничения burst""" + config = RateLimitConfig(messages_per_second=10.0, burst_limit=2) + limiter = ChatRateLimiter(config) + + # Первые два вызова не должны ждать + start_time = time.time() + await limiter.wait_if_needed() + await limiter.wait_if_needed() + first_two_calls_time = time.time() - start_time + + # Третий вызов должен ждать + start_time = time.time() + await limiter.wait_if_needed() + third_call_time = time.time() - start_time + + assert first_two_calls_time < 0.2 # Более мягкое ограничение + assert third_call_time >= 0.8 # Должно ждать около 1 секунды (с учетом погрешности) + + +class TestGlobalRateLimiter: + """Тесты для GlobalRateLimiter""" + + def test_initialization(self): + """Тест инициализации""" + config = RateLimitConfig() + limiter = GlobalRateLimiter(config) + + assert limiter.config == config + assert limiter.chat_limiters == {} + assert limiter.global_last_send == 0.0 + + def test_get_chat_limiter(self): + """Тест получения limiter для чата""" + config = RateLimitConfig() + limiter = GlobalRateLimiter(config) + + chat_limiter = limiter.get_chat_limiter(123) + assert isinstance(chat_limiter, ChatRateLimiter) + assert limiter.chat_limiters[123] == chat_limiter + + # Повторный вызов должен вернуть тот же объект + same_limiter = limiter.get_chat_limiter(123) + assert same_limiter is chat_limiter + + +class TestRetryHandler: + """Тесты для RetryHandler""" + + def test_initialization(self): + """Тест инициализации""" + config = RateLimitConfig() + handler = RetryHandler(config) + assert handler.config == config + + @pytest.mark.asyncio + async def test_execute_with_retry_success(self): + """Тест успешного выполнения без retry""" + config = RateLimitConfig() + handler = RetryHandler(config) + + mock_func = AsyncMock(return_value="success") + + result = await handler.execute_with_retry(mock_func, 123) + + assert result == "success" + mock_func.assert_called_once() + + @pytest.mark.asyncio + async def test_execute_with_retry_retry_after(self): + """Тест retry после RetryAfter ошибки""" + from aiogram.exceptions import TelegramRetryAfter + + config = RateLimitConfig(retry_after_multiplier=1.0, max_retry_delay=1.0) + handler = RetryHandler(config) + + mock_func = AsyncMock() + # Создаем мок для TelegramRetryAfter + from unittest.mock import MagicMock + retry_after_error = TelegramRetryAfter( + method=MagicMock(), + message="Flood control exceeded", + retry_after=1 # 1 секунда + ) + + mock_func.side_effect = [ + retry_after_error, # Первый вызов - ошибка + "success" # Второй вызов - успех + ] + + start_time = time.time() + result = await handler.execute_with_retry(mock_func, 123, max_retries=1) + end_time = time.time() + + assert result == "success" + assert mock_func.call_count == 2 + assert end_time - start_time >= 0.1 # Должно ждать + + +class TestTelegramRateLimiter: + """Тесты для TelegramRateLimiter""" + + def test_initialization(self): + """Тест инициализации""" + config = RateLimitConfig() + limiter = TelegramRateLimiter(config) + + assert limiter.config == config + assert isinstance(limiter.global_limiter, GlobalRateLimiter) + assert isinstance(limiter.retry_handler, RetryHandler) + + @pytest.mark.asyncio + async def test_send_with_rate_limit(self): + """Тест отправки с rate limiting""" + config = RateLimitConfig(messages_per_second=10.0, burst_limit=10) + limiter = TelegramRateLimiter(config) + + mock_send_func = AsyncMock(return_value="sent") + + result = await limiter.send_with_rate_limit(mock_send_func, 123) + + assert result == "sent" + mock_send_func.assert_called_once() + + +class TestRateLimitMonitor: + """Тесты для RateLimitMonitor""" + + def test_initialization(self): + """Тест инициализации""" + monitor = RateLimitMonitor() + + assert monitor.stats == {} + assert isinstance(monitor.global_stats, RateLimitStats) + assert monitor.max_history_size == 1000 + + def test_record_request_success(self): + """Тест записи успешного запроса""" + monitor = RateLimitMonitor() + + monitor.record_request(123, True, 0.5) + + assert 123 in monitor.stats + chat_stats = monitor.stats[123] + assert chat_stats.total_requests == 1 + assert chat_stats.successful_requests == 1 + assert chat_stats.failed_requests == 0 + assert chat_stats.total_wait_time == 0.5 + + def test_record_request_failure(self): + """Тест записи неудачного запроса""" + monitor = RateLimitMonitor() + + monitor.record_request(123, False, 1.0, "RetryAfter") + + assert 123 in monitor.stats + chat_stats = monitor.stats[123] + assert chat_stats.total_requests == 1 + assert chat_stats.successful_requests == 0 + assert chat_stats.failed_requests == 1 + assert chat_stats.retry_after_errors == 1 + assert chat_stats.total_wait_time == 1.0 + + def test_get_chat_stats(self): + """Тест получения статистики чата""" + monitor = RateLimitMonitor() + + # Статистика для несуществующего чата + assert monitor.get_chat_stats(999) is None + + # Записываем запрос + monitor.record_request(123, True, 0.5) + + # Получаем статистику + stats = monitor.get_chat_stats(123) + assert stats is not None + assert stats.chat_id == 123 + assert stats.total_requests == 1 + + def test_success_rate_calculation(self): + """Тест расчета процента успеха""" + monitor = RateLimitMonitor() + + # 3 успешных, 1 неудачный + monitor.record_request(123, True, 0.1) + monitor.record_request(123, True, 0.2) + monitor.record_request(123, True, 0.3) + monitor.record_request(123, False, 0.4, "RetryAfter") + + stats = monitor.get_chat_stats(123) + assert stats.success_rate == 0.75 # 3/4 + assert stats.error_rate == 0.25 # 1/4 + + +class TestRateLimitConfig: + """Тесты для конфигурации rate limiting""" + + def test_get_rate_limit_config(self): + """Тест получения конфигурации""" + # Тест production конфигурации + prod_config = get_rate_limit_config("production") + assert prod_config.messages_per_second == 0.5 + assert prod_config.burst_limit == 2 + + # Тест development конфигурации + dev_config = get_rate_limit_config("development") + assert dev_config.messages_per_second == 1.0 + assert dev_config.burst_limit == 3 + + # Тест strict конфигурации + strict_config = get_rate_limit_config("strict") + assert strict_config.messages_per_second == 0.3 + assert strict_config.burst_limit == 1 + + # Тест неизвестной конфигурации (должна вернуть production) + unknown_config = get_rate_limit_config("unknown") + assert unknown_config.messages_per_second == 0.5 + + +@pytest.mark.asyncio +async def test_send_with_rate_limit_integration(): + """Интеграционный тест для send_with_rate_limit""" + mock_send_func = AsyncMock(return_value="message_sent") + + result = await send_with_rate_limit(mock_send_func, 123) + + assert result == "message_sent" + mock_send_func.assert_called_once() + + +if __name__ == "__main__": + pytest.main([__file__]) diff --git a/tests/test_utils.py b/tests/test_utils.py index 04011b3..7d388d5 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -295,7 +295,7 @@ class TestDownloadFile: with patch('os.path.getsize', return_value=1024): with patch('os.path.basename', return_value='file_123.jpg'): with patch('os.path.splitext', return_value=('file_123', '.jpg')): - with patch('helper_bot.utils.helper_func.metrics') as mock_metrics: + with patch('helper_bot.utils.metrics.metrics') as mock_metrics: result = await download_file(mock_message, "file_id_123", "photo") assert result == "files/photos/file_123.jpg"