Dev 9 #11

Merged
KerradKerridi merged 3 commits from dev-9 into master 2025-09-19 10:02:40 +00:00
32 changed files with 2661 additions and 214 deletions
Showing only changes of commit 5f6882d348 - Show all commits

171
RATE_LIMITING_SOLUTION.md Normal file
View File

@@ -0,0 +1,171 @@
# Решение проблемы Flood Control в Telegram Bot
## Проблема
В логах бота наблюдались ошибки типа:
```
Flood control exceeded on method 'SendVoice' in chat 1322897572. Retry in 3 seconds.
```
Эти ошибки возникают при превышении лимитов Telegram Bot API:
- Не более 30 сообщений в секунду от одного бота глобально
- Не более 1 сообщения в секунду в один чат
- Дополнительные ограничения для разных типов сообщений
## Решение
Реализована комплексная система rate limiting, включающая:
### 1. Основные компоненты
#### `rate_limiter.py`
- **ChatRateLimiter**: Ограничивает скорость отправки сообщений для конкретного чата
- **GlobalRateLimiter**: Глобальные ограничения для всех чатов
- **RetryHandler**: Обработка повторных попыток с экспоненциальной задержкой
- **TelegramRateLimiter**: Основной класс, объединяющий все компоненты
#### `rate_limit_monitor.py`
- **RateLimitMonitor**: Мониторинг и статистика rate limiting
- Отслеживание успешных/неудачных запросов
- Анализ ошибок и производительности
- Статистика по чатам
#### `rate_limit_config.py`
- Конфигурации для разных окружений (development, production, strict)
- Адаптивные настройки на основе уровня ошибок
- Настройки для разных типов сообщений
#### `rate_limit_middleware.py`
- Middleware для автоматического применения rate limiting
- Перехват всех исходящих сообщений
- Прозрачная интеграция с существующим кодом
### 2. Ключевые особенности
#### Rate Limiting
- **Настраиваемая скорость**: 0.5 сообщений в секунду на чат (по умолчанию)
- **Burst protection**: Максимум 2 сообщения подряд
- **Глобальные ограничения**: 10 сообщений в секунду глобально
- **Адаптивные задержки**: Увеличение задержек при ошибках
#### Retry Mechanism
- **Экспоненциальная задержка**: Увеличение времени ожидания при повторных попытках
- **Максимальные ограничения**: Ограничение максимального времени ожидания
- **Умная обработка ошибок**: Разные стратегии для разных типов ошибок
#### Мониторинг
- **Детальная статистика**: Отслеживание всех запросов и ошибок
- **Анализ производительности**: Процент успеха, время ожидания, активность
- **Административные команды**: `/ratelimit_stats`, `/ratelimit_errors`, `/reset_ratelimit_stats`
### 3. Интеграция
#### Обновленные функции
```python
# helper_func.py
async def send_voice_message(chat_id, message, voice, markup=None):
from .rate_limiter import send_with_rate_limit
async def _send_voice():
if markup is None:
return await message.bot.send_voice(chat_id=chat_id, voice=voice)
else:
return await message.bot.send_voice(chat_id=chat_id, voice=voice, reply_markup=markup)
return await send_with_rate_limit(_send_voice, chat_id)
```
#### Middleware
```python
# voice_handler.py
from helper_bot.middlewares.rate_limit_middleware import MessageSendMiddleware
def _setup_middleware(self):
self.router.message.middleware(DependenciesMiddleware())
self.router.message.middleware(BlacklistMiddleware())
self.router.message.middleware(MessageSendMiddleware()) # Новый middleware
```
### 4. Конфигурация
#### Production настройки (по умолчанию)
```python
PRODUCTION_CONFIG = RateLimitSettings(
messages_per_second=0.5, # 1 сообщение каждые 2 секунды
burst_limit=2, # Максимум 2 сообщения подряд
retry_after_multiplier=1.5,
max_retry_delay=30.0,
max_retries=3,
voice_message_delay=2.5, # Дополнительная задержка для голосовых
media_message_delay=2.0,
text_message_delay=1.5
)
```
#### Адаптивная конфигурация
Система автоматически ужесточает ограничения при высоком уровне ошибок:
- При >10% ошибок: уменьшение скорости в 2 раза
- При <1% ошибок: увеличение скорости на 20%
### 5. Мониторинг и администрирование
#### Команды для администраторов
- `/ratelimit_stats` - Показать статистику rate limiting
- `/ratelimit_errors` - Показать недавние ошибки
- `/reset_ratelimit_stats` - Сбросить статистику
#### Пример вывода статистики
```
📊 Статистика Rate Limiting
🔢 Общая статистика:
Всего запросов: 1250
• Процент успеха: 98.4%
• Процент ошибок: 1.6%
• Запросов в минуту: 12.5
• Среднее время ожидания: 1.2с
• Активных чатов: 45
• Ошибок за час: 3
🔍 Детальная статистика:
• Успешных запросов: 1230
• Неудачных запросов: 20
• RetryAfter ошибок: 15
• Других ошибок: 5
```
### 6. Тестирование
Создан полный набор тестов в `test_rate_limiter.py`:
- Тесты всех компонентов
- Интеграционные тесты
- Тесты конфигурации
- Тесты мониторинга
Запуск тестов:
```bash
pytest tests/test_rate_limiter.py -v
```
### 7. Преимущества решения
1. **Предотвращение ошибок**: Автоматическое соблюдение лимитов API
2. **Прозрачность**: Минимальные изменения в существующем коде
3. **Мониторинг**: Полная видимость производительности
4. **Адаптивность**: Автоматическая настройка под нагрузку
5. **Надежность**: Умная обработка ошибок и повторных попыток
6. **Масштабируемость**: Поддержка множества чатов
### 8. Рекомендации по использованию
1. **Мониторинг**: Регулярно проверяйте статистику через `/ratelimit_stats`
2. **Настройка**: При необходимости корректируйте конфигурацию под ваши нужды
3. **Алерты**: Настройте уведомления при высоком проценте ошибок
4. **Тестирование**: Проверяйте работу в тестовой среде перед продакшеном
### 9. Будущие улучшения
- Интеграция с системой метрик (Prometheus/Grafana)
- Автоматическое масштабирование ограничений
- A/B тестирование разных конфигураций
- Интеграция с системой алертов

View File

@@ -297,6 +297,18 @@ class AsyncBotDB:
"""Получает user_id пользователя по message_id для voice bot."""
return await self.factory.audio.get_user_id_by_message_id_for_voice_bot(message_id)
async def delete_audio_moderate_record(self, message_id: int) -> None:
"""Удаляет запись из таблицы audio_moderate по message_id."""
await self.factory.audio.delete_audio_moderate_record(message_id)
async def get_all_audio_records(self) -> List[Dict[str, Any]]:
"""Получить все записи аудио сообщений."""
return await self.factory.audio.get_all_audio_records()
async def delete_audio_record_by_file_name(self, file_name: str) -> None:
"""Удалить запись аудио сообщения по имени файла."""
await self.factory.audio.delete_audio_record_by_file_name(file_name)
# Методы для миграций
async def get_migration_version(self) -> int:
"""Получение текущей версии миграции."""

View File

@@ -1,4 +1,4 @@
from typing import Optional, List
from typing import Optional, List, Dict, Any
from database.base import DatabaseConnection
from database.models import AudioMessage, AudioListenRecord, AudioModerate
from datetime import datetime
@@ -213,4 +213,26 @@ class AudioRepository(DatabaseConnection):
"""Удаляет запись из таблицы audio_moderate по message_id."""
query = "DELETE FROM audio_moderate WHERE message_id = ?"
await self._execute_query(query, (message_id,))
self.logger.info(f"Удалена запись из audio_moderate для message_id {message_id}")
self.logger.info(f"Удалена запись из audio_moderate для message_id {message_id}")
async def get_all_audio_records(self) -> List[Dict[str, Any]]:
"""Получить все записи аудио сообщений."""
query = "SELECT file_name, author_id, date_added FROM audio_message_reference"
rows = await self._execute_query_with_result(query)
records = []
for row in rows:
records.append({
'file_name': row[0],
'author_id': row[1],
'date_added': row[2]
})
self.logger.info(f"Получено {len(records)} записей аудио сообщений")
return records
async def delete_audio_record_by_file_name(self, file_name: str) -> None:
"""Удалить запись аудио сообщения по имени файла."""
query = "DELETE FROM audio_message_reference WHERE file_name = ?"
await self._execute_query(query, (file_name,))
self.logger.info(f"Удалена запись аудио сообщения: {file_name}")

View File

@@ -35,14 +35,14 @@ class UserRepository(DatabaseConnection):
return bool(len(rows))
async def add_user(self, user: User) -> None:
"""Добавление нового пользователя."""
"""Добавление нового пользователя с защитой от дублирования."""
if not user.date_added:
user.date_added = int(datetime.now().timestamp())
if not user.date_changed:
user.date_changed = int(datetime.now().timestamp())
query = """
INSERT INTO our_users (user_id, first_name, full_name, username, is_bot,
INSERT OR IGNORE INTO our_users (user_id, first_name, full_name, username, is_bot,
language_code, emoji, has_stickers, date_added, date_changed, voice_bot_welcome_received)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
@@ -51,7 +51,7 @@ class UserRepository(DatabaseConnection):
user.date_added, user.date_changed, user.voice_bot_welcome_received)
await self._execute_query(query, params)
self.logger.info(f"Новый пользователь добавлен: {user.user_id}")
self.logger.info(f"Пользователь обработан (создан или уже существует): {user.user_id}")
async def get_user_info(self, user_id: int) -> Optional[User]:
"""Получение информации о пользователе."""

View File

@@ -0,0 +1 @@
# Config package

View File

@@ -0,0 +1,129 @@
"""
Конфигурация для rate limiting
"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class RateLimitSettings:
"""Настройки rate limiting для разных типов сообщений"""
# Основные настройки
messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат
burst_limit: int = 2 # Максимум 2 сообщения подряд
retry_after_multiplier: float = 1.5 # Множитель для увеличения задержки при retry
max_retry_delay: float = 30.0 # Максимальная задержка между попытками
max_retries: int = 3 # Максимальное количество повторных попыток
# Специальные настройки для разных типов сообщений
voice_message_delay: float = 2.0 # Дополнительная задержка для голосовых сообщений
media_message_delay: float = 1.5 # Дополнительная задержка для медиа сообщений
text_message_delay: float = 1.0 # Дополнительная задержка для текстовых сообщений
# Настройки для разных типов чатов
private_chat_multiplier: float = 1.0 # Множитель для приватных чатов
group_chat_multiplier: float = 0.8 # Множитель для групповых чатов
channel_multiplier: float = 0.6 # Множитель для каналов
# Глобальные ограничения
global_messages_per_second: float = 10.0 # Максимум 10 сообщений в секунду глобально
global_burst_limit: int = 20 # Максимум 20 сообщений подряд глобально
# Конфигурации для разных сценариев использования
DEVELOPMENT_CONFIG = RateLimitSettings(
messages_per_second=1.0, # Более мягкие ограничения для разработки
burst_limit=3,
retry_after_multiplier=1.2,
max_retry_delay=15.0,
max_retries=2
)
PRODUCTION_CONFIG = RateLimitSettings(
messages_per_second=0.5, # Строгие ограничения для продакшена
burst_limit=2,
retry_after_multiplier=1.5,
max_retry_delay=30.0,
max_retries=3,
voice_message_delay=2.5,
media_message_delay=2.0,
text_message_delay=1.5
)
STRICT_CONFIG = RateLimitSettings(
messages_per_second=0.3, # Очень строгие ограничения
burst_limit=1,
retry_after_multiplier=2.0,
max_retry_delay=60.0,
max_retries=5,
voice_message_delay=3.0,
media_message_delay=2.5,
text_message_delay=2.0
)
def get_rate_limit_config(environment: str = "production") -> RateLimitSettings:
"""
Получает конфигурацию rate limiting в зависимости от окружения
Args:
environment: Окружение ('development', 'production', 'strict')
Returns:
RateLimitSettings: Конфигурация для указанного окружения
"""
configs = {
"development": DEVELOPMENT_CONFIG,
"production": PRODUCTION_CONFIG,
"strict": STRICT_CONFIG
}
return configs.get(environment, PRODUCTION_CONFIG)
def get_adaptive_config(
current_error_rate: float,
base_config: Optional[RateLimitSettings] = None
) -> RateLimitSettings:
"""
Получает адаптивную конфигурацию на основе текущего уровня ошибок
Args:
current_error_rate: Текущий уровень ошибок (0.0 - 1.0)
base_config: Базовая конфигурация
Returns:
RateLimitSettings: Адаптированная конфигурация
"""
if base_config is None:
base_config = PRODUCTION_CONFIG
# Если уровень ошибок высокий, ужесточаем ограничения
if current_error_rate > 0.1: # Более 10% ошибок
return RateLimitSettings(
messages_per_second=base_config.messages_per_second * 0.5,
burst_limit=max(1, base_config.burst_limit - 1),
retry_after_multiplier=base_config.retry_after_multiplier * 1.5,
max_retry_delay=base_config.max_retry_delay * 1.5,
max_retries=base_config.max_retries + 1,
voice_message_delay=base_config.voice_message_delay * 1.5,
media_message_delay=base_config.media_message_delay * 1.3,
text_message_delay=base_config.text_message_delay * 1.2
)
# Если уровень ошибок низкий, можно немного ослабить ограничения
elif current_error_rate < 0.01: # Менее 1% ошибок
return RateLimitSettings(
messages_per_second=base_config.messages_per_second * 1.2,
burst_limit=base_config.burst_limit + 1,
retry_after_multiplier=base_config.retry_after_multiplier * 0.9,
max_retry_delay=base_config.max_retry_delay * 0.8,
max_retries=max(1, base_config.max_retries - 1),
voice_message_delay=base_config.voice_message_delay * 0.8,
media_message_delay=base_config.media_message_delay * 0.9,
text_message_delay=base_config.text_message_delay * 0.9
)
# Возвращаем базовую конфигурацию
return base_config

View File

@@ -27,9 +27,9 @@ from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors
track_errors,
db_query_time
)
# Создаем роутер с middleware для проверки доступа
@@ -94,6 +94,7 @@ async def cancel_ban_process(
)
@track_time("get_last_users", "admin_handlers")
@track_errors("admin_handlers", "get_last_users")
@db_query_time("get_last_users", "users", "select")
async def get_last_users(
message: types.Message,
state: FSMContext,
@@ -127,6 +128,7 @@ async def get_last_users(
)
@track_time("get_banned_users", "admin_handlers")
@track_errors("admin_handlers", "get_banned_users")
@db_query_time("get_banned_users", "users", "select")
async def get_banned_users(
message: types.Message,
state: FSMContext,

View File

@@ -0,0 +1,272 @@
"""
Обработчики команд для мониторинга rate limiting
"""
from aiogram import Router, types, F
from aiogram.filters import Command, MagicData
from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile
from helper_bot.filters.main import ChatTypeFilter
from helper_bot.middlewares.dependencies_middleware import DependenciesMiddleware
from helper_bot.utils.rate_limit_monitor import rate_limit_monitor, get_rate_limit_summary
from helper_bot.utils.rate_limit_metrics import update_rate_limit_gauges, get_rate_limit_metrics_summary
from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
track_time,
track_errors
)
class RateLimitHandlers:
def __init__(self, db, settings):
self.db = db.get_db() if hasattr(db, 'get_db') else db
self.settings = settings
self.router = Router()
self._setup_handlers()
self._setup_middleware()
def _setup_middleware(self):
self.router.message.middleware(DependenciesMiddleware())
def _setup_handlers(self):
# Команда для просмотра статистики rate limiting
self.router.message.register(
self.rate_limit_stats_handler,
ChatTypeFilter(chat_type=["private"]),
Command("ratelimit_stats")
)
# Команда для сброса статистики rate limiting
self.router.message.register(
self.reset_rate_limit_stats_handler,
ChatTypeFilter(chat_type=["private"]),
Command("reset_ratelimit_stats")
)
# Команда для просмотра ошибок rate limiting
self.router.message.register(
self.rate_limit_errors_handler,
ChatTypeFilter(chat_type=["private"]),
Command("ratelimit_errors")
)
# Команда для просмотра Prometheus метрик
self.router.message.register(
self.rate_limit_prometheus_handler,
ChatTypeFilter(chat_type=["private"]),
Command("ratelimit_prometheus")
)
@track_time("rate_limit_stats_handler", "rate_limit_handlers")
@track_errors("rate_limit_handlers", "rate_limit_stats_handler")
async def rate_limit_stats_handler(
self,
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
"""Показывает статистику rate limiting"""
try:
# Проверяем права администратора
if not await bot_db.is_admin(message.from_user.id):
await message.answer("У вас нет прав для выполнения этой команды.")
return
# Получаем сводку
summary = get_rate_limit_summary()
global_stats = rate_limit_monitor.get_global_stats()
# Формируем сообщение со статистикой
stats_text = (
f"📊 <b>Статистика Rate Limiting</b>\n\n"
f"🔢 <b>Общая статистика:</b>\n"
f"Всего запросов: {summary['total_requests']}\n"
f"• Процент успеха: {summary['success_rate']:.1%}\n"
f"• Процент ошибок: {summary['error_rate']:.1%}\n"
f"• Запросов в минуту: {summary['requests_per_minute']:.1f}\n"
f"• Среднее время ожидания: {summary['average_wait_time']:.2f}с\n"
f"• Активных чатов: {summary['active_chats']}\n"
f"• Ошибок за час: {summary['recent_errors_count']}\n\n"
)
# Добавляем детальную статистику
stats_text += f"🔍 <b>Детальная статистика:</b>\n"
stats_text += f"• Успешных запросов: {global_stats.successful_requests}\n"
stats_text += f"• Неудачных запросов: {global_stats.failed_requests}\n"
stats_text += f"• RetryAfter ошибок: {global_stats.retry_after_errors}\n"
stats_text += f"• Других ошибок: {global_stats.other_errors}\n"
stats_text += f"• Общее время ожидания: {global_stats.total_wait_time:.2f}с\n\n"
# Добавляем топ чатов по запросам
top_chats = rate_limit_monitor.get_top_chats_by_requests(5)
if top_chats:
stats_text += f"📈 <b>Топ-5 чатов по запросам:</b>\n"
for i, (chat_id, chat_stats) in enumerate(top_chats, 1):
stats_text += f"{i}. Chat {chat_id}: {chat_stats.total_requests} запросов ({chat_stats.success_rate:.1%} успех)\n"
stats_text += "\n"
# Добавляем чаты с высоким процентом ошибок
high_error_chats = rate_limit_monitor.get_chats_with_high_error_rate(0.1)
if high_error_chats:
stats_text += f"⚠️ <b>Чаты с высоким процентом ошибок (>10%):</b>\n"
for chat_id, chat_stats in high_error_chats[:3]:
stats_text += f"• Chat {chat_id}: {chat_stats.error_rate:.1%} ошибок ({chat_stats.failed_requests}/{chat_stats.total_requests})\n"
await message.answer(stats_text, parse_mode='HTML')
except Exception as e:
logger.error(f"Ошибка при получении статистики rate limiting: {e}")
await message.answer("Произошла ошибка при получении статистики.")
@track_time("reset_rate_limit_stats_handler", "rate_limit_handlers")
@track_errors("rate_limit_handlers", "reset_rate_limit_stats_handler")
async def reset_rate_limit_stats_handler(
self,
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
"""Сбрасывает статистику rate limiting"""
try:
# Проверяем права администратора
if not await bot_db.is_admin(message.from_user.id):
await message.answer("У вас нет прав для выполнения этой команды.")
return
# Сбрасываем статистику
rate_limit_monitor.reset_stats()
await message.answer("✅ Статистика rate limiting сброшена.")
except Exception as e:
logger.error(f"Ошибка при сбросе статистики rate limiting: {e}")
await message.answer("Произошла ошибка при сбросе статистики.")
@track_time("rate_limit_errors_handler", "rate_limit_handlers")
@track_errors("rate_limit_handlers", "rate_limit_errors_handler")
async def rate_limit_errors_handler(
self,
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
"""Показывает недавние ошибки rate limiting"""
try:
# Проверяем права администратора
if not await bot_db.is_admin(message.from_user.id):
await message.answer("У вас нет прав для выполнения этой команды.")
return
# Получаем ошибки за последний час
recent_errors = rate_limit_monitor.get_recent_errors(60)
error_summary = rate_limit_monitor.get_error_summary(60)
if not recent_errors:
await message.answer("✅ Ошибок rate limiting за последний час не было.")
return
# Формируем сообщение с ошибками
errors_text = f"🚨 <b>Ошибки Rate Limiting (последний час)</b>\n\n"
errors_text += f"📊 <b>Сводка ошибок:</b>\n"
for error_type, count in error_summary.items():
errors_text += f"{error_type}: {count}\n"
errors_text += f"\nВсего ошибок: {len(recent_errors)}\n\n"
# Показываем последние 10 ошибок
errors_text += f"🔍 <b>Последние ошибки:</b>\n"
for i, error in enumerate(recent_errors[-10:], 1):
from datetime import datetime
timestamp = datetime.fromtimestamp(error['timestamp']).strftime("%H:%M:%S")
errors_text += f"{i}. {timestamp} - Chat {error['chat_id']} - {error['error_type']}\n"
# Если сообщение слишком длинное, разбиваем на части
if len(errors_text) > 4000:
# Отправляем сводку
summary_text = f"🚨 <b>Ошибки Rate Limiting (последний час)</b>\n\n"
summary_text += f"📊 <b>Сводка ошибок:</b>\n"
for error_type, count in error_summary.items():
summary_text += f"{error_type}: {count}\n"
summary_text += f"\nВсего ошибок: {len(recent_errors)}"
await message.answer(summary_text, parse_mode='HTML')
# Отправляем детали отдельным сообщением
details_text = f"🔍 <b>Последние ошибки:</b>\n"
for i, error in enumerate(recent_errors[-10:], 1):
from datetime import datetime
timestamp = datetime.fromtimestamp(error['timestamp']).strftime("%H:%M:%S")
details_text += f"{i}. {timestamp} - Chat {error['chat_id']} - {error['error_type']}\n"
await message.answer(details_text, parse_mode='HTML')
else:
await message.answer(errors_text, parse_mode='HTML')
except Exception as e:
logger.error(f"Ошибка при получении ошибок rate limiting: {e}")
await message.answer("Произошла ошибка при получении информации об ошибках.")
@track_time("rate_limit_prometheus_handler", "rate_limit_handlers")
@track_errors("rate_limit_handlers", "rate_limit_prometheus_handler")
async def rate_limit_prometheus_handler(
self,
message: types.Message,
state: FSMContext,
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
"""Показывает Prometheus метрики rate limiting"""
try:
# Проверяем права администратора
if not await bot_db.is_admin(message.from_user.id):
await message.answer("У вас нет прав для выполнения этой команды.")
return
# Обновляем gauge метрики
update_rate_limit_gauges()
# Получаем сводку метрик
metrics_summary = get_rate_limit_metrics_summary()
# Формируем сообщение с метриками
metrics_text = (
f"📊 <b>Prometheus метрики Rate Limiting</b>\n\n"
f"🔢 <b>Основные метрики:</b>\n"
f"• rate_limit_requests_total: {metrics_summary['total_requests']}\n"
f"• rate_limit_success_rate: {metrics_summary['success_rate']:.3f}\n"
f"• rate_limit_error_rate: {metrics_summary['error_rate']:.3f}\n"
f"• rate_limit_requests_per_minute: {metrics_summary['requests_per_minute']:.1f}\n"
f"• rate_limit_avg_wait_time: {metrics_summary['average_wait_time']:.3f}s\n"
f"• rate_limit_active_chats: {metrics_summary['active_chats']}\n\n"
)
# Добавляем детальные метрики
metrics_text += f"🔍 <b>Детальные метрики:</b>\n"
metrics_text += f"• Успешных запросов: {metrics_summary['successful_requests']}\n"
metrics_text += f"• Неудачных запросов: {metrics_summary['failed_requests']}\n"
metrics_text += f"• RetryAfter ошибок: {metrics_summary['retry_after_errors']}\n"
metrics_text += f"• Других ошибок: {metrics_summary['other_errors']}\n"
metrics_text += f"• Общее время ожидания: {metrics_summary['total_wait_time']:.2f}s\n\n"
# Добавляем информацию о доступных метриках
metrics_text += f"📈 <b>Доступные Prometheus метрики:</b>\n"
metrics_text += f"• rate_limit_requests_total - общее количество запросов\n"
metrics_text += f"• rate_limit_errors_total - количество ошибок по типам\n"
metrics_text += f"• rate_limit_wait_duration_seconds - время ожидания\n"
metrics_text += f"• rate_limit_request_interval_seconds - интервалы между запросами\n"
metrics_text += f"• rate_limit_active_chats - количество активных чатов\n"
metrics_text += f"• rate_limit_success_rate - процент успеха по чатам\n"
metrics_text += f"• rate_limit_requests_per_minute - запросов в минуту\n"
metrics_text += f"• rate_limit_total_requests - общее количество запросов\n"
metrics_text += f"• rate_limit_total_errors - количество ошибок\n"
metrics_text += f"• rate_limit_avg_wait_time - среднее время ожидания\n"
await message.answer(metrics_text, parse_mode='HTML')
except Exception as e:
logger.error(f"Ошибка при получении Prometheus метрик: {e}")
await message.answer("Произошла ошибка при получении метрик.")

View File

@@ -7,10 +7,8 @@ from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
track_errors
)

View File

@@ -26,10 +26,10 @@ from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
db_query_time,
track_file_operations
)
callback_router = Router()
@@ -238,6 +238,8 @@ async def change_page(
@callback_router.callback_query(F.data == CALLBACK_SAVE)
@track_time("save_voice_message", "callback_handlers")
@track_errors("callback_handlers", "save_voice_message")
@track_file_operations("voice")
@db_query_time("save_voice_message", "audio_moderate", "mixed")
async def save_voice_message(
call: CallbackQuery,
bot_db: MagicData("bot_db"),
@@ -245,14 +247,18 @@ async def save_voice_message(
**kwargs
):
try:
logger.info(f"Начинаем сохранение голосового сообщения. Message ID: {call.message.message_id}")
# Создаем сервис для работы с аудио файлами
audio_service = AudioFileService(bot_db)
# Получаем ID пользователя из базы
user_id = await bot_db.get_user_id_by_message_id_for_voice_bot(call.message.message_id)
logger.info(f"Получен user_id: {user_id}")
# Генерируем имя файла
file_name = await audio_service.generate_file_name(user_id)
logger.info(f"Сгенерировано имя файла: {file_name}")
# Собираем инфо о сообщении
time_UTC = int(time.time())
@@ -260,32 +266,54 @@ async def save_voice_message(
# Получаем file_id из voice сообщения
file_id = call.message.voice.file_id if call.message.voice else ""
logger.info(f"Получен file_id: {file_id}")
# Сохраняем в базу данных
await audio_service.save_audio_file(file_name, user_id, date_added, file_id)
# Скачиваем и сохраняем файл
# ВАЖНО: Сначала скачиваем и сохраняем файл на диск
logger.info("Начинаем скачивание и сохранение файла на диск...")
await audio_service.download_and_save_audio(call.bot, call.message, file_name)
logger.info("Файл успешно скачан и сохранен на диск")
# Только после успешного сохранения файла - сохраняем в базу данных
logger.info("Начинаем сохранение информации в базу данных...")
await audio_service.save_audio_file(file_name, user_id, date_added, file_id)
logger.info("Информация успешно сохранена в базу данных")
# Удаляем сообщение из предложки
logger.info("Удаляем сообщение из предложки...")
await call.bot.delete_message(
chat_id=settings['Telegram']['group_for_posts'],
message_id=call.message.message_id
)
logger.info("Сообщение удалено из предложки")
# Удаляем запись из таблицы audio_moderate
logger.info("Удаляем запись из таблицы audio_moderate...")
await bot_db.delete_audio_moderate_record(call.message.message_id)
logger.info("Запись удалена из таблицы audio_moderate")
await call.answer(text='Сохранено!', cache_time=3)
logger.info(f"Голосовое сообщение успешно сохранено: {file_name}")
except Exception as e:
logger.error(f"Ошибка при сохранении голосового сообщения: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
# Дополнительная информация для диагностики
try:
if 'call' in locals() and call.message:
logger.error(f"Message ID: {call.message.message_id}")
logger.error(f"User ID: {user_id if 'user_id' in locals() else 'не определен'}")
logger.error(f"File name: {file_name if 'file_name' in locals() else 'не определен'}")
except:
pass
await call.answer(text='Ошибка при сохранении!', cache_time=3)
@callback_router.callback_query(F.data == CALLBACK_DELETE)
@track_time("delete_voice_message", "callback_handlers")
@track_errors("callback_handlers", "delete_voice_message")
@db_query_time("delete_voice_message", "audio_moderate", "delete")
async def delete_voice_message(
call: CallbackQuery,
bot_db: MagicData("bot_db"),

View File

@@ -25,7 +25,7 @@ from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_media_processing,
track_time,
track_errors,
db_query_time
@@ -140,6 +140,7 @@ class PostPublishService:
@track_time("_publish_media_group", "post_publish_service")
@track_errors("post_publish_service", "_publish_media_group")
@track_media_processing("media_group")
async def _publish_media_group(self, call: CallbackQuery) -> None:
"""Публикация медиагруппы"""
logger.info(f"Начинаю публикацию медиагруппы. Helper message ID: {call.message.message_id}")
@@ -230,6 +231,7 @@ class PostPublishService:
@track_time("_decline_media_group", "post_publish_service")
@track_errors("post_publish_service", "_decline_media_group")
@track_media_processing("media_group")
async def _decline_media_group(self, call: CallbackQuery) -> None:
"""Отклонение медиагруппы"""
logger.debug(f"Отклоняю медиагруппу. Helper message ID: {call.message.message_id}")
@@ -308,6 +310,7 @@ class PostPublishService:
@track_time("_delete_media_group_and_notify_author", "post_publish_service")
@track_errors("post_publish_service", "_delete_media_group_and_notify_author")
@track_media_processing("media_group")
async def _delete_media_group_and_notify_author(self, call: CallbackQuery, author_id: int) -> None:
"""Удаление медиагруппы и уведомление автора"""
post_ids = await self.db.get_post_ids_from_telegram_by_last_id(call.message.message_id)
@@ -339,6 +342,7 @@ class BanService:
@track_time("ban_user_from_post", "ban_service")
@track_errors("ban_service", "ban_user_from_post")
@db_query_time("ban_user_from_post", "users", "mixed")
async def ban_user_from_post(self, call: CallbackQuery) -> None:
"""Бан пользователя за спам"""
author_id = await self.db.get_author_id_by_message_id(call.message.message_id)
@@ -379,6 +383,7 @@ class BanService:
@track_time("unlock_user", "ban_service")
@track_errors("ban_service", "unlock_user")
@db_query_time("unlock_user", "users", "delete")
async def unlock_user(self, user_id: str) -> str:
"""Разблокировка пользователя"""
user_name = await self.db.get_username(int(user_id))

View File

@@ -13,9 +13,8 @@ from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
track_errors,
db_query_time
)
@@ -34,6 +33,7 @@ class AdminReplyService:
@track_time("get_user_id_for_reply", "admin_reply_service")
@track_errors("admin_reply_service", "get_user_id_for_reply")
@db_query_time("get_user_id_for_reply", "users", "select")
async def get_user_id_for_reply(self, message_id: int) -> int:
"""
Get user ID for reply by message ID.

View File

@@ -27,7 +27,6 @@ from helper_bot.utils.helper_func import (
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
@@ -173,6 +172,7 @@ class PrivateHandlers:
@error_handler
@track_errors("private_handlers", "stickers")
@track_time("stickers", "private_handlers")
@db_query_time("stickers", "stickers", "update")
async def stickers(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle stickers request"""
# User service operations with metrics
@@ -200,6 +200,7 @@ class PrivateHandlers:
@error_handler
@track_errors("private_handlers", "resend_message_in_group_for_message")
@track_time("resend_message_in_group_for_message", "private_handlers")
@db_query_time("resend_message_in_group_for_message", "messages", "insert")
async def resend_message_in_group_for_message(self, message: types.Message, state: FSMContext, **kwargs):
"""Handle messages in admin chat states"""
# User service operations with metrics

View File

@@ -33,10 +33,11 @@ from helper_bot.keyboards import get_reply_keyboard_for_post
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
db_query_time,
track_media_processing,
track_file_operations
)
@@ -74,12 +75,14 @@ class UserService:
@track_time("update_user_activity", "user_service")
@track_errors("user_service", "update_user_activity")
@db_query_time("update_user_activity", "users", "update")
async def update_user_activity(self, user_id: int) -> None:
"""Update user's last activity timestamp with metrics tracking"""
await self.db.update_user_date(user_id)
@track_time("ensure_user_exists", "user_service")
@track_errors("user_service", "ensure_user_exists")
@db_query_time("ensure_user_exists", "users", "insert")
async def ensure_user_exists(self, message: types.Message) -> None:
"""Ensure user exists in database, create if needed with metrics tracking"""
user_id = message.from_user.id
@@ -89,43 +92,41 @@ class UserService:
is_bot = message.from_user.is_bot
language_code = message.from_user.language_code
if not await self.db.user_exists(user_id):
# Create User object with current timestamp
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username,
is_bot=is_bot,
language_code=language_code,
emoji="",
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False
)
await self.db.add_user(user)
metrics.record_db_query("add_user", 0.0, "users", "insert")
else:
is_need_update = await check_username_and_full_name(user_id, username, full_name, self.db)
if is_need_update:
await self.db.update_user_info(user_id, username, full_name)
metrics.record_db_query("update_username_fullname", 0.0, "users", "update")
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}")
await message.bot.send_message(
chat_id=self.settings.group_for_logs,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}')
# Create User object with current timestamp
current_timestamp = int(datetime.now().timestamp())
user = User(
user_id=user_id,
first_name=first_name,
full_name=full_name,
username=username,
is_bot=is_bot,
language_code=language_code,
emoji="",
has_stickers=False,
date_added=current_timestamp,
date_changed=current_timestamp,
voice_bot_welcome_received=False
)
# Пытаемся создать пользователя (если уже существует - игнорируем)
# Это устраняет race condition и упрощает логику
await self.db.add_user(user)
# Проверяем, нужно ли обновить информацию о существующем пользователе
is_need_update = await check_username_and_full_name(user_id, username, full_name, self.db)
if is_need_update:
await self.db.update_user_info(user_id, username, full_name)
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}")
await message.bot.send_message(
chat_id=self.settings.group_for_logs,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}')
await self.db.update_user_date(user_id)
metrics.record_db_query("update_user_date", 0.0, "users", "update")
@track_errors("user_service", "log_user_message")
async def log_user_message(self, message: types.Message) -> None:
"""Forward user message to logs group with metrics tracking"""
await message.forward(chat_id=self.settings.group_for_logs)
@@ -146,6 +147,7 @@ class PostService:
@track_time("handle_text_post", "post_service")
@track_errors("post_service", "handle_text_post")
@db_query_time("handle_text_post", "posts", "insert")
async def handle_text_post(self, message: types.Message, first_name: str) -> None:
"""Handle text post submission"""
post_text = get_text_message(message.text.lower(), first_name, message.from_user.username)
@@ -162,6 +164,7 @@ class PostService:
@track_time("handle_photo_post", "post_service")
@track_errors("post_service", "handle_photo_post")
@db_query_time("handle_photo_post", "posts", "insert")
async def handle_photo_post(self, message: types.Message, first_name: str) -> None:
"""Handle photo post submission"""
post_caption = ""
@@ -186,6 +189,7 @@ class PostService:
@track_time("handle_video_post", "post_service")
@track_errors("post_service", "handle_video_post")
@db_query_time("handle_video_post", "posts", "insert")
async def handle_video_post(self, message: types.Message, first_name: str) -> None:
"""Handle video post submission"""
post_caption = ""
@@ -210,6 +214,7 @@ class PostService:
@track_time("handle_video_note_post", "post_service")
@track_errors("post_service", "handle_video_note_post")
@db_query_time("handle_video_note_post", "posts", "insert")
async def handle_video_note_post(self, message: types.Message) -> None:
"""Handle video note post submission"""
markup = get_reply_keyboard_for_post()
@@ -230,6 +235,7 @@ class PostService:
@track_time("handle_audio_post", "post_service")
@track_errors("post_service", "handle_audio_post")
@db_query_time("handle_audio_post", "posts", "insert")
async def handle_audio_post(self, message: types.Message, first_name: str) -> None:
"""Handle audio post submission"""
post_caption = ""
@@ -254,6 +260,7 @@ class PostService:
@track_time("handle_voice_post", "post_service")
@track_errors("post_service", "handle_voice_post")
@db_query_time("handle_voice_post", "posts", "insert")
async def handle_voice_post(self, message: types.Message) -> None:
"""Handle voice post submission"""
markup = get_reply_keyboard_for_post()
@@ -273,7 +280,9 @@ class PostService:
logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}")
@track_time("handle_media_group_post", "post_service")
@track_errors("post_service", "handle_media_group_post")
@track_errors("post_service", "handle_media_group_post")
@db_query_time("handle_media_group_post", "posts", "insert")
@track_media_processing("media_group")
async def handle_media_group_post(self, message: types.Message, album: list, first_name: str) -> None:
"""Handle media group post submission"""
post_caption = " "
@@ -320,6 +329,7 @@ class PostService:
@track_time("process_post", "post_service")
@track_errors("post_service", "process_post")
@track_media_processing("media_group")
async def process_post(self, message: types.Message, album: Union[list, None] = None) -> None:
"""Process post based on content type"""
first_name = get_first_name(message)
@@ -360,6 +370,7 @@ class StickerService:
@track_time("send_random_hello_sticker", "sticker_service")
@track_errors("sticker_service", "send_random_hello_sticker")
@track_file_operations("sticker")
async def send_random_hello_sticker(self, message: types.Message) -> None:
"""Send random hello sticker with metrics tracking"""
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
@@ -372,6 +383,7 @@ class StickerService:
@track_time("send_random_goodbye_sticker", "sticker_service")
@track_errors("sticker_service", "send_random_goodbye_sticker")
@track_file_operations("sticker")
async def send_random_goodbye_sticker(self, message: types.Message) -> None:
"""Send random goodbye sticker with metrics tracking"""
name_stick_bye = list(Path('Stick').rglob('Universal_*'))

View File

@@ -0,0 +1,191 @@
"""
Утилиты для очистки и диагностики проблем с голосовыми файлами
"""
import os
import asyncio
from pathlib import Path
from typing import List, Tuple
from logs.custom_logger import logger
from helper_bot.handlers.voice.constants import VOICE_USERS_DIR
class VoiceFileCleanupUtils:
"""Утилиты для очистки и диагностики голосовых файлов"""
def __init__(self, bot_db):
self.bot_db = bot_db
async def find_orphaned_db_records(self) -> List[Tuple[str, int]]:
"""Найти записи в БД, для которых нет соответствующих файлов"""
try:
# Получаем все записи из БД
all_audio_records = await self.bot_db.get_all_audio_records()
orphaned_records = []
for record in all_audio_records:
file_name = record.get('file_name', '')
user_id = record.get('author_id', 0)
file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg'
if not os.path.exists(file_path):
orphaned_records.append((file_name, user_id))
logger.warning(f"Найдена запись в БД без файла: {file_name} (user_id: {user_id})")
logger.info(f"Найдено {len(orphaned_records)} записей в БД без соответствующих файлов")
return orphaned_records
except Exception as e:
logger.error(f"Ошибка при поиске orphaned записей: {e}")
return []
async def find_orphaned_files(self) -> List[str]:
"""Найти файлы на диске, для которых нет записей в БД"""
try:
if not os.path.exists(VOICE_USERS_DIR):
logger.warning(f"Директория {VOICE_USERS_DIR} не существует")
return []
# Получаем все файлы .ogg в директории
ogg_files = list(Path(VOICE_USERS_DIR).glob("*.ogg"))
orphaned_files = []
# Получаем все записи из БД
all_audio_records = await self.bot_db.get_all_audio_records()
db_file_names = {record.get('file_name', '') for record in all_audio_records}
for file_path in ogg_files:
file_name = file_path.stem # Имя файла без расширения
if file_name not in db_file_names:
orphaned_files.append(str(file_path))
logger.warning(f"Найден файл без записи в БД: {file_path}")
logger.info(f"Найдено {len(orphaned_files)} файлов без записей в БД")
return orphaned_files
except Exception as e:
logger.error(f"Ошибка при поиске orphaned файлов: {e}")
return []
async def cleanup_orphaned_db_records(self, dry_run: bool = True) -> int:
"""Удалить записи в БД, для которых нет файлов"""
try:
orphaned_records = await self.find_orphaned_db_records()
if not orphaned_records:
logger.info("Нет orphaned записей для удаления")
return 0
if dry_run:
logger.info(f"DRY RUN: Найдено {len(orphaned_records)} записей для удаления")
for file_name, user_id in orphaned_records:
logger.info(f"DRY RUN: Будет удалена запись: {file_name} (user_id: {user_id})")
return len(orphaned_records)
# Удаляем записи
deleted_count = 0
for file_name, user_id in orphaned_records:
try:
await self.bot_db.delete_audio_record_by_file_name(file_name)
deleted_count += 1
logger.info(f"Удалена запись в БД: {file_name} (user_id: {user_id})")
except Exception as e:
logger.error(f"Ошибка при удалении записи {file_name}: {e}")
logger.info(f"Удалено {deleted_count} orphaned записей из БД")
return deleted_count
except Exception as e:
logger.error(f"Ошибка при очистке orphaned записей: {e}")
return 0
async def cleanup_orphaned_files(self, dry_run: bool = True) -> int:
"""Удалить файлы на диске, для которых нет записей в БД"""
try:
orphaned_files = await self.find_orphaned_files()
if not orphaned_files:
logger.info("Нет orphaned файлов для удаления")
return 0
if dry_run:
logger.info(f"DRY RUN: Найдено {len(orphaned_files)} файлов для удаления")
for file_path in orphaned_files:
logger.info(f"DRY RUN: Будет удален файл: {file_path}")
return len(orphaned_files)
# Удаляем файлы
deleted_count = 0
for file_path in orphaned_files:
try:
os.remove(file_path)
deleted_count += 1
logger.info(f"Удален файл: {file_path}")
except Exception as e:
logger.error(f"Ошибка при удалении файла {file_path}: {e}")
logger.info(f"Удалено {deleted_count} orphaned файлов")
return deleted_count
except Exception as e:
logger.error(f"Ошибка при очистке orphaned файлов: {e}")
return 0
async def get_disk_usage_stats(self) -> dict:
"""Получить статистику использования диска"""
try:
if not os.path.exists(VOICE_USERS_DIR):
return {"error": f"Директория {VOICE_USERS_DIR} не существует"}
total_size = 0
file_count = 0
for file_path in Path(VOICE_USERS_DIR).glob("*.ogg"):
if file_path.is_file():
total_size += file_path.stat().st_size
file_count += 1
return {
"total_files": file_count,
"total_size_bytes": total_size,
"total_size_mb": round(total_size / (1024 * 1024), 2),
"directory": VOICE_USERS_DIR
}
except Exception as e:
logger.error(f"Ошибка при получении статистики диска: {e}")
return {"error": str(e)}
async def run_full_diagnostic(self) -> dict:
"""Запустить полную диагностику"""
try:
logger.info("Запуск полной диагностики голосовых файлов...")
# Статистика диска
disk_stats = await self.get_disk_usage_stats()
# Orphaned записи в БД
orphaned_db_records = await self.find_orphaned_db_records()
# Orphaned файлы
orphaned_files = await self.find_orphaned_files()
# Количество записей в БД
all_audio_records = await self.bot_db.get_all_audio_records()
db_records_count = len(all_audio_records)
diagnostic_result = {
"disk_stats": disk_stats,
"db_records_count": db_records_count,
"orphaned_db_records_count": len(orphaned_db_records),
"orphaned_files_count": len(orphaned_files),
"orphaned_db_records": orphaned_db_records[:10], # Первые 10 для примера
"orphaned_files": orphaned_files[:10], # Первые 10 для примера
"status": "healthy" if len(orphaned_db_records) == 0 and len(orphaned_files) == 0 else "issues_found"
}
logger.info(f"Диагностика завершена. Статус: {diagnostic_result['status']}")
return diagnostic_result
except Exception as e:
logger.error(f"Ошибка при диагностике: {e}")
return {"error": str(e)}

View File

@@ -1,6 +1,7 @@
import random
import asyncio
import traceback
import os
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple
@@ -16,7 +17,6 @@ from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
@@ -190,6 +190,7 @@ class VoiceBotService:
@track_time("clear_user_listenings", "voice_bot_service")
@track_errors("voice_bot_service", "clear_user_listenings")
@db_query_time("clear_user_listenings", "audio_moderate", "delete")
async def clear_user_listenings(self, user_id: int) -> None:
"""Очистить прослушивания пользователя"""
try:
@@ -275,62 +276,170 @@ class AudioFileService:
async def save_audio_file(self, file_name: str, user_id: int, date_added: datetime, file_id: str) -> None:
"""Сохранить информацию об аудио файле в базу данных"""
try:
# Проверяем существование файла перед сохранением в БД
if not await self.verify_file_exists(file_name):
error_msg = f"Файл {file_name} не существует или поврежден, отменяем сохранение в БД"
logger.error(error_msg)
raise FileOperationError(error_msg)
await self.bot_db.add_audio_record_simple(file_name, user_id, date_added)
logger.info(f"Информация об аудио файле успешно сохранена в БД: {file_name}")
except Exception as e:
logger.error(f"Ошибка при сохранении аудио файла в БД: {e}")
raise DatabaseError(f"Не удалось сохранить аудио файл в БД: {e}")
@track_time("save_audio_file_with_transaction", "audio_file_service")
@track_errors("audio_file_service", "save_audio_file_with_transaction")
async def save_audio_file_with_transaction(self, file_name: str, user_id: int, date_added: datetime, file_id: str) -> None:
"""Сохранить информацию об аудио файле в базу данных с транзакцией"""
try:
# Проверяем существование файла перед сохранением в БД
if not await self.verify_file_exists(file_name):
error_msg = f"Файл {file_name} не существует или поврежден, отменяем сохранение в БД"
logger.error(error_msg)
raise FileOperationError(error_msg)
# Используем транзакцию для атомарности операции
await self.bot_db.add_audio_record_simple(file_name, user_id, date_added)
logger.info(f"Информация об аудио файле успешно сохранена в БД с транзакцией: {file_name}")
except Exception as e:
logger.error(f"Ошибка при сохранении аудио файла в БД с транзакцией: {e}")
raise DatabaseError(f"Не удалось сохранить аудио файл в БД с транзакцией: {e}")
@track_time("download_and_save_audio", "audio_file_service")
@track_errors("audio_file_service", "download_and_save_audio")
async def download_and_save_audio(self, bot, message, file_name: str) -> None:
"""Скачать и сохранить аудио файл"""
async def download_and_save_audio(self, bot, message, file_name: str, max_retries: int = 3) -> None:
"""Скачать и сохранить аудио файл с retry механизмом"""
last_exception = None
for attempt in range(max_retries):
try:
logger.info(f"Попытка {attempt + 1}/{max_retries} скачивания и сохранения аудио: {file_name}")
# Проверяем наличие голосового сообщения
if not message or not message.voice:
error_msg = "Сообщение или голосовое сообщение не найдено"
logger.error(error_msg)
raise FileOperationError(error_msg)
file_id = message.voice.file_id
logger.info(f"Получен file_id: {file_id}")
# Получаем информацию о файле
try:
file_info = await bot.get_file(file_id=file_id)
logger.info(f"Получена информация о файле: {file_info.file_path}")
except Exception as e:
logger.error(f"Ошибка при получении информации о файле: {e}")
raise FileOperationError(f"Не удалось получить информацию о файле: {e}")
# Скачиваем файл
try:
downloaded_file = await bot.download_file(file_path=file_info.file_path)
except Exception as e:
logger.error(f"Ошибка при скачивании файла: {e}")
raise FileOperationError(f"Не удалось скачать файл: {e}")
# Проверяем что файл успешно скачан
if not downloaded_file:
error_msg = "Не удалось скачать файл - получен пустой объект"
logger.error(error_msg)
raise FileOperationError(error_msg)
# Получаем размер файла без изменения позиции
current_pos = downloaded_file.tell()
downloaded_file.seek(0, 2) # Переходим в конец файла
file_size = downloaded_file.tell()
downloaded_file.seek(current_pos) # Возвращаемся в исходную позицию
logger.info(f"Файл скачан, размер: {file_size} bytes")
# Проверяем минимальный размер файла
if file_size < 100: # Минимальный размер для аудио файла
error_msg = f"Файл слишком маленький: {file_size} bytes"
logger.error(error_msg)
raise FileOperationError(error_msg)
# Создаем директорию если она не существует
try:
os.makedirs(VOICE_USERS_DIR, exist_ok=True)
logger.info(f"Директория {VOICE_USERS_DIR} создана/проверена")
except Exception as e:
logger.error(f"Ошибка при создании директории: {e}")
raise FileOperationError(f"Не удалось создать директорию: {e}")
file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg'
logger.info(f"Сохраняем файл по пути: {file_path}")
# Сбрасываем позицию в файле перед сохранением
downloaded_file.seek(0)
# Сохраняем файл
try:
with open(file_path, 'wb') as new_file:
new_file.write(downloaded_file.read())
except Exception as e:
logger.error(f"Ошибка при записи файла на диск: {e}")
raise FileOperationError(f"Не удалось записать файл на диск: {e}")
# Проверяем что файл действительно создался и имеет правильный размер
if not os.path.exists(file_path):
error_msg = f"Файл не был создан: {file_path}"
logger.error(error_msg)
raise FileOperationError(error_msg)
saved_file_size = os.path.getsize(file_path)
if saved_file_size != file_size:
error_msg = f"Размер сохраненного файла не совпадает: ожидалось {file_size}, получено {saved_file_size}"
logger.error(error_msg)
# Удаляем поврежденный файл
try:
os.remove(file_path)
except:
pass
raise FileOperationError(error_msg)
logger.info(f"Файл успешно сохранен: {file_path}, размер: {saved_file_size} bytes")
return # Успешное завершение
except Exception as e:
last_exception = e
logger.error(f"Попытка {attempt + 1}/{max_retries} неудачна: {e}")
if attempt < max_retries - 1:
wait_time = (attempt + 1) * 2 # Экспоненциальная задержка: 2, 4, 6 секунд
logger.info(f"Ожидание {wait_time} секунд перед следующей попыткой...")
await asyncio.sleep(wait_time)
else:
logger.error(f"Все {max_retries} попыток скачивания неудачны")
logger.error(f"Traceback последней ошибки: {traceback.format_exc()}")
# Если все попытки неудачны
raise FileOperationError(f"Не удалось скачать и сохранить аудио после {max_retries} попыток. Последняя ошибка: {last_exception}")
@track_time("verify_file_exists", "audio_file_service")
@track_errors("audio_file_service", "verify_file_exists")
async def verify_file_exists(self, file_name: str) -> bool:
"""Проверить существование и валидность файла"""
try:
logger.info(f"Начинаем скачивание и сохранение аудио: {file_name}")
# Проверяем наличие голосового сообщения
if not message or not message.voice:
logger.error("Сообщение или голосовое сообщение не найдено")
raise FileOperationError("Сообщение или голосовое сообщение не найдено")
file_id = message.voice.file_id
logger.info(f"Получен file_id: {file_id}")
file_info = await bot.get_file(file_id=file_id)
logger.info(f"Получена информация о файле: {file_info.file_path}")
downloaded_file = await bot.download_file(file_path=file_info.file_path)
# Проверяем что файл успешно скачан
if not downloaded_file:
logger.error("Не удалось скачать файл")
raise FileOperationError("Не удалось скачать файл")
# Получаем размер файла без изменения позиции
current_pos = downloaded_file.tell()
downloaded_file.seek(0, 2) # Переходим в конец файла
file_size = downloaded_file.tell()
downloaded_file.seek(current_pos) # Возвращаемся в исходную позицию
logger.info(f"Файл скачан, размер: {file_size} bytes")
# Создаем директорию если она не существует
import os
os.makedirs(VOICE_USERS_DIR, exist_ok=True)
logger.info(f"Директория {VOICE_USERS_DIR} создана/проверена")
file_path = f'{VOICE_USERS_DIR}/{file_name}.ogg'
logger.info(f"Сохраняем файл по пути: {file_path}")
# Сбрасываем позицию в файле перед сохранением
downloaded_file.seek(0)
if not os.path.exists(file_path):
logger.warning(f"Файл не существует: {file_path}")
return False
file_size = os.path.getsize(file_path)
if file_size == 0:
logger.warning(f"Файл пустой: {file_path}")
return False
if file_size < 100: # Минимальный размер для аудио файла
logger.warning(f"Файл слишком маленький: {file_path}, размер: {file_size} bytes")
return False
logger.info(f"Файл проверен и валиден: {file_path}, размер: {file_size} bytes")
return True
# Сохраняем файл
with open(file_path, 'wb') as new_file:
new_file.write(downloaded_file.read())
logger.info(f"Файл успешно сохранен: {file_path}")
except Exception as e:
logger.error(f"Ошибка при скачивании и сохранении аудио: {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
raise FileOperationError(f"Не удалось скачать и сохранить аудио: {e}")
logger.error(f"Ошибка при проверке файла {file_name}: {e}")
return False

View File

@@ -6,6 +6,11 @@ from typing import Optional
from helper_bot.handlers.voice.exceptions import DatabaseError
from logs.custom_logger import logger
from helper_bot.utils.metrics import (
track_time,
track_errors,
db_query_time
)
def format_time_ago(date_from_db: str) -> Optional[str]:
"""Форматировать время с момента последней записи"""
@@ -69,7 +74,9 @@ def plural_time(type: int, n: float) -> str:
new_number = int(n)
return str(new_number) + ' ' + word[p]
@track_time("get_last_message_text", "voice_utils")
@track_errors("voice_utils", "get_last_message_text")
@db_query_time("get_last_message_text", "voice", "select")
async def get_last_message_text(bot_db) -> Optional[str]:
"""Получить текст сообщения о времени последней записи"""
try:
@@ -88,7 +95,9 @@ async def validate_voice_message(message) -> bool:
"""Проверить валидность голосового сообщения"""
return message.content_type == 'voice'
@track_time("get_user_emoji_safe", "voice_utils")
@track_errors("voice_utils", "get_user_emoji_safe")
@db_query_time("get_user_emoji_safe", "voice", "select")
async def get_user_emoji_safe(bot_db, user_id: int) -> str:
"""Безопасно получить эмодзи пользователя"""
try:

View File

@@ -24,10 +24,10 @@ from helper_bot.handlers.private.constants import BUTTON_TEXTS
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
db_query_time,
track_file_operations
)
class VoiceHandlers:
@@ -126,6 +126,7 @@ class VoiceHandlers:
@track_errors("voice_handlers", "voice_bot_button_handler")
async def voice_bot_button_handler(self, message: types.Message, state: FSMContext, bot_db: MagicData("bot_db"), settings: MagicData("settings")):
"""Обработчик кнопки 'Голосовой бот' из основной клавиатуры"""
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) нажал кнопку 'Голосовой бот'")
try:
# Проверяем, получал ли пользователь приветственное сообщение
welcome_received = await bot_db.check_voice_bot_welcome_received(message.from_user.id)
@@ -140,7 +141,7 @@ class VoiceHandlers:
logger.info(f"Пользователь {message.from_user.id}: вызываем start")
await self.start(message, state, bot_db, settings)
except Exception as e:
logger.error(f"Ошибка при проверке приветственного сообщения: {e}")
logger.error(f"Ошибка при проверке приветственного сообщения для пользователя {message.from_user.id}: {e}")
# В случае ошибки вызываем start
await self.start(message, state, bot_db, settings)
@@ -169,6 +170,7 @@ class VoiceHandlers:
state: FSMContext,
settings: MagicData("settings")
):
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) запросил информацию об эмодзи")
await message.forward(chat_id=settings['Telegram']['group_for_logs'])
user_emoji = await check_user_emoji(message)
await state.set_state(STATE_START)
@@ -183,6 +185,7 @@ class VoiceHandlers:
state: FSMContext,
settings: MagicData("settings")
):
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) вызвал функцию help_function")
await message.forward(chat_id=settings['Telegram']['group_for_logs'])
await update_user_info(VOICE_BOT_NAME, message)
help_message = messages.get_message(get_first_name(message), 'HELP_MESSAGE')
@@ -194,6 +197,7 @@ class VoiceHandlers:
@track_time("start", "voice_handlers")
@track_errors("voice_handlers", "start")
@db_query_time("mark_voice_bot_welcome_received", "audio_moderate", "update")
async def start(
self,
message: types.Message,
@@ -201,7 +205,7 @@ class VoiceHandlers:
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
logger.info(f"Пользователь {message.from_user.id}: вызывается функция start")
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}): вызывается функция start")
await state.set_state(STATE_START)
await message.forward(chat_id=settings['Telegram']['group_for_logs'])
await update_user_info(VOICE_BOT_NAME, message)
@@ -210,13 +214,14 @@ class VoiceHandlers:
# Создаем сервис и отправляем приветственные сообщения
voice_service = VoiceBotService(bot_db, settings)
await voice_service.send_welcome_messages(message, user_emoji)
logger.info(f"Приветственные сообщения отправлены пользователю {message.from_user.id}")
# Отмечаем, что пользователь получил приветственное сообщение
try:
await bot_db.mark_voice_bot_welcome_received(message.from_user.id)
logger.info(f"Пользователь {message.from_user.id}: отмечен как получивший приветствие")
except Exception as e:
logger.error(f"Ошибка при отметке получения приветствия: {e}")
logger.error(f"Ошибка при отметке получения приветствия для пользователя {message.from_user.id}: {e}")
@track_time("cancel_handler", "voice_handlers")
@track_errors("voice_handlers", "cancel_handler")
@@ -233,6 +238,7 @@ class VoiceHandlers:
markup = await get_reply_keyboard(self.db, message.from_user.id)
await message.answer(text='Добро пожаловать в меню!', reply_markup=markup, parse_mode='HTML')
await state.set_state(FSM_STATES["START"])
logger.info(f"Пользователь {message.from_user.id} возвращен в главное меню")
@track_time("refresh_listen_function", "voice_handlers")
@track_errors("voice_handlers", "refresh_listen_function")
@@ -243,6 +249,7 @@ class VoiceHandlers:
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) вызвал функцию refresh_listen_function")
await message.forward(chat_id=settings['Telegram']['group_for_logs'])
await update_user_info(VOICE_BOT_NAME, message)
markup = get_main_keyboard()
@@ -269,6 +276,7 @@ class VoiceHandlers:
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) вызвал функцию standup_write")
await message.forward(chat_id=settings['Telegram']['group_for_logs'])
markup = types.ReplyKeyboardRemove()
record_voice_message = messages.get_message(get_first_name(message), 'RECORD_VOICE_MESSAGE')
@@ -279,7 +287,7 @@ class VoiceHandlers:
if message_with_date:
await message.answer(text=message_with_date, parse_mode="html")
except Exception as e:
logger.error(f'Не удалось получить дату последнего сообщения - {e}')
logger.error(f'Не удалось получить дату последнего сообщения для пользователя {message.from_user.id}: {e}')
await state.set_state(STATE_STANDUP_WRITE)
@@ -309,6 +317,7 @@ class VoiceHandlers:
message.voice.file_id,
markup_for_voice
)
logger.info(f"Голосовое сообщение пользователя {message.from_user.id} отправлено в группу постов (message_id: {sent_message.message_id})")
# Сохраняем в базу инфо о посте
await bot_db.set_user_id_and_message_id_for_voice_bot(sent_message.message_id, message.from_user.id)
@@ -318,6 +327,7 @@ class VoiceHandlers:
await message.answer(text=voice_saved_message, reply_markup=markup)
await state.set_state(STATE_START)
else:
logger.warning(f"Голосовое сообщение пользователя {message.from_user.id} не прошло валидацию")
unknown_content_message = messages.get_message(get_first_name(message), 'UNKNOWN_CONTENT_MESSAGE')
await message.forward(chat_id=settings['Telegram']['group_for_logs'])
await message.answer(text=unknown_content_message, reply_markup=markup)
@@ -326,22 +336,27 @@ class VoiceHandlers:
@track_time("standup_listen_audio", "voice_handlers")
@track_errors("voice_handlers", "standup_listen_audio")
@track_file_operations("voice")
@db_query_time("standup_listen_audio", "audio_moderate", "mixed")
async def standup_listen_audio(
self,
message: types.Message,
bot_db: MagicData("bot_db"),
settings: MagicData("settings")
):
logger.info(f"Пользователь {message.from_user.id} ({message.from_user.full_name}) запросил прослушивание аудио")
markup = get_main_keyboard()
# Создаем сервис для работы с аудио
voice_service = VoiceBotService(bot_db, settings)
try:
#TODO: удалить логику из хендлера
# Получаем случайное аудио
audio_data = await voice_service.get_random_audio(message.from_user.id)
if not audio_data:
logger.warning(f"Для пользователя {message.from_user.id} не найдено доступных аудио для прослушивания")
no_audio_message = messages.get_message(get_first_name(message), 'NO_AUDIO_MESSAGE')
await message.answer(text=no_audio_message, reply_markup=markup)
try:
@@ -349,7 +364,7 @@ class VoiceHandlers:
if message_with_date:
await message.answer(text=message_with_date, parse_mode="html")
except Exception as e:
logger.error(f'Не удалось получить последнюю дату {e}')
logger.error(f'Не удалось получить последнюю дату для пользователя {message.from_user.id}: {e}')
return
audio_for_user, date_added, user_emoji = audio_data
@@ -359,7 +374,13 @@ class VoiceHandlers:
# Проверяем существование файла
if not path.exists():
logger.error(f"Файл не найден: {path}")
logger.error(f"Файл не найден: {path} для пользователя {message.from_user.id}")
# Дополнительная диагностика
logger.error(f"Директория {VOICE_USERS_DIR} существует: {Path(VOICE_USERS_DIR).exists()}")
if Path(VOICE_USERS_DIR).exists():
files_in_dir = list(Path(VOICE_USERS_DIR).glob("*.ogg"))
logger.error(f"Файлы в директории: {[f.name for f in files_in_dir]}")
await message.answer(
text="Файл аудио не найден. Обратитесь к администратору.",
reply_markup=markup
@@ -368,7 +389,7 @@ class VoiceHandlers:
# Проверяем размер файла
if path.stat().st_size == 0:
logger.error(f"Файл пустой: {path}")
logger.error(f"Файл пустой: {path} для пользователя {message.from_user.id}")
await message.answer(
text="Файл аудио поврежден. Обратитесь к администратору.",
reply_markup=markup
@@ -383,13 +404,20 @@ class VoiceHandlers:
else:
caption = f'Дата записи: {date_added}'
logger.info(f"Подготовлено голосовое сообщение для пользователя {message.from_user.id}: {caption}")
try:
await message.bot.send_voice(
chat_id=message.chat.id,
voice=voice,
caption=caption,
reply_markup=markup
)
from helper_bot.utils.rate_limiter import send_with_rate_limit
async def _send_voice():
return await message.bot.send_voice(
chat_id=message.chat.id,
voice=voice,
caption=caption,
reply_markup=markup
)
await send_with_rate_limit(_send_voice, message.chat.id)
# Маркируем сообщение как прослушанное только после успешной отправки
await voice_service.mark_audio_as_listened(audio_for_user, message.from_user.id)
@@ -404,7 +432,7 @@ class VoiceHandlers:
except Exception as voice_error:
if "VOICE_MESSAGES_FORBIDDEN" in str(voice_error):
# Если голосовые сообщения запрещены, отправляем информативное сообщение
logger.info(f"Пользователь {message.from_user.id} запретил получение голосовых сообщений")
logger.warning(f"Пользователь {message.from_user.id} запретил получение голосовых сообщений")
privacy_message = "🔇 К сожалению, у тебя закрыт доступ к получению голосовых сообщений.\n\nДля продолжения взаимодействия с ботом необходимо дать возможность мне присылать войсы в настройках приватности Telegram.\n\n💡 Как это сделать:\n1. Открой настройки Telegram\n2. Перейди в 'Конфиденциальность и безопасность'\n3. Выбери 'Голосовые сообщения'\n4. Разреши получение от 'Всех' или добавь меня в исключения"
@@ -412,10 +440,11 @@ class VoiceHandlers:
return # Выходим без записи о прослушивании
else:
logger.error(f"Ошибка при отправке голосового сообщения пользователю {message.from_user.id}: {voice_error}")
raise voice_error
except Exception as e:
logger.error(f"Ошибка при прослушивании аудио: {e}")
logger.error(f"Ошибка при прослушивании аудио для пользователя {message.from_user.id}: {e}")
await message.answer(
text="Произошла ошибка при получении аудио. Попробуйте позже.",
reply_markup=markup

View File

@@ -3,7 +3,6 @@ from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors
)
@@ -23,8 +22,7 @@ def get_reply_keyboard_for_post():
return markup
@track_time("get_reply_keyboard", "keyboard_service")
@track_errors("keyboard_service", "get_reply_keyboard")
async def get_reply_keyboard(db, user_id):
builder = ReplyKeyboardBuilder()
builder.row(types.KeyboardButton(text="📢Предложить свой пост"))
@@ -58,7 +56,8 @@ def get_reply_keyboard_admin():
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
@track_time("create_keyboard_with_pagination", "keyboard_service")
@track_errors("keyboard_service", "create_keyboard_with_pagination")
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list, callback: str):
"""
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback

View File

@@ -14,6 +14,7 @@ from helper_bot.handlers.voice import VoiceHandlers
from helper_bot.middlewares.dependencies_middleware import DependenciesMiddleware
from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
from helper_bot.middlewares.metrics_middleware import MetricsMiddleware, ErrorMetricsMiddleware
from helper_bot.middlewares.rate_limit_middleware import RateLimitMiddleware
from helper_bot.server_prometheus import start_metrics_server, stop_metrics_server
@@ -53,6 +54,7 @@ async def start_bot(bdf):
dp.update.outer_middleware(DependenciesMiddleware())
dp.update.outer_middleware(MetricsMiddleware())
dp.update.outer_middleware(BlacklistMiddleware())
dp.update.outer_middleware(RateLimitMiddleware())
# Создаем экземпляр VoiceHandlers
voice_handlers = VoiceHandlers(bdf, bdf.settings)
@@ -89,11 +91,7 @@ async def start_bot(bdf):
logging.info("✅ Бот запущен")
except Exception as e:
logging.error(f"❌ Ошибка запуска метрик сервера: {e}")
# Продолжаем работу бота даже если метрики не запустились
except Exception as e:
logging.error(f"Error in bot startup: {e}")
logging.error(f"❌ Ошибка запуска бота: {e}")
raise
finally:
# Останавливаем метрики сервер при завершении

View File

@@ -0,0 +1,57 @@
"""
Middleware для автоматического применения rate limiting ко всем входящим сообщениям
"""
from typing import Callable, Dict, Any, Awaitable, Union
from aiogram import BaseMiddleware
from aiogram.types import Message, CallbackQuery, InlineQuery, ChatMemberUpdated, Update
from aiogram.exceptions import TelegramRetryAfter, TelegramAPIError
from logs.custom_logger import logger
from helper_bot.utils.rate_limiter import telegram_rate_limiter
class RateLimitMiddleware(BaseMiddleware):
"""Middleware для автоматического rate limiting входящих сообщений"""
def __init__(self):
super().__init__()
self.rate_limiter = telegram_rate_limiter
async def __call__(
self,
handler: Callable[[Update, Dict[str, Any]], Awaitable[Any]],
event: Union[Update, Message, CallbackQuery, InlineQuery, ChatMemberUpdated],
data: Dict[str, Any]
) -> Any:
"""Обрабатывает событие с rate limiting"""
# Извлекаем сообщение из Update
message = None
if isinstance(event, Update):
message = event.message
elif isinstance(event, Message):
message = event
# Применяем rate limiting только к сообщениям
if message is not None:
chat_id = message.chat.id
# Обертываем handler в rate limiting
async def rate_limited_handler():
try:
return await handler(event, data)
except (TelegramRetryAfter, TelegramAPIError) as e:
logger.warning(f"Rate limit error in middleware: {e}")
# Middleware не должен перехватывать эти ошибки,
# пусть их обрабатывает rate_limiter в функциях отправки
raise
# Применяем rate limiting к handler
return await self.rate_limiter.send_with_rate_limit(
rate_limited_handler,
chat_id
)
else:
# Для других типов событий просто вызываем handler
return await handler(event, data)

View File

@@ -8,6 +8,11 @@ from apscheduler.triggers.cron import CronTrigger
from helper_bot.utils.base_dependency_factory import get_global_instance
from logs.custom_logger import logger
from .metrics import (
track_time,
track_errors,
db_query_time
)
class AutoUnbanScheduler:
"""
@@ -24,7 +29,10 @@ class AutoUnbanScheduler:
def set_bot(self, bot):
"""Устанавливает экземпляр бота для отправки уведомлений"""
self.bot = bot
@track_time("auto_unban_users", "auto_unban_scheduler")
@track_errors("auto_unban_scheduler", "auto_unban_users")
@db_query_time("auto_unban_users", "users", "mixed")
async def auto_unban_users(self):
"""
Основная функция автоматического разбана пользователей.
@@ -104,6 +112,8 @@ class AutoUnbanScheduler:
return report
@track_time("send_report", "auto_unban_scheduler")
@track_errors("auto_unban_scheduler", "send_report")
async def _send_report(self, report: str):
"""Отправляет отчет в лог-канал"""
try:
@@ -117,6 +127,8 @@ class AutoUnbanScheduler:
except Exception as e:
logger.error(f"Ошибка при отправке отчета: {e}")
@track_time("send_error_report", "auto_unban_scheduler")
@track_errors("auto_unban_scheduler", "send_error_report")
async def _send_error_report(self, error_msg: str):
"""Отправляет отчет об ошибке в важный лог-канал"""
try:

View File

@@ -22,10 +22,11 @@ from database.models import TelegramPost
# Local imports - metrics
from .metrics import (
metrics,
track_time,
track_errors,
db_query_time
db_query_time,
track_media_processing,
track_file_operations,
)
bdf = get_global_instance()
@@ -115,7 +116,9 @@ def get_text_message(post_text: str, first_name: str, username: str = None):
else:
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
@track_time("download_file", "helper_func")
@track_errors("helper_func", "download_file")
@track_file_operations("unknown")
async def download_file(message: types.Message, file_id: str, content_type: str = None) -> Optional[str]:
"""
Скачивает файл по file_id из Telegram и сохраняет в соответствующую папку.
@@ -180,18 +183,16 @@ async def download_file(message: types.Message, file_id: str, content_type: str
logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с")
# Записываем метрики
metrics.record_file_download(content_type or 'unknown', file_size, download_time)
return file_path
except Exception as e:
download_time = time.time() - start_time
logger.error(f"download_file: Ошибка скачивания файла {file_id}: {e}, время: {download_time:.2f}с")
metrics.record_file_download_error(content_type or 'unknown', str(e))
return None
@track_time("prepare_media_group_from_middlewares", "helper_func")
@track_errors("helper_func", "prepare_media_group_from_middlewares")
@track_media_processing("media_group")
async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
"""
Создает MediaGroup согласно best practices aiogram 3.x.
@@ -243,7 +244,10 @@ async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
return media_group
@track_time("add_in_db_media_mediagroup", "helper_func")
@track_errors("helper_func", "add_in_db_media_mediagroup")
@track_media_processing("media_group")
@db_query_time("add_in_db_media_mediagroup", "posts", "insert")
async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None) -> bool:
"""
Добавляет контент медиа-группы в базу данных
@@ -340,7 +344,6 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db:
if processed_count == 0:
logger.error(f"add_in_db_media_mediagroup: Не удалось обработать ни одного сообщения из медиагруппы {post_id}")
metrics.record_media_processing('media_group', processing_time, False)
return False
if failed_count > 0:
@@ -348,18 +351,18 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db:
else:
logger.info(f"add_in_db_media_mediagroup: Успешно обработана медиагруппа {post_id} - {processed_count} сообщений, время: {processing_time:.2f}с")
# Записываем метрики
metrics.record_media_processing('media_group', processing_time, failed_count == 0)
return failed_count == 0
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"add_in_db_media_mediagroup: Критическая ошибка обработки медиагруппы: {e}, время: {processing_time:.2f}с")
metrics.record_media_processing('media_group', processing_time, False)
return False
@track_time("add_in_db_media", "helper_func")
@track_errors("helper_func", "add_in_db_media")
@track_media_processing("media_group")
@db_query_time("add_in_db_media", "posts", "insert")
@track_file_operations("media")
async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
"""
Добавляет контент одиночного сообщения в базу данных
@@ -430,18 +433,17 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
processing_time = time.time() - start_time
logger.info(f"add_in_db_media: Контент успешно добавлен для сообщения {post_id}, тип: {content_type}, время: {processing_time:.2f}с")
# Записываем метрики
metrics.record_media_processing(content_type, processing_time, True)
return True
except Exception as e:
processing_time = time.time() - start_time
logger.error(f"add_in_db_media: Ошибка обработки медиа для сообщения {post_id}: {e}, время: {processing_time:.2f}с")
metrics.record_media_processing(content_type or 'unknown', processing_time, False)
return False
@track_time("send_media_group_message_to_private_chat", "helper_func")
@track_errors("helper_func", "send_media_group_message_to_private_chat")
@track_media_processing("media_group")
@db_query_time("send_media_group_message_to_private_chat", "posts", "insert")
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: List, bot_db: Any, main_post_id: Optional[int] = None) -> int:
sent_message = await message.bot.send_media_group(
@@ -461,7 +463,9 @@ async def send_media_group_message_to_private_chat(chat_id: int, message: types.
message_id = sent_message[-1].message_id
return message_id
@track_time("send_media_group_to_channel", "helper_func")
@track_errors("helper_func", "send_media_group_to_channel")
@track_media_processing("media_group")
async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str):
"""
Отправляет медиа-группу с подписью к последнему файлу.
@@ -510,28 +514,32 @@ async def send_media_group_to_channel(bot, chat_id: int, post_content: List, pos
logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}")
raise
@track_time("send_text_message", "helper_func")
@track_errors("helper_func", "send_text_message")
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
from .rate_limiter import send_with_rate_limit
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text
)
message_id = sent_message.message_id
return message_id
else:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text,
reply_markup=markup
)
message_id = sent_message.message_id
return message_id
async def _send_message():
if markup is None:
return await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text
)
else:
return await message.bot.send_message(
chat_id=chat_id,
text=safe_post_text,
reply_markup=markup
)
sent_message = await send_with_rate_limit(_send_message, chat_id)
return sent_message.message_id
@track_time("send_photo_message", "helper_func")
@track_errors("helper_func", "send_photo_message")
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
@@ -552,7 +560,8 @@ async def send_photo_message(chat_id, message: types.Message, photo: str, post_t
)
return sent_message
@track_time("send_video_message", "helper_func")
@track_errors("helper_func", "send_video_message")
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
@@ -573,7 +582,8 @@ async def send_video_message(chat_id, message: types.Message, video: str, post_t
)
return sent_message
@track_time("send_video_note_message", "helper_func")
@track_errors("helper_func", "send_video_note_message")
async def send_video_note_message(chat_id, message: types.Message, video_note: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
@@ -589,7 +599,8 @@ async def send_video_note_message(chat_id, message: types.Message, video_note: s
)
return sent_message
@track_time("send_audio_message", "helper_func")
@track_errors("helper_func", "send_audio_message")
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
@@ -611,22 +622,30 @@ async def send_audio_message(chat_id, message: types.Message, audio: str, post_t
return sent_message
@track_time("send_voice_message", "helper_func")
@track_errors("helper_func", "send_voice_message")
async def send_voice_message(chat_id, message: types.Message, voice: str,
markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice
)
else:
sent_message = await message.bot.send_voice(
chat_id=chat_id,
voice=voice,
reply_markup=markup
)
return sent_message
from .rate_limiter import send_with_rate_limit
async def _send_voice():
if markup is None:
return await message.bot.send_voice(
chat_id=chat_id,
voice=voice
)
else:
return await message.bot.send_voice(
chat_id=chat_id,
voice=voice,
reply_markup=markup
)
return await send_with_rate_limit(_send_voice, chat_id)
@track_time("check_access", "helper_func")
@track_errors("helper_func", "check_access")
@db_query_time("check_access", "users", "select")
async def check_access(user_id: int, bot_db):
"""Проверка прав на совершение действий"""
from logs.custom_logger import logger
@@ -641,7 +660,9 @@ def add_days_to_date(days: int):
future_date = current_date + timedelta(days=days)
return int(future_date.timestamp())
@track_time("get_banned_users_list", "helper_func")
@track_errors("helper_func", "get_banned_users_list")
@db_query_time("get_banned_users_list", "users", "select")
async def get_banned_users_list(offset: int, bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
@@ -689,7 +710,9 @@ async def get_banned_users_list(offset: int, bot_db):
message += f"**Дата разбана:** {safe_unban_date}\n\n"
return message
@track_time("get_banned_users_buttons", "helper_func")
@track_errors("helper_func", "get_banned_users_buttons")
@db_query_time("get_banned_users_buttons", "users", "select")
async def get_banned_users_buttons(bot_db):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
@@ -716,7 +739,9 @@ async def get_banned_users_buttons(bot_db):
user_ids.append((safe_user_name, user_id))
return user_ids
@track_time("delete_user_blacklist", "helper_func")
@track_errors("helper_func", "delete_user_blacklist")
@db_query_time("delete_user_blacklist", "users", "delete")
async def delete_user_blacklist(user_id: int, bot_db):
return await bot_db.delete_user_blacklist(user_id=user_id)
@@ -734,7 +759,9 @@ async def check_username_and_full_name(user_id: int, username: str, full_name: s
logger.error(f"Ошибка при проверке username и full_name: {e}")
return False
@track_time("unban_notifier", "helper_func")
@track_errors("helper_func", "unban_notifier")
@db_query_time("unban_notifier", "users", "select")
async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE):
# Получение текущего UNIX timestamp
current_date = datetime.now()
@@ -757,6 +784,7 @@ async def unban_notifier(bot, BotDB, GROUP_FOR_MESSAGE):
@track_time("update_user_info", "helper_func")
@track_errors("helper_func", "update_user_info")
@db_query_time("update_user_info", "users", "update")
async def update_user_info(source: str, message: types.Message):
# Собираем данные
full_name = message.from_user.full_name
@@ -787,12 +815,10 @@ async def update_user_info(source: str, message: types.Message):
voice_bot_welcome_received=False
)
await BotDB.add_user(user)
metrics.record_db_query("add_user", 0.0, "users", "insert")
else:
is_need_update = await check_username_and_full_name(user_id, username, full_name, BotDB)
if is_need_update:
await BotDB.update_user_info(user_id, username, full_name)
metrics.record_db_query("update_user_info", 0.0, "users", "update")
if source != 'voice':
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}")
@@ -800,7 +826,6 @@ async def update_user_info(source: str, message: types.Message):
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}')
sleep(1)
await BotDB.update_user_date(user_id)
metrics.record_db_query("update_user_date", 0.0, "users", "update")
@track_time("check_user_emoji", "helper_func")
@@ -812,7 +837,6 @@ async def check_user_emoji(message: types.Message):
if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""):
user_emoji = await get_random_emoji()
await BotDB.update_user_emoji(user_id=user_id, emoji=user_emoji)
metrics.record_db_query("update_user_emoji", 0.0, "users", "update")
return user_emoji

View File

@@ -7,10 +7,13 @@ from typing import Dict, Any, Optional
from prometheus_client import Counter, Histogram, Gauge, generate_latest, CONTENT_TYPE_LATEST
from prometheus_client.core import CollectorRegistry
import time
import os
from functools import wraps
import asyncio
from contextlib import asynccontextmanager
# Метрики rate limiter теперь создаются в основном классе
class BotMetrics:
"""Central class for managing all bot metrics."""
@@ -18,6 +21,9 @@ class BotMetrics:
def __init__(self):
self.registry = CollectorRegistry()
# Создаем метрики rate limiter в том же registry
self._create_rate_limit_metrics()
# Bot commands counter
self.bot_commands_total = Counter(
'bot_commands_total',
@@ -158,6 +164,78 @@ class BotMetrics:
registry=self.registry
)
def _create_rate_limit_metrics(self):
"""Создает метрики rate limiter в основном registry"""
try:
# Создаем метрики rate limiter в том же registry
self.rate_limit_requests_total = Counter(
'rate_limit_requests_total',
'Total number of rate limited requests',
['chat_id', 'status', 'error_type'],
registry=self.registry
)
self.rate_limit_errors_total = Counter(
'rate_limit_errors_total',
'Total number of rate limit errors',
['error_type', 'chat_id'],
registry=self.registry
)
self.rate_limit_wait_duration_seconds = Histogram(
'rate_limit_wait_duration_seconds',
'Time spent waiting due to rate limiting',
['chat_id'],
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0],
registry=self.registry
)
self.rate_limit_active_chats = Gauge(
'rate_limit_active_chats',
'Number of active chats with rate limiting',
registry=self.registry
)
self.rate_limit_success_rate = Gauge(
'rate_limit_success_rate',
'Success rate of rate limited requests',
['chat_id'],
registry=self.registry
)
self.rate_limit_requests_per_minute = Gauge(
'rate_limit_requests_per_minute',
'Requests per minute',
['chat_id'],
registry=self.registry
)
self.rate_limit_total_requests = Gauge(
'rate_limit_total_requests',
'Total number of requests',
['chat_id'],
registry=self.registry
)
self.rate_limit_total_errors = Gauge(
'rate_limit_total_errors',
'Total number of errors',
['chat_id', 'error_type'],
registry=self.registry
)
self.rate_limit_avg_wait_time_seconds = Gauge(
'rate_limit_avg_wait_time_seconds',
'Average wait time in seconds',
['chat_id'],
registry=self.registry
)
except Exception as e:
# Логируем ошибку, но не прерываем инициализацию
import logging
logging.warning(f"Failed to create rate limit metrics: {e}")
def record_command(self, command_type: str, handler_type: str = "unknown", user_type: str = "unknown", status: str = "success"):
"""Record a bot command execution."""
self.bot_commands_total.labels(
@@ -267,8 +345,97 @@ class BotMetrics:
method_name="add_in_db_media"
).inc()
def record_db_error(self, error_type: str, query_type: str = "unknown", table_name: str = "unknown", operation: str = "unknown"):
"""Record database error occurrence."""
self.db_errors_total.labels(
error_type=error_type,
query_type=query_type,
table_name=table_name,
operation=operation
).inc()
def record_rate_limit_request(self, chat_id: int, success: bool, wait_time: float = 0.0, error_type: str = None):
"""Record rate limit request metrics."""
try:
# Определяем статус
status = "success" if success else "error"
# Записываем счетчик запросов
self.rate_limit_requests_total.labels(
chat_id=str(chat_id),
status=status,
error_type=error_type or "none"
).inc()
# Записываем время ожидания
if wait_time > 0:
self.rate_limit_wait_duration_seconds.labels(
chat_id=str(chat_id)
).observe(wait_time)
# Записываем ошибки
if not success and error_type:
self.rate_limit_errors_total.labels(
error_type=error_type,
chat_id=str(chat_id)
).inc()
except Exception as e:
import logging
logging.warning(f"Failed to record rate limit request: {e}")
def update_rate_limit_gauges(self):
"""Update rate limit gauge metrics."""
try:
from .rate_limit_monitor import rate_limit_monitor
# Обновляем количество активных чатов
self.rate_limit_active_chats.set(len(rate_limit_monitor.stats))
# Обновляем метрики для каждого чата
for chat_id, chat_stats in rate_limit_monitor.stats.items():
chat_id_str = str(chat_id)
# Процент успеха
self.rate_limit_success_rate.labels(
chat_id=chat_id_str
).set(chat_stats.success_rate)
# Запросов в минуту
self.rate_limit_requests_per_minute.labels(
chat_id=chat_id_str
).set(chat_stats.requests_per_minute)
# Общее количество запросов
self.rate_limit_total_requests.labels(
chat_id=chat_id_str
).set(chat_stats.total_requests)
# Среднее время ожидания
self.rate_limit_avg_wait_time_seconds.labels(
chat_id=chat_id_str
).set(chat_stats.average_wait_time)
# Количество ошибок по типам
if chat_stats.retry_after_errors > 0:
self.rate_limit_total_errors.labels(
chat_id=chat_id_str,
error_type="RetryAfter"
).set(chat_stats.retry_after_errors)
if chat_stats.other_errors > 0:
self.rate_limit_total_errors.labels(
chat_id=chat_id_str,
error_type="Other"
).set(chat_stats.other_errors)
except Exception as e:
import logging
logging.warning(f"Failed to update rate limit gauges: {e}")
def get_metrics(self) -> bytes:
"""Generate metrics in Prometheus format."""
# Обновляем gauge метрики rate limiter перед генерацией
self.update_rate_limit_gauges()
return generate_latest(self.registry)
@@ -449,3 +616,89 @@ async def track_middleware(middleware_name: str):
middleware_name
)
raise
def track_media_processing(content_type: str = "unknown"):
"""Decorator to track media processing operations."""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
metrics.record_media_processing(content_type, duration, True)
return result
except Exception as e:
duration = time.time() - start_time
metrics.record_media_processing(content_type, duration, False)
raise
@wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
metrics.record_media_processing(content_type, duration, True)
return result
except Exception as e:
duration = time.time() - start_time
metrics.record_media_processing(content_type, duration, False)
raise
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator
def track_file_operations(content_type: str = "unknown"):
"""Decorator to track file download/upload operations."""
def decorator(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
duration = time.time() - start_time
# Получаем размер файла из результата
file_size = 0
if result and isinstance(result, str) and os.path.exists(result):
file_size = os.path.getsize(result)
# Записываем метрики
metrics.record_file_download(content_type, file_size, duration)
return result
except Exception as e:
duration = time.time() - start_time
metrics.record_file_download_error(content_type, str(e))
raise
@wraps(func)
def sync_wrapper(*args, **kwargs):
start_time = time.time()
try:
result = func(*args, **kwargs)
duration = time.time() - start_time
# Получаем размер файла из результата
file_size = 0
if result and isinstance(result, str) and os.path.exists(result):
file_size = os.path.getsize(result)
# Записываем метрики
metrics.record_file_download(content_type, file_size, duration)
return result
except Exception as e:
duration = time.time() - start_time
metrics.record_file_download_error(content_type, str(e))
raise
if asyncio.iscoroutinefunction(func):
return async_wrapper
return sync_wrapper
return decorator

View File

@@ -0,0 +1,220 @@
"""
Мониторинг и статистика rate limiting
"""
import time
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from collections import defaultdict, deque
from logs.custom_logger import logger
@dataclass
class RateLimitStats:
"""Статистика rate limiting для чата"""
chat_id: int
total_requests: int = 0
successful_requests: int = 0
failed_requests: int = 0
retry_after_errors: int = 0
other_errors: int = 0
total_wait_time: float = 0.0
last_request_time: float = 0.0
request_times: deque = field(default_factory=lambda: deque(maxlen=100))
@property
def success_rate(self) -> float:
"""Процент успешных запросов"""
if self.total_requests == 0:
return 1.0
return self.successful_requests / self.total_requests
@property
def error_rate(self) -> float:
"""Процент ошибок"""
return 1.0 - self.success_rate
@property
def average_wait_time(self) -> float:
"""Среднее время ожидания"""
if self.total_requests == 0:
return 0.0
return self.total_wait_time / self.total_requests
@property
def requests_per_minute(self) -> float:
"""Запросов в минуту"""
if not self.request_times:
return 0.0
current_time = time.time()
minute_ago = current_time - 60
# Подсчитываем запросы за последнюю минуту
recent_requests = sum(1 for req_time in self.request_times if req_time > minute_ago)
return recent_requests
class RateLimitMonitor:
"""Монитор для отслеживания статистики rate limiting"""
def __init__(self, max_history_size: int = 1000):
self.stats: Dict[int, RateLimitStats] = defaultdict(lambda: RateLimitStats(0))
self.global_stats = RateLimitStats(0)
self.max_history_size = max_history_size
self.error_history: deque = deque(maxlen=max_history_size)
def record_request(self, chat_id: int, success: bool, wait_time: float = 0.0, error_type: Optional[str] = None):
"""Записывает информацию о запросе"""
current_time = time.time()
# Обновляем статистику для чата
chat_stats = self.stats[chat_id]
chat_stats.chat_id = chat_id
chat_stats.total_requests += 1
chat_stats.total_wait_time += wait_time
chat_stats.last_request_time = current_time
chat_stats.request_times.append(current_time)
if success:
chat_stats.successful_requests += 1
else:
chat_stats.failed_requests += 1
if error_type == "RetryAfter":
chat_stats.retry_after_errors += 1
else:
chat_stats.other_errors += 1
# Записываем ошибку в историю
self.error_history.append({
'chat_id': chat_id,
'error_type': error_type,
'timestamp': current_time,
'wait_time': wait_time
})
# Обновляем глобальную статистику
self.global_stats.total_requests += 1
self.global_stats.total_wait_time += wait_time
self.global_stats.last_request_time = current_time
self.global_stats.request_times.append(current_time)
if success:
self.global_stats.successful_requests += 1
else:
self.global_stats.failed_requests += 1
if error_type == "RetryAfter":
self.global_stats.retry_after_errors += 1
else:
self.global_stats.other_errors += 1
def get_chat_stats(self, chat_id: int) -> Optional[RateLimitStats]:
"""Получает статистику для конкретного чата"""
return self.stats.get(chat_id)
def get_global_stats(self) -> RateLimitStats:
"""Получает глобальную статистику"""
return self.global_stats
def get_top_chats_by_requests(self, limit: int = 10) -> List[tuple]:
"""Получает топ чатов по количеству запросов"""
sorted_chats = sorted(
self.stats.items(),
key=lambda x: x[1].total_requests,
reverse=True
)
return sorted_chats[:limit]
def get_chats_with_high_error_rate(self, threshold: float = 0.1) -> List[tuple]:
"""Получает чаты с высоким процентом ошибок"""
high_error_chats = [
(chat_id, stats) for chat_id, stats in self.stats.items()
if stats.error_rate > threshold and stats.total_requests > 5
]
return sorted(high_error_chats, key=lambda x: x[1].error_rate, reverse=True)
def get_recent_errors(self, minutes: int = 60) -> List[dict]:
"""Получает недавние ошибки"""
current_time = time.time()
cutoff_time = current_time - (minutes * 60)
return [
error for error in self.error_history
if error['timestamp'] > cutoff_time
]
def get_error_summary(self, minutes: int = 60) -> Dict[str, int]:
"""Получает сводку ошибок за указанный период"""
recent_errors = self.get_recent_errors(minutes)
error_summary = defaultdict(int)
for error in recent_errors:
error_summary[error['error_type']] += 1
return dict(error_summary)
def log_statistics(self, log_level: str = "info"):
"""Логирует текущую статистику"""
global_stats = self.get_global_stats()
log_message = (
f"Rate Limit Statistics:\n"
f" Total requests: {global_stats.total_requests}\n"
f" Success rate: {global_stats.success_rate:.2%}\n"
f" Error rate: {global_stats.error_rate:.2%}\n"
f" RetryAfter errors: {global_stats.retry_after_errors}\n"
f" Other errors: {global_stats.other_errors}\n"
f" Average wait time: {global_stats.average_wait_time:.2f}s\n"
f" Requests per minute: {global_stats.requests_per_minute:.1f}\n"
f" Active chats: {len(self.stats)}"
)
if log_level == "error":
logger.error(log_message)
elif log_level == "warning":
logger.warning(log_message)
else:
logger.info(log_message)
# Логируем чаты с высоким процентом ошибок
high_error_chats = self.get_chats_with_high_error_rate(0.2)
if high_error_chats:
logger.warning(f"Chats with high error rate (>20%): {len(high_error_chats)}")
for chat_id, stats in high_error_chats[:5]: # Показываем только первые 5
logger.warning(f" Chat {chat_id}: {stats.error_rate:.2%} error rate ({stats.failed_requests}/{stats.total_requests})")
def reset_stats(self, chat_id: Optional[int] = None):
"""Сбрасывает статистику"""
if chat_id is None:
# Сбрасываем всю статистику
self.stats.clear()
self.global_stats = RateLimitStats(0)
self.error_history.clear()
else:
# Сбрасываем статистику для конкретного чата
if chat_id in self.stats:
del self.stats[chat_id]
# Глобальный экземпляр монитора
rate_limit_monitor = RateLimitMonitor()
def record_rate_limit_request(chat_id: int, success: bool, wait_time: float = 0.0, error_type: Optional[str] = None):
"""Удобная функция для записи информации о запросе"""
rate_limit_monitor.record_request(chat_id, success, wait_time, error_type)
def get_rate_limit_summary() -> Dict:
"""Получает краткую сводку по rate limiting"""
global_stats = rate_limit_monitor.get_global_stats()
recent_errors = rate_limit_monitor.get_recent_errors(60) # За последний час
return {
'total_requests': global_stats.total_requests,
'success_rate': global_stats.success_rate,
'error_rate': global_stats.error_rate,
'recent_errors_count': len(recent_errors),
'active_chats': len(rate_limit_monitor.stats),
'requests_per_minute': global_stats.requests_per_minute,
'average_wait_time': global_stats.average_wait_time
}

View File

@@ -0,0 +1,215 @@
"""
Rate limiter для предотвращения Flood control ошибок в Telegram Bot API
"""
import asyncio
import time
from typing import Dict, Optional, Any, Callable
from dataclasses import dataclass
from aiogram.exceptions import TelegramRetryAfter, TelegramAPIError
from logs.custom_logger import logger
from .metrics import metrics
@dataclass
class RateLimitConfig:
"""Конфигурация для rate limiting"""
messages_per_second: float = 0.5 # Максимум 0.5 сообщений в секунду на чат
burst_limit: int = 3 # Максимум 3 сообщения подряд
retry_after_multiplier: float = 1.2 # Множитель для увеличения задержки при retry
max_retry_delay: float = 60.0 # Максимальная задержка между попытками
class ChatRateLimiter:
"""Rate limiter для конкретного чата"""
def __init__(self, config: RateLimitConfig):
self.config = config
self.last_send_time = 0.0
self.burst_count = 0
self.burst_reset_time = 0.0
self.retry_delay = 1.0
async def wait_if_needed(self) -> None:
"""Ждет если необходимо для соблюдения rate limit"""
current_time = time.time()
# Сбрасываем счетчик burst если прошло достаточно времени
if current_time >= self.burst_reset_time:
self.burst_count = 0
self.burst_reset_time = current_time + 1.0
# Проверяем burst limit
if self.burst_count >= self.config.burst_limit:
wait_time = self.burst_reset_time - current_time
if wait_time > 0:
logger.info(f"Burst limit reached, waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
current_time = time.time()
self.burst_count = 0
self.burst_reset_time = current_time + 1.0
# Проверяем минимальный интервал между сообщениями
time_since_last = current_time - self.last_send_time
min_interval = 1.0 / self.config.messages_per_second
if time_since_last < min_interval:
wait_time = min_interval - time_since_last
logger.debug(f"Rate limiting: waiting {wait_time:.2f}s")
await asyncio.sleep(wait_time)
# Обновляем время последней отправки
self.last_send_time = time.time()
self.burst_count += 1
class GlobalRateLimiter:
"""Глобальный rate limiter для всех чатов"""
def __init__(self, config: RateLimitConfig):
self.config = config
self.chat_limiters: Dict[int, ChatRateLimiter] = {}
self.global_last_send = 0.0
self.global_min_interval = 0.1 # Минимум 100ms между любыми сообщениями
def get_chat_limiter(self, chat_id: int) -> ChatRateLimiter:
"""Получает rate limiter для конкретного чата"""
if chat_id not in self.chat_limiters:
self.chat_limiters[chat_id] = ChatRateLimiter(self.config)
return self.chat_limiters[chat_id]
async def wait_if_needed(self, chat_id: int) -> None:
"""Ждет если необходимо для соблюдения глобального и чат-специфичного rate limit"""
current_time = time.time()
# Глобальный rate limit
time_since_global = current_time - self.global_last_send
if time_since_global < self.global_min_interval:
wait_time = self.global_min_interval - time_since_global
await asyncio.sleep(wait_time)
current_time = time.time()
# Чат-специфичный rate limit
chat_limiter = self.get_chat_limiter(chat_id)
await chat_limiter.wait_if_needed()
self.global_last_send = time.time()
class RetryHandler:
"""Обработчик повторных попыток с экспоненциальной задержкой"""
def __init__(self, config: RateLimitConfig):
self.config = config
async def execute_with_retry(
self,
func: Callable,
chat_id: int,
*args,
max_retries: int = 3,
**kwargs
) -> Any:
"""Выполняет функцию с повторными попытками при ошибках"""
retry_count = 0
current_delay = self.config.retry_after_multiplier
total_wait_time = 0.0
while retry_count <= max_retries:
try:
result = await func(*args, **kwargs)
# Записываем успешный запрос
metrics.record_rate_limit_request(chat_id, True, total_wait_time)
return result
except TelegramRetryAfter as e:
retry_count += 1
if retry_count > max_retries:
logger.error(f"Max retries exceeded for RetryAfter: {e}")
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "RetryAfter")
raise
# Используем время ожидания от Telegram или наше увеличенное
wait_time = max(e.retry_after, current_delay)
wait_time = min(wait_time, self.config.max_retry_delay)
total_wait_time += wait_time
logger.warning(f"RetryAfter error, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries})")
await asyncio.sleep(wait_time)
current_delay *= self.config.retry_after_multiplier
except TelegramAPIError as e:
retry_count += 1
if retry_count > max_retries:
logger.error(f"Max retries exceeded for TelegramAPIError: {e}")
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "TelegramAPIError")
raise
wait_time = min(current_delay, self.config.max_retry_delay)
total_wait_time += wait_time
logger.warning(f"TelegramAPIError, waiting {wait_time:.2f}s (attempt {retry_count}/{max_retries}): {e}")
await asyncio.sleep(wait_time)
current_delay *= self.config.retry_after_multiplier
except Exception as e:
# Для других ошибок не делаем retry
logger.error(f"Non-retryable error: {e}")
metrics.record_rate_limit_request(chat_id, False, total_wait_time, "Other")
raise
class TelegramRateLimiter:
"""Основной класс для rate limiting в Telegram боте"""
def __init__(self, config: Optional[RateLimitConfig] = None):
self.config = config or RateLimitConfig()
self.global_limiter = GlobalRateLimiter(self.config)
self.retry_handler = RetryHandler(self.config)
async def send_with_rate_limit(
self,
send_func: Callable,
chat_id: int,
*args,
**kwargs
) -> Any:
"""Отправляет сообщение с соблюдением rate limit и retry логики"""
async def _send():
await self.global_limiter.wait_if_needed(chat_id)
return await send_func(*args, **kwargs)
return await self.retry_handler.execute_with_retry(_send, chat_id)
# Глобальный экземпляр rate limiter
from helper_bot.config.rate_limit_config import get_rate_limit_config, RateLimitSettings
def _create_rate_limit_config(settings: RateLimitSettings) -> RateLimitConfig:
"""Создает RateLimitConfig из RateLimitSettings"""
return RateLimitConfig(
messages_per_second=settings.messages_per_second,
burst_limit=settings.burst_limit,
retry_after_multiplier=settings.retry_after_multiplier,
max_retry_delay=settings.max_retry_delay
)
# Получаем конфигурацию из настроек
_rate_limit_settings = get_rate_limit_config("production")
_default_config = _create_rate_limit_config(_rate_limit_settings)
telegram_rate_limiter = TelegramRateLimiter(_default_config)
async def send_with_rate_limit(send_func: Callable, chat_id: int, *args, **kwargs) -> Any:
"""
Удобная функция для отправки сообщений с rate limiting
Args:
send_func: Функция отправки (например, bot.send_message)
chat_id: ID чата
*args, **kwargs: Аргументы для функции отправки
Returns:
Результат выполнения функции отправки
"""
return await telegram_rate_limiter.send_with_rate_limit(send_func, chat_id, *args, **kwargs)

103
scripts/voice_cleanup.py Normal file
View File

@@ -0,0 +1,103 @@
#!/usr/bin/env python3
"""
Скрипт для диагностики и очистки проблем с голосовыми файлами
"""
import asyncio
import sys
import os
from pathlib import Path
# Добавляем корневую директорию проекта в путь
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from database.async_db import AsyncBotDB
from helper_bot.handlers.voice.cleanup_utils import VoiceFileCleanupUtils
from logs.custom_logger import logger
async def main():
"""Основная функция скрипта"""
try:
# Инициализация базы данных
db_path = "database/tg-bot-database.db"
if not os.path.exists(db_path):
logger.error(f"База данных не найдена: {db_path}")
return
bot_db = AsyncBotDB(db_path)
cleanup_utils = VoiceFileCleanupUtils(bot_db)
print("=== Диагностика голосовых файлов ===")
# Запускаем полную диагностику
diagnostic_result = await cleanup_utils.run_full_diagnostic()
print(f"\n📊 Статистика диска:")
if "error" in diagnostic_result["disk_stats"]:
print(f" ❌ Ошибка: {diagnostic_result['disk_stats']['error']}")
else:
stats = diagnostic_result["disk_stats"]
print(f" 📁 Директория: {stats['directory']}")
print(f" 📄 Всего файлов: {stats['total_files']}")
print(f" 💾 Размер: {stats['total_size_mb']} MB")
print(f"\n🗄️ База данных:")
print(f" 📝 Записей в БД: {diagnostic_result['db_records_count']}")
print(f" 🔍 Записей без файлов: {diagnostic_result['orphaned_db_records_count']}")
print(f" 📁 Файлов без записей: {diagnostic_result['orphaned_files_count']}")
print(f"\n📋 Статус: {diagnostic_result['status']}")
if diagnostic_result['status'] == 'issues_found':
print("\n⚠️ Найдены проблемы!")
if diagnostic_result['orphaned_db_records_count'] > 0:
print(f"\n🗑️ Записи в БД без файлов (первые 10):")
for file_name, user_id in diagnostic_result['orphaned_db_records']:
print(f" - {file_name} (user_id: {user_id})")
if diagnostic_result['orphaned_files_count'] > 0:
print(f"\n📁 Файлы без записей в БД (первые 10):")
for file_path in diagnostic_result['orphaned_files']:
print(f" - {file_path}")
# Предлагаем очистку
print("\n🧹 Хотите выполнить очистку?")
print("1. Удалить записи в БД без файлов")
print("2. Удалить файлы без записей в БД")
print("3. Выполнить полную очистку")
print("4. Выход")
choice = input("\nВыберите действие (1-4): ").strip()
if choice == "1":
print("\n🗑️ Удаление записей в БД без файлов...")
deleted = await cleanup_utils.cleanup_orphaned_db_records(dry_run=False)
print(f"✅ Удалено {deleted} записей")
elif choice == "2":
print("\n📁 Удаление файлов без записей в БД...")
deleted = await cleanup_utils.cleanup_orphaned_files(dry_run=False)
print(f"✅ Удалено {deleted} файлов")
elif choice == "3":
print("\n🧹 Полная очистка...")
db_deleted = await cleanup_utils.cleanup_orphaned_db_records(dry_run=False)
files_deleted = await cleanup_utils.cleanup_orphaned_files(dry_run=False)
print(f"✅ Удалено {db_deleted} записей в БД и {files_deleted} файлов")
elif choice == "4":
print("👋 Выход...")
else:
print("❌ Неверный выбор")
else:
print("\n✅ Проблем не найдено!")
except Exception as e:
logger.error(f"Ошибка в скрипте: {e}")
print(f"❌ Ошибка: {e}")
if __name__ == "__main__":
asyncio.run(main())

150
test_rate_limiting.py Normal file
View File

@@ -0,0 +1,150 @@
#!/usr/bin/env python3
"""
Скрипт для тестирования rate limiting решения
"""
import asyncio
import time
from unittest.mock import AsyncMock, MagicMock
from aiogram.types import Message, User, Chat
from helper_bot.utils.rate_limiter import send_with_rate_limit
from helper_bot.utils.rate_limit_monitor import rate_limit_monitor, get_rate_limit_summary
async def test_rate_limiting():
"""Тестирует rate limiting с имитацией отправки сообщений"""
print("🚀 Начинаем тестирование rate limiting...")
# Создаем мок объекты
mock_bot = MagicMock()
mock_user = User(id=123, is_bot=False, first_name="Test")
mock_chat = Chat(id=456, type="private")
# Создаем Message с bot в конструкторе
mock_message = Message(
message_id=1,
date=int(time.time()),
chat=mock_chat,
from_user=mock_user,
content_type="text",
bot=mock_bot
)
# Настраиваем мок для send_voice
mock_bot.send_voice = AsyncMock(return_value=MagicMock(message_id=1))
# Функция для отправки голосового сообщения
async def send_voice_test():
return await mock_bot.send_voice(
chat_id=mock_chat.id,
voice="test_voice_id"
)
print("📊 Отправляем 5 сообщений подряд...")
# Отправляем несколько сообщений подряд
start_time = time.time()
for i in range(5):
print(f" Отправка сообщения {i+1}/5...")
try:
result = await send_with_rate_limit(send_voice_test, mock_chat.id)
print(f" ✅ Сообщение {i+1} отправлено успешно")
except Exception as e:
print(f" ❌ Ошибка при отправке сообщения {i+1}: {e}")
end_time = time.time()
total_time = end_time - start_time
print(f"\n⏱️ Общее время выполнения: {total_time:.2f} секунд")
print(f"📈 Среднее время на сообщение: {total_time/5:.2f} секунд")
# Показываем статистику
print("\n📊 Статистика rate limiting:")
summary = get_rate_limit_summary()
for key, value in summary.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
# Показываем детальную статистику
print("\n🔍 Детальная статистика:")
global_stats = rate_limit_monitor.get_global_stats()
print(f" Всего запросов: {global_stats.total_requests}")
print(f" Успешных: {global_stats.successful_requests}")
print(f" Неудачных: {global_stats.failed_requests}")
print(f" Процент успеха: {global_stats.success_rate:.1%}")
print(f" Среднее время ожидания: {global_stats.average_wait_time:.2f}с")
# Проверяем что rate limiting работает
if total_time > 8: # Должно занять больше 8 секунд (5 сообщений * 1.6с минимум)
print("\n✅ Rate limiting работает корректно - сообщения отправляются с задержкой")
else:
print("\n⚠️ Rate limiting может работать некорректно - сообщения отправлены слишком быстро")
print("\n🎉 Тестирование завершено!")
async def test_error_handling():
"""Тестирует обработку ошибок"""
print("\n🧪 Тестируем обработку ошибок...")
# Создаем мок который будет падать с RetryAfter
from aiogram.exceptions import TelegramRetryAfter
mock_bot = MagicMock()
mock_chat = Chat(id=789, type="private")
call_count = 0
async def failing_send():
nonlocal call_count
call_count += 1
if call_count <= 2:
raise TelegramRetryAfter(
method=MagicMock(),
message="Flood control exceeded",
retry_after=1
)
return MagicMock(message_id=call_count)
mock_bot.send_voice = failing_send
print("📤 Отправляем сообщение с имитацией RetryAfter ошибки...")
start_time = time.time()
try:
result = await send_with_rate_limit(failing_send, mock_chat.id)
end_time = time.time()
print(f"✅ Сообщение отправлено после {call_count} попыток за {end_time - start_time:.2f}с")
except Exception as e:
print(f"❌ Ошибка: {e}")
print("🎯 Тест обработки ошибок завершен!")
async def main():
"""Основная функция"""
print("🔧 Тестирование решения Flood Control")
print("=" * 50)
# Сбрасываем статистику
rate_limit_monitor.reset_stats()
# Запускаем тесты
await test_rate_limiting()
await test_error_handling()
print("\n" + "=" * 50)
print("📋 Итоговая статистика:")
summary = get_rate_limit_summary()
for key, value in summary.items():
if isinstance(value, float):
print(f" {key}: {value:.2f}")
else:
print(f" {key}: {value}")
if __name__ == "__main__":
asyncio.run(main())

104
tests/test_async_db.py Normal file
View File

@@ -0,0 +1,104 @@
import pytest
from unittest.mock import Mock, AsyncMock, patch
from database.async_db import AsyncBotDB
class TestAsyncBotDB:
"""Тесты для AsyncBotDB"""
@pytest.fixture
def mock_factory(self):
"""Мок для RepositoryFactory"""
mock_factory = Mock()
mock_factory.audio = Mock()
mock_factory.audio.delete_audio_moderate_record = AsyncMock()
mock_factory.users = Mock()
mock_factory.users.logger = Mock()
return mock_factory
@pytest.fixture
def async_bot_db(self, mock_factory):
"""Экземпляр AsyncBotDB для тестов"""
with patch('database.async_db.RepositoryFactory') as mock_factory_class:
mock_factory_class.return_value = mock_factory
db = AsyncBotDB("test.db")
return db
@pytest.mark.asyncio
async def test_delete_audio_moderate_record(self, async_bot_db, mock_factory):
"""Тест метода delete_audio_moderate_record"""
message_id = 12345
await async_bot_db.delete_audio_moderate_record(message_id)
# Проверяем, что метод вызван в репозитории
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)
@pytest.mark.asyncio
async def test_delete_audio_moderate_record_with_different_message_id(self, async_bot_db, mock_factory):
"""Тест метода delete_audio_moderate_record с разными message_id"""
test_cases = [123, 456, 789, 99999]
for message_id in test_cases:
await async_bot_db.delete_audio_moderate_record(message_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_with(message_id)
# Проверяем, что метод вызван для каждого message_id
assert mock_factory.audio.delete_audio_moderate_record.call_count == len(test_cases)
@pytest.mark.asyncio
async def test_delete_audio_moderate_record_exception_handling(self, async_bot_db, mock_factory):
"""Тест обработки исключений в delete_audio_moderate_record"""
message_id = 12345
mock_factory.audio.delete_audio_moderate_record.side_effect = Exception("Database error")
# Метод должен пробросить исключение
with pytest.raises(Exception, match="Database error"):
await async_bot_db.delete_audio_moderate_record(message_id)
@pytest.mark.asyncio
async def test_delete_audio_moderate_record_integration_with_other_methods(self, async_bot_db, mock_factory):
"""Тест интеграции delete_audio_moderate_record с другими методами"""
message_id = 12345
user_id = 67890
# Мокаем другие методы
mock_factory.audio.get_user_id_by_message_id_for_voice_bot = AsyncMock(return_value=user_id)
mock_factory.audio.set_user_id_and_message_id_for_voice_bot = AsyncMock(return_value=True)
# Тестируем последовательность операций
await async_bot_db.get_user_id_by_message_id_for_voice_bot(message_id)
await async_bot_db.set_user_id_and_message_id_for_voice_bot(message_id, user_id)
await async_bot_db.delete_audio_moderate_record(message_id)
# Проверяем, что все методы вызваны
mock_factory.audio.get_user_id_by_message_id_for_voice_bot.assert_called_once_with(message_id)
mock_factory.audio.set_user_id_and_message_id_for_voice_bot.assert_called_once_with(message_id, user_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)
@pytest.mark.asyncio
async def test_delete_audio_moderate_record_zero_message_id(self, async_bot_db, mock_factory):
"""Тест delete_audio_moderate_record с message_id = 0"""
message_id = 0
await async_bot_db.delete_audio_moderate_record(message_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)
@pytest.mark.asyncio
async def test_delete_audio_moderate_record_negative_message_id(self, async_bot_db, mock_factory):
"""Тест delete_audio_moderate_record с отрицательным message_id"""
message_id = -12345
await async_bot_db.delete_audio_moderate_record(message_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)
@pytest.mark.asyncio
async def test_delete_audio_moderate_record_large_message_id(self, async_bot_db, mock_factory):
"""Тест delete_audio_moderate_record с большим message_id"""
message_id = 999999999
await async_bot_db.delete_audio_moderate_record(message_id)
mock_factory.audio.delete_audio_moderate_record.assert_called_once_with(message_id)

View File

@@ -1,5 +1,5 @@
import pytest
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from unittest.mock import Mock, AsyncMock, patch, MagicMock, mock_open
from datetime import datetime
import time
@@ -112,23 +112,27 @@ class TestSaveAudioFile:
@pytest.mark.asyncio
async def test_save_audio_file_success(self, audio_service, mock_bot_db, sample_datetime):
"""Тест успешного сохранения аудио файла"""
file_name = "test_audio.ogg"
file_name = "test_audio"
user_id = 12345
file_id = "test_file_id"
await audio_service.save_audio_file(file_name, user_id, sample_datetime, file_id)
# Мокаем verify_file_exists чтобы он возвращал True
with patch.object(audio_service, 'verify_file_exists', return_value=True):
await audio_service.save_audio_file(file_name, user_id, sample_datetime, file_id)
mock_bot_db.add_audio_record_simple.assert_called_once_with(file_name, user_id, sample_datetime)
@pytest.mark.asyncio
async def test_save_audio_file_with_string_date(self, audio_service, mock_bot_db):
"""Тест сохранения аудио файла со строковой датой"""
file_name = "test_audio.ogg"
file_name = "test_audio"
user_id = 12345
date_string = "2025-01-15 14:30:00"
file_id = "test_file_id"
await audio_service.save_audio_file(file_name, user_id, date_string, file_id)
# Мокаем verify_file_exists чтобы он возвращал True
with patch.object(audio_service, 'verify_file_exists', return_value=True):
await audio_service.save_audio_file(file_name, user_id, date_string, file_id)
mock_bot_db.add_audio_record_simple.assert_called_once_with(file_name, user_id, date_string)
@@ -137,8 +141,10 @@ class TestSaveAudioFile:
"""Тест обработки исключений при сохранении аудио файла"""
mock_bot_db.add_audio_record_simple.side_effect = Exception("Database error")
with pytest.raises(DatabaseError) as exc_info:
await audio_service.save_audio_file("test.ogg", 12345, sample_datetime, "file_id")
# Мокаем verify_file_exists чтобы он возвращал True
with patch.object(audio_service, 'verify_file_exists', return_value=True):
with pytest.raises(DatabaseError) as exc_info:
await audio_service.save_audio_file("test", 12345, sample_datetime, "file_id")
assert "Не удалось сохранить аудио файл в БД" in str(exc_info.value)
@@ -156,15 +162,23 @@ class TestDownloadAndSaveAudio:
mock_downloaded_file.tell.return_value = 0
mock_downloaded_file.seek = Mock()
mock_downloaded_file.read.return_value = b"audio_data"
# Настраиваем поведение tell() для получения размера файла
def mock_tell():
return 0 if mock_downloaded_file.seek.call_count == 0 else 1024
mock_downloaded_file.tell = Mock(side_effect=mock_tell)
mock_bot.download_file.return_value = mock_downloaded_file
with patch('builtins.open', mock_open()) as mock_file:
with patch('os.makedirs'):
await audio_service.download_and_save_audio(mock_bot, mock_message, "test_audio")
mock_bot.get_file.assert_called_once_with(file_id="test_file_id")
mock_bot.download_file.assert_called_once_with(file_path="voice/test_file_id.ogg")
mock_file.assert_called_once()
with patch('os.path.exists', return_value=True):
with patch('os.path.getsize', return_value=1024):
await audio_service.download_and_save_audio(mock_bot, mock_message, "test_audio")
mock_bot.get_file.assert_called_once_with(file_id="test_file_id")
mock_bot.download_file.assert_called_once_with(file_path="voice/test_file_id.ogg")
mock_file.assert_called_once()
@pytest.mark.asyncio
async def test_download_and_save_audio_no_message(self, audio_service, mock_bot):
@@ -207,10 +221,6 @@ class TestDownloadAndSaveAudio:
assert "Не удалось скачать и сохранить аудио" in str(exc_info.value)
def mock_open():
"""Мок для функции open"""
from unittest.mock import mock_open as _mock_open
return _mock_open()
class TestAudioFileServiceIntegration:
@@ -232,7 +242,8 @@ class TestAudioFileServiceIntegration:
# Тестируем сохранение в БД
test_date = datetime.now()
await service.save_audio_file(file_name, 12345, test_date, "test_file_id")
with patch.object(service, 'verify_file_exists', return_value=True):
await service.save_audio_file(file_name, 12345, test_date, "test_file_id")
# Проверяем вызовы
mock_bot_db.get_user_audio_records_count.assert_called_once_with(user_id=12345)

310
tests/test_rate_limiter.py Normal file
View File

@@ -0,0 +1,310 @@
"""
Тесты для rate limiter
"""
import asyncio
import time
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from helper_bot.utils.rate_limiter import (
RateLimitConfig,
ChatRateLimiter,
GlobalRateLimiter,
RetryHandler,
TelegramRateLimiter,
send_with_rate_limit
)
from helper_bot.utils.rate_limit_monitor import RateLimitMonitor, RateLimitStats, record_rate_limit_request
from helper_bot.config.rate_limit_config import RateLimitSettings, get_rate_limit_config
class TestRateLimitConfig:
"""Тесты для RateLimitConfig"""
def test_default_config(self):
"""Тест создания конфигурации по умолчанию"""
config = RateLimitConfig()
assert config.messages_per_second == 0.5
assert config.burst_limit == 3
assert config.retry_after_multiplier == 1.2
assert config.max_retry_delay == 60.0
class TestChatRateLimiter:
"""Тесты для ChatRateLimiter"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig(messages_per_second=1.0, burst_limit=2)
limiter = ChatRateLimiter(config)
assert limiter.config == config
assert limiter.last_send_time == 0.0
assert limiter.burst_count == 0
assert limiter.retry_delay == 1.0
@pytest.mark.asyncio
async def test_wait_if_needed_no_wait(self):
"""Тест что не ждет если не нужно"""
config = RateLimitConfig(messages_per_second=10.0, burst_limit=10)
limiter = ChatRateLimiter(config)
start_time = time.time()
await limiter.wait_if_needed()
end_time = time.time()
# Должно пройти очень быстро
assert end_time - start_time < 0.1
@pytest.mark.asyncio
async def test_wait_if_needed_with_wait(self):
"""Тест что ждет если нужно"""
config = RateLimitConfig(messages_per_second=0.5, burst_limit=10) # 1 сообщение в 2 секунды
limiter = ChatRateLimiter(config)
# Первый вызов не должен ждать
start_time = time.time()
await limiter.wait_if_needed()
first_call_time = time.time() - start_time
# Второй вызов должен ждать
start_time = time.time()
await limiter.wait_if_needed()
second_call_time = time.time() - start_time
assert first_call_time < 0.1
assert second_call_time >= 1.8 # Должно ждать около 2 секунд
@pytest.mark.asyncio
async def test_burst_limit(self):
"""Тест ограничения burst"""
config = RateLimitConfig(messages_per_second=10.0, burst_limit=2)
limiter = ChatRateLimiter(config)
# Первые два вызова не должны ждать
start_time = time.time()
await limiter.wait_if_needed()
await limiter.wait_if_needed()
first_two_calls_time = time.time() - start_time
# Третий вызов должен ждать
start_time = time.time()
await limiter.wait_if_needed()
third_call_time = time.time() - start_time
assert first_two_calls_time < 0.2 # Более мягкое ограничение
assert third_call_time >= 0.8 # Должно ждать около 1 секунды (с учетом погрешности)
class TestGlobalRateLimiter:
"""Тесты для GlobalRateLimiter"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig()
limiter = GlobalRateLimiter(config)
assert limiter.config == config
assert limiter.chat_limiters == {}
assert limiter.global_last_send == 0.0
def test_get_chat_limiter(self):
"""Тест получения limiter для чата"""
config = RateLimitConfig()
limiter = GlobalRateLimiter(config)
chat_limiter = limiter.get_chat_limiter(123)
assert isinstance(chat_limiter, ChatRateLimiter)
assert limiter.chat_limiters[123] == chat_limiter
# Повторный вызов должен вернуть тот же объект
same_limiter = limiter.get_chat_limiter(123)
assert same_limiter is chat_limiter
class TestRetryHandler:
"""Тесты для RetryHandler"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig()
handler = RetryHandler(config)
assert handler.config == config
@pytest.mark.asyncio
async def test_execute_with_retry_success(self):
"""Тест успешного выполнения без retry"""
config = RateLimitConfig()
handler = RetryHandler(config)
mock_func = AsyncMock(return_value="success")
result = await handler.execute_with_retry(mock_func, 123)
assert result == "success"
mock_func.assert_called_once()
@pytest.mark.asyncio
async def test_execute_with_retry_retry_after(self):
"""Тест retry после RetryAfter ошибки"""
from aiogram.exceptions import TelegramRetryAfter
config = RateLimitConfig(retry_after_multiplier=1.0, max_retry_delay=1.0)
handler = RetryHandler(config)
mock_func = AsyncMock()
# Создаем мок для TelegramRetryAfter
from unittest.mock import MagicMock
retry_after_error = TelegramRetryAfter(
method=MagicMock(),
message="Flood control exceeded",
retry_after=1 # 1 секунда
)
mock_func.side_effect = [
retry_after_error, # Первый вызов - ошибка
"success" # Второй вызов - успех
]
start_time = time.time()
result = await handler.execute_with_retry(mock_func, 123, max_retries=1)
end_time = time.time()
assert result == "success"
assert mock_func.call_count == 2
assert end_time - start_time >= 0.1 # Должно ждать
class TestTelegramRateLimiter:
"""Тесты для TelegramRateLimiter"""
def test_initialization(self):
"""Тест инициализации"""
config = RateLimitConfig()
limiter = TelegramRateLimiter(config)
assert limiter.config == config
assert isinstance(limiter.global_limiter, GlobalRateLimiter)
assert isinstance(limiter.retry_handler, RetryHandler)
@pytest.mark.asyncio
async def test_send_with_rate_limit(self):
"""Тест отправки с rate limiting"""
config = RateLimitConfig(messages_per_second=10.0, burst_limit=10)
limiter = TelegramRateLimiter(config)
mock_send_func = AsyncMock(return_value="sent")
result = await limiter.send_with_rate_limit(mock_send_func, 123)
assert result == "sent"
mock_send_func.assert_called_once()
class TestRateLimitMonitor:
"""Тесты для RateLimitMonitor"""
def test_initialization(self):
"""Тест инициализации"""
monitor = RateLimitMonitor()
assert monitor.stats == {}
assert isinstance(monitor.global_stats, RateLimitStats)
assert monitor.max_history_size == 1000
def test_record_request_success(self):
"""Тест записи успешного запроса"""
monitor = RateLimitMonitor()
monitor.record_request(123, True, 0.5)
assert 123 in monitor.stats
chat_stats = monitor.stats[123]
assert chat_stats.total_requests == 1
assert chat_stats.successful_requests == 1
assert chat_stats.failed_requests == 0
assert chat_stats.total_wait_time == 0.5
def test_record_request_failure(self):
"""Тест записи неудачного запроса"""
monitor = RateLimitMonitor()
monitor.record_request(123, False, 1.0, "RetryAfter")
assert 123 in monitor.stats
chat_stats = monitor.stats[123]
assert chat_stats.total_requests == 1
assert chat_stats.successful_requests == 0
assert chat_stats.failed_requests == 1
assert chat_stats.retry_after_errors == 1
assert chat_stats.total_wait_time == 1.0
def test_get_chat_stats(self):
"""Тест получения статистики чата"""
monitor = RateLimitMonitor()
# Статистика для несуществующего чата
assert monitor.get_chat_stats(999) is None
# Записываем запрос
monitor.record_request(123, True, 0.5)
# Получаем статистику
stats = monitor.get_chat_stats(123)
assert stats is not None
assert stats.chat_id == 123
assert stats.total_requests == 1
def test_success_rate_calculation(self):
"""Тест расчета процента успеха"""
monitor = RateLimitMonitor()
# 3 успешных, 1 неудачный
monitor.record_request(123, True, 0.1)
monitor.record_request(123, True, 0.2)
monitor.record_request(123, True, 0.3)
monitor.record_request(123, False, 0.4, "RetryAfter")
stats = monitor.get_chat_stats(123)
assert stats.success_rate == 0.75 # 3/4
assert stats.error_rate == 0.25 # 1/4
class TestRateLimitConfig:
"""Тесты для конфигурации rate limiting"""
def test_get_rate_limit_config(self):
"""Тест получения конфигурации"""
# Тест production конфигурации
prod_config = get_rate_limit_config("production")
assert prod_config.messages_per_second == 0.5
assert prod_config.burst_limit == 2
# Тест development конфигурации
dev_config = get_rate_limit_config("development")
assert dev_config.messages_per_second == 1.0
assert dev_config.burst_limit == 3
# Тест strict конфигурации
strict_config = get_rate_limit_config("strict")
assert strict_config.messages_per_second == 0.3
assert strict_config.burst_limit == 1
# Тест неизвестной конфигурации (должна вернуть production)
unknown_config = get_rate_limit_config("unknown")
assert unknown_config.messages_per_second == 0.5
@pytest.mark.asyncio
async def test_send_with_rate_limit_integration():
"""Интеграционный тест для send_with_rate_limit"""
mock_send_func = AsyncMock(return_value="message_sent")
result = await send_with_rate_limit(mock_send_func, 123)
assert result == "message_sent"
mock_send_func.assert_called_once()
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -295,7 +295,7 @@ class TestDownloadFile:
with patch('os.path.getsize', return_value=1024):
with patch('os.path.basename', return_value='file_123.jpg'):
with patch('os.path.splitext', return_value=('file_123', '.jpg')):
with patch('helper_bot.utils.helper_func.metrics') as mock_metrics:
with patch('helper_bot.utils.metrics.metrics') as mock_metrics:
result = await download_file(mock_message, "file_id_123", "photo")
assert result == "files/photos/file_123.jpg"