""" Мониторинг и статистика rate limiting """ import time from typing import Dict, List, Optional from dataclasses import dataclass, field from collections import defaultdict, deque from logs.custom_logger import logger @dataclass class RateLimitStats: """Статистика rate limiting для чата""" chat_id: int total_requests: int = 0 successful_requests: int = 0 failed_requests: int = 0 retry_after_errors: int = 0 other_errors: int = 0 total_wait_time: float = 0.0 last_request_time: float = 0.0 request_times: deque = field(default_factory=lambda: deque(maxlen=100)) @property def success_rate(self) -> float: """Процент успешных запросов""" if self.total_requests == 0: return 1.0 return self.successful_requests / self.total_requests @property def error_rate(self) -> float: """Процент ошибок""" return 1.0 - self.success_rate @property def average_wait_time(self) -> float: """Среднее время ожидания""" if self.total_requests == 0: return 0.0 return self.total_wait_time / self.total_requests @property def requests_per_minute(self) -> float: """Запросов в минуту""" if not self.request_times: return 0.0 current_time = time.time() minute_ago = current_time - 60 # Подсчитываем запросы за последнюю минуту recent_requests = sum(1 for req_time in self.request_times if req_time > minute_ago) return recent_requests class RateLimitMonitor: """Монитор для отслеживания статистики rate limiting""" def __init__(self, max_history_size: int = 1000): self.stats: Dict[int, RateLimitStats] = defaultdict(lambda: RateLimitStats(0)) self.global_stats = RateLimitStats(0) self.max_history_size = max_history_size self.error_history: deque = deque(maxlen=max_history_size) def record_request(self, chat_id: int, success: bool, wait_time: float = 0.0, error_type: Optional[str] = None): """Записывает информацию о запросе""" current_time = time.time() # Обновляем статистику для чата chat_stats = self.stats[chat_id] chat_stats.chat_id = chat_id chat_stats.total_requests += 1 chat_stats.total_wait_time += wait_time chat_stats.last_request_time = current_time chat_stats.request_times.append(current_time) if success: chat_stats.successful_requests += 1 else: chat_stats.failed_requests += 1 if error_type == "RetryAfter": chat_stats.retry_after_errors += 1 else: chat_stats.other_errors += 1 # Записываем ошибку в историю self.error_history.append({ 'chat_id': chat_id, 'error_type': error_type, 'timestamp': current_time, 'wait_time': wait_time }) # Обновляем глобальную статистику self.global_stats.total_requests += 1 self.global_stats.total_wait_time += wait_time self.global_stats.last_request_time = current_time self.global_stats.request_times.append(current_time) if success: self.global_stats.successful_requests += 1 else: self.global_stats.failed_requests += 1 if error_type == "RetryAfter": self.global_stats.retry_after_errors += 1 else: self.global_stats.other_errors += 1 def get_chat_stats(self, chat_id: int) -> Optional[RateLimitStats]: """Получает статистику для конкретного чата""" return self.stats.get(chat_id) def get_global_stats(self) -> RateLimitStats: """Получает глобальную статистику""" return self.global_stats def get_top_chats_by_requests(self, limit: int = 10) -> List[tuple]: """Получает топ чатов по количеству запросов""" sorted_chats = sorted( self.stats.items(), key=lambda x: x[1].total_requests, reverse=True ) return sorted_chats[:limit] def get_chats_with_high_error_rate(self, threshold: float = 0.1) -> List[tuple]: """Получает чаты с высоким процентом ошибок""" high_error_chats = [ (chat_id, stats) for chat_id, stats in self.stats.items() if stats.error_rate > threshold and stats.total_requests > 5 ] return sorted(high_error_chats, key=lambda x: x[1].error_rate, reverse=True) def get_recent_errors(self, minutes: int = 60) -> List[dict]: """Получает недавние ошибки""" current_time = time.time() cutoff_time = current_time - (minutes * 60) return [ error for error in self.error_history if error['timestamp'] > cutoff_time ] def get_error_summary(self, minutes: int = 60) -> Dict[str, int]: """Получает сводку ошибок за указанный период""" recent_errors = self.get_recent_errors(minutes) error_summary = defaultdict(int) for error in recent_errors: error_summary[error['error_type']] += 1 return dict(error_summary) def log_statistics(self, log_level: str = "info"): """Логирует текущую статистику""" global_stats = self.get_global_stats() log_message = ( f"Rate Limit Statistics:\n" f" Total requests: {global_stats.total_requests}\n" f" Success rate: {global_stats.success_rate:.2%}\n" f" Error rate: {global_stats.error_rate:.2%}\n" f" RetryAfter errors: {global_stats.retry_after_errors}\n" f" Other errors: {global_stats.other_errors}\n" f" Average wait time: {global_stats.average_wait_time:.2f}s\n" f" Requests per minute: {global_stats.requests_per_minute:.1f}\n" f" Active chats: {len(self.stats)}" ) if log_level == "error": logger.error(log_message) elif log_level == "warning": logger.warning(log_message) else: logger.info(log_message) # Логируем чаты с высоким процентом ошибок high_error_chats = self.get_chats_with_high_error_rate(0.2) if high_error_chats: logger.warning(f"Chats with high error rate (>20%): {len(high_error_chats)}") for chat_id, stats in high_error_chats[:5]: # Показываем только первые 5 logger.warning(f" Chat {chat_id}: {stats.error_rate:.2%} error rate ({stats.failed_requests}/{stats.total_requests})") def reset_stats(self, chat_id: Optional[int] = None): """Сбрасывает статистику""" if chat_id is None: # Сбрасываем всю статистику self.stats.clear() self.global_stats = RateLimitStats(0) self.error_history.clear() else: # Сбрасываем статистику для конкретного чата if chat_id in self.stats: del self.stats[chat_id] # Глобальный экземпляр монитора rate_limit_monitor = RateLimitMonitor() def record_rate_limit_request(chat_id: int, success: bool, wait_time: float = 0.0, error_type: Optional[str] = None): """Удобная функция для записи информации о запросе""" rate_limit_monitor.record_request(chat_id, success, wait_time, error_type) def get_rate_limit_summary() -> Dict: """Получает краткую сводку по rate limiting""" global_stats = rate_limit_monitor.get_global_stats() recent_errors = rate_limit_monitor.get_recent_errors(60) # За последний час return { 'total_requests': global_stats.total_requests, 'success_rate': global_stats.success_rate, 'error_rate': global_stats.error_rate, 'recent_errors_count': len(recent_errors), 'active_chats': len(rate_limit_monitor.stats), 'requests_per_minute': global_stats.requests_per_minute, 'average_wait_time': global_stats.average_wait_time }