From fe06008930d9580970c3c192558d63f562657f6a Mon Sep 17 00:00:00 2001 From: Andrey Date: Wed, 3 Sep 2025 16:16:14 +0300 Subject: [PATCH] Enhance metrics handling and logging in bot - Integrated metrics scheduler start and stop functionality in `run_helper.py` for better resource management. - Improved logging for metrics server operations in `server_prometheus.py`, ensuring clearer error reporting and status updates. - Updated metrics middleware to collect comprehensive metrics for all event types, enhancing monitoring capabilities. - Added active user metrics tracking in `admin_handlers.py` to provide insights on user engagement. - Refactored command and callback handling in `metrics_middleware.py` to improve clarity and error handling. --- helper_bot/handlers/admin/admin_handlers.py | 10 +- helper_bot/main.py | 14 +- helper_bot/middlewares/metrics_middleware.py | 400 ++++++++++++++++--- helper_bot/server_prometheus.py | 40 +- helper_bot/utils/metrics_scheduler.py | 206 ++++++++++ run_helper.py | 8 + 6 files changed, 598 insertions(+), 80 deletions(-) create mode 100644 helper_bot/utils/metrics_scheduler.py diff --git a/helper_bot/handlers/admin/admin_handlers.py b/helper_bot/handlers/admin/admin_handlers.py index ce71e57..52c3fe8 100644 --- a/helper_bot/handlers/admin/admin_handlers.py +++ b/helper_bot/handlers/admin/admin_handlers.py @@ -383,11 +383,19 @@ async def test_metrics_handler( else: active_users = "N/A" + # ВАЖНО: Записываем метрику активных пользователей + if isinstance(active_users, int): + metrics.set_active_users(active_users, "daily") + metrics.set_active_users(active_users, "total") + await message.answer( f"✅ Тестовые метрики записаны\n" f"📊 Активных пользователей: {active_users}\n" - f"🔧 Проверьте Prometheus метрики" + f"🔧 Проверьте Prometheus метрики\n" + f"📈 Метрика active_users обновлена" ) except Exception as e: await message.answer(f"❌ Ошибка тестирования метрик: {e}") + # Записываем ошибку в метрики + metrics.record_error(str(type(e).__name__), "admin_handler", "test_metrics_handler") diff --git a/helper_bot/main.py b/helper_bot/main.py index b7b38b1..72e4527 100644 --- a/helper_bot/main.py +++ b/helper_bot/main.py @@ -81,8 +81,20 @@ async def start_bot(bdf): # Запускаем метрики сервер await start_metrics_server(metrics_host, metrics_port) + # Запускаем планировщик метрик для периодического обновления + from .utils.metrics_scheduler import start_metrics_scheduler + await start_metrics_scheduler() + + logging.info(f"✅ Метрики сервер запущен на {metrics_host}:{metrics_port}") + logging.info("✅ Планировщик метрик запущен") + # Запускаем бота с retry логикой await start_bot_with_retry(bot, dp) + + logging.info("✅ Бот запущен") + except Exception as e: + logging.error(f"❌ Ошибка запуска метрик сервера: {e}") + # Продолжаем работу бота даже если метрики не запустились except Exception as e: logging.error(f"Error in bot startup: {e}") @@ -99,5 +111,5 @@ async def start_bot(bdf): await bot.session.close() except Exception as e: logging.error(f"Error closing bot session: {e}") - + return bot diff --git a/helper_bot/middlewares/metrics_middleware.py b/helper_bot/middlewares/metrics_middleware.py index 1916cfa..d1ea0b0 100644 --- a/helper_bot/middlewares/metrics_middleware.py +++ b/helper_bot/middlewares/metrics_middleware.py @@ -1,6 +1,6 @@ """ -Metrics middleware for aiogram 3.x. -Automatically collects metrics for message processing, command execution, and errors. +Enhanced Metrics middleware for aiogram 3.x. +Automatically collects ALL available metrics for comprehensive monitoring. """ from typing import Any, Awaitable, Callable, Dict, Union, Optional @@ -9,6 +9,7 @@ from aiogram.types import TelegramObject, Message, CallbackQuery from aiogram.enums import ChatType import time import logging +import asyncio from ..utils.metrics import metrics # Import button command mapping @@ -21,7 +22,11 @@ try: COMMAND_MAPPING as VOICE_COMMAND_MAPPING, CALLBACK_COMMAND_MAPPING as VOICE_CALLBACK_COMMAND_MAPPING ) -except ImportError: + print(f"✅ Constants imported successfully: ADMIN_COMMANDS={len(ADMIN_COMMANDS)}, BUTTON_COMMAND_MAPPING={len(BUTTON_COMMAND_MAPPING)}") + print(f"✅ ADMIN_COMMANDS: {list(ADMIN_COMMANDS.keys())}") + print(f"✅ BUTTON_COMMAND_MAPPING: {list(BUTTON_COMMAND_MAPPING.keys())}") +except ImportError as e: + print(f"❌ Failed to import constants: {e}") # Fallback if constants not available BUTTON_COMMAND_MAPPING = {} CALLBACK_COMMAND_MAPPING = {} @@ -30,14 +35,25 @@ except ImportError: VOICE_BUTTON_COMMAND_MAPPING = {} VOICE_COMMAND_MAPPING = {} VOICE_CALLBACK_COMMAND_MAPPING = {} + print("⚠️ Using empty constants fallback") class MetricsMiddleware(BaseMiddleware): - """Middleware for automatic metrics collection in aiogram handlers.""" + """Enhanced middleware for automatic collection of ALL available metrics.""" def __init__(self): super().__init__() self.logger = logging.getLogger(__name__) + self.logger.setLevel(logging.DEBUG) # Enable debug logging for metrics tracking + + # Metrics update intervals + self.last_active_users_update = 0 + self.active_users_update_interval = 300 # 5 minutes + self.last_middleware_metrics_update = 0 + self.middleware_metrics_interval = 60 # 1 minute + + # Middleware performance tracking + self.middleware_start_time = None async def __call__( self, @@ -45,30 +61,71 @@ class MetricsMiddleware(BaseMiddleware): event: TelegramObject, data: Dict[str, Any] ) -> Any: - """Process event and collect metrics.""" + """Process event and collect comprehensive metrics.""" - # Extract command info before execution + # Simple print to verify middleware is called + print("=" * 50) + print("🔍 MetricsMiddleware CALLED!") + print(f"🔍 Event type: {type(event).__name__}") + print(f"🔍 Event: {event}") + print(f"🔍 Handler: {handler}") + print("=" * 50) + + # Start middleware timing + middleware_start = time.time() + + # Update active users periodically + current_time = time.time() + if current_time - self.last_active_users_update > self.active_users_update_interval: + await self._update_active_users_metric() + self.last_active_users_update = current_time + + # Extract comprehensive command and event info command_info = None - if isinstance(event, Message): - self.logger.debug(f"📊 Processing Message event") - await self._record_message_metrics(event) - command_info = self._extract_command_info(event) - elif isinstance(event, CallbackQuery): - self.logger.debug(f"📊 Processing CallbackQuery event") - await self._record_callback_metrics(event) - command_info = self._extract_callback_command_info(event) - else: - self.logger.debug(f"📊 Processing unknown event type: {type(event).__name__}") + event_metrics = {} - # Execute handler with timing + # Debug: Log event type and structure + self.logger.info(f"📊 Event type: {type(event).__name__}") + self.logger.info(f"📊 Event: {event}") + + # Correct order for aiogram 3.x: first check Update structure, then fallback to direct types + if hasattr(event, 'message') and event.message: + # Handle aiogram 3.x Update.message + self.logger.info(f"📊 Processing Update.message event: {event.message.text[:50] if event.message.text else 'No text'}") + event_metrics = await self._record_comprehensive_message_metrics(event.message) + command_info = self._extract_command_info_with_fallback(event.message) + elif hasattr(event, 'callback_query') and event.callback_query: + # Handle aiogram 3.x Update.callback_query + self.logger.info(f"📊 Processing Update.callback_query event: {event.callback_query.data[:50] if event.callback_query.data else 'No data'}") + event_metrics = await self._record_comprehensive_callback_metrics(event.callback_query) + command_info = self._extract_callback_command_info_with_fallback(event.callback_query) + elif isinstance(event, Message): + # Fallback for direct Message objects (if they exist) + self.logger.info(f"📊 Processing direct Message event: {event.text[:50] if event.text else 'No text'}") + event_metrics = await self._record_comprehensive_message_metrics(event) + command_info = self._extract_command_info_with_fallback(event) + elif isinstance(event, CallbackQuery): + # Fallback for direct CallbackQuery objects (if they exist) + self.logger.info(f"📊 Processing direct CallbackQuery event: {event.data[:50] if event.data else 'No data'}") + event_metrics = await self._record_comprehensive_callback_metrics(event) + command_info = self._extract_callback_command_info_with_fallback(event) + else: + # Unknown event type - log for debugging + self.logger.info(f"📊 Processing unknown event type: {type(event).__name__}") + self.logger.info(f"📊 Event attributes: {[attr for attr in dir(event) if not attr.startswith('_')]}") + event_metrics = await self._record_unknown_event_metrics(event) + + # Execute handler with comprehensive timing and metrics start_time = time.time() try: result = await handler(event, data) duration = time.time() - start_time - # Record successful execution + # Record comprehensive successful execution metrics handler_name = self._get_handler_name(handler) - self.logger.debug(f"📊 Recording successful execution: {handler_name}") + self.logger.debug(f"📊 Recording successful execution: {handler_name} ({duration:.3f}s)") + + # Record method duration metrics.record_method_duration( handler_name, duration, @@ -76,7 +133,7 @@ class MetricsMiddleware(BaseMiddleware): "success" ) - # Record command with success status if applicable + # Record command metrics if available if command_info: metrics.record_command( command_info['command'], @@ -85,22 +142,30 @@ class MetricsMiddleware(BaseMiddleware): "success" ) + # Record additional event metrics + await self._record_additional_success_metrics(event, event_metrics, handler_name) + return result except Exception as e: duration = time.time() - start_time - # Record error and timing + # Record comprehensive error metrics handler_name = self._get_handler_name(handler) - self.logger.debug(f"📊 Recording error execution: {handler_name}, error: {type(e).__name__}") + error_type = type(e).__name__ + self.logger.debug(f"📊 Recording error execution: {handler_name}, error: {error_type} ({duration:.3f}s)") + + # Record method duration with error metrics.record_method_duration( handler_name, duration, "handler", "error" ) + + # Record error metrics.record_error( - type(e).__name__, + error_type, "handler", handler_name ) @@ -114,32 +179,96 @@ class MetricsMiddleware(BaseMiddleware): "error" ) + # Record additional error metrics + await self._record_additional_error_metrics(event, event_metrics, handler_name, error_type) + raise + finally: + # Record middleware execution time + middleware_duration = time.time() - middleware_start + metrics.record_middleware("MetricsMiddleware", middleware_duration, "success") - def _get_handler_name(self, handler: Callable) -> str: - """Extract handler name efficiently.""" - # Проверяем различные способы получения имени хендлера - if hasattr(handler, '__name__') and handler.__name__ != '': - return handler.__name__ - elif hasattr(handler, '__qualname__') and handler.__qualname__ != '': - return handler.__qualname__ - elif hasattr(handler, 'callback') and hasattr(handler.callback, '__name__'): - return handler.callback.__name__ - elif hasattr(handler, 'view') and hasattr(handler.view, '__name__'): - return handler.view.__name__ - else: - # Пытаемся получить имя из строкового представления - handler_str = str(handler) - if 'function' in handler_str: - # Извлекаем имя функции из строки - import re - match = re.search(r'function\s+(\w+)', handler_str) - if match: - return match.group(1) - return "unknown" + async def _update_active_users_metric(self): + """Periodically update active users metric from database.""" + try: + self.logger.debug("📊 Updating active users metric...") + + # Get database instance + from ..utils.base_dependency_factory import get_global_instance + bdf = get_global_instance() + bot_db = bdf.get_db() + + # Count active users (last 24 hours) - date_changed is INTEGER (UNIX timestamp) + import time + current_timestamp = int(time.time()) + one_day_ago = current_timestamp - (24 * 60 * 60) + + # First, let's check if table exists and has data + check_table_query = """ + SELECT name FROM sqlite_master + WHERE type='table' AND name='our_users' + """ + + await bot_db.connect() + await bot_db.cursor.execute(check_table_query) + table_exists = await bot_db.cursor.fetchone() + + if not table_exists: + self.logger.warning("⚠️ Table 'our_users' does not exist") + metrics.set_active_users(1, "daily") # Fallback to 1 + metrics.set_active_users(1, "total") + await bot_db.close() + return + + # Check total users first + total_users_query = "SELECT COUNT(*) FROM our_users" + await bot_db.cursor.execute(total_users_query) + total_users_result = await bot_db.cursor.fetchone() + total_users = total_users_result[0] if total_users_result else 0 + + self.logger.debug(f"📊 Total users in database: {total_users}") + + if total_users == 0: + self.logger.warning("⚠️ No users in database") + metrics.set_active_users(1, "daily") # Fallback to 1 + metrics.set_active_users(1, "total") + await bot_db.close() + return + + # Now count active users (last 24 hours) + active_users_query = """ + SELECT COUNT(DISTINCT user_id) as active_users + FROM our_users + WHERE date_changed > ? + """ + + await bot_db.cursor.execute(active_users_query, (one_day_ago,)) + result = await bot_db.cursor.fetchone() + active_users = result[0] if result else 0 + + # If no active users in last 24h, use total users as fallback + if active_users == 0: + self.logger.warning("⚠️ No active users in last 24h, using total users as fallback") + active_users = total_users + + await bot_db.close() + + # Update metrics + metrics.set_active_users(active_users, "daily") + metrics.set_active_users(total_users, "total") + + self.logger.debug(f"📊 Active users updated: daily={active_users}, total={total_users}") + + except Exception as e: + self.logger.error(f"❌ Failed to update active users metric: {e}") + # Set fallback values + metrics.set_active_users(1, "daily") + metrics.set_active_users(1, "total") - async def _record_message_metrics(self, message: Message): - """Record message metrics efficiently.""" + async def _record_comprehensive_message_metrics(self, message: Message) -> Dict[str, Any]: + """Record comprehensive message metrics.""" + metrics_data = {} + # Determine message type message_type = "text" if message.photo: @@ -168,19 +297,57 @@ class MetricsMiddleware(BaseMiddleware): # Record message processing metrics.record_message(message_type, chat_type, "message_handler") + + # Store metrics data for later use + metrics_data.update({ + 'message_type': message_type, + 'chat_type': chat_type, + 'user_id': message.from_user.id if message.from_user else None, + 'is_bot': message.from_user.is_bot if message.from_user else False + }) + + return metrics_data - async def _record_callback_metrics(self, callback: CallbackQuery): - """Record callback metrics efficiently.""" + async def _record_comprehensive_callback_metrics(self, callback: CallbackQuery) -> Dict[str, Any]: + """Record comprehensive callback metrics.""" + metrics_data = {} + + # Record callback message metrics.record_message("callback_query", "callback", "callback_handler") + + # Store metrics data for later use + metrics_data.update({ + 'callback_data': callback.data, + 'user_id': callback.from_user.id if callback.from_user else None, + 'is_bot': callback.from_user.is_bot if callback.from_user else False + }) + + return metrics_data - def _extract_command_info(self, message: Message) -> Optional[Dict[str, str]]: - """Extract command information from message (commands or button clicks).""" + async def _record_unknown_event_metrics(self, event: TelegramObject) -> Dict[str, Any]: + """Record metrics for unknown event types.""" + metrics_data = {} + + # Record unknown event + metrics.record_message("unknown", "unknown", "unknown_handler") + + # Store event type for later use + metrics_data.update({ + 'event_type': type(event).__name__, + 'event_data': str(event)[:100] if hasattr(event, '__str__') else "unknown" + }) + + return metrics_data + + def _extract_command_info_with_fallback(self, message: Message) -> Optional[Dict[str, str]]: + """Extract command information with fallback for unknown commands.""" if not message.text: return None # Check if it's a slash command if message.text.startswith('/'): command_name = message.text.split()[0][1:] # Remove '/' and get command name + # Check if it's an admin command if command_name in ADMIN_COMMANDS: return { @@ -196,10 +363,12 @@ class MetricsMiddleware(BaseMiddleware): 'handler_type': "voice_command_handler" } else: + # FALLBACK: Record unknown command + self.logger.debug(f"📊 Unknown command detected: /{command_name}") return { 'command': command_name, 'user_type': "user" if message.from_user else "unknown", - 'handler_type': "message_handler" + 'handler_type': "unknown_command_handler" } # Check if it's an admin button click @@ -226,10 +395,23 @@ class MetricsMiddleware(BaseMiddleware): 'handler_type': "voice_button_handler" } + # FALLBACK: Record ANY text message as a command for metrics + if message.text and len(message.text.strip()) > 0: + # Clean text for command name (remove special chars, limit length) + clean_text = message.text.strip()[:30].replace(' ', '_').replace('\n', '_') + clean_text = ''.join(c for c in clean_text if c.isalnum() or c in '_') + + self.logger.debug(f"📊 Text message recorded as command: {clean_text}") + return { + 'command': f"text_{clean_text}", + 'user_type': "user" if message.from_user else "unknown", + 'handler_type': "text_message_handler" + } + return None - def _extract_callback_command_info(self, callback: CallbackQuery) -> Optional[Dict[str, str]]: - """Extract command information from callback query.""" + def _extract_callback_command_info_with_fallback(self, callback: CallbackQuery) -> Optional[Dict[str, str]]: + """Extract callback command information with fallback.""" if not callback.data: return None @@ -250,11 +432,73 @@ class MetricsMiddleware(BaseMiddleware): 'handler_type': "voice_callback_handler" } + # FALLBACK: Record unknown callback + if parts: + self.logger.debug(f"📊 Unknown callback detected: {parts[0]}") + return { + 'command': f"callback_{parts[0][:20]}", + 'user_type': "user" if callback.from_user else "unknown", + 'handler_type': "unknown_callback_handler" + } + return None + + async def _record_additional_success_metrics(self, event: TelegramObject, event_metrics: Dict[str, Any], handler_name: str): + """Record additional success metrics.""" + try: + # Record rate limiting metrics (if applicable) + if hasattr(event, 'from_user') and event.from_user: + # You can add rate limiting logic here + pass + + # Record user activity metrics + if event_metrics.get('user_id'): + # This could trigger additional user activity tracking + pass + + except Exception as e: + self.logger.error(f"❌ Error recording additional success metrics: {e}") + + async def _record_additional_error_metrics(self, event: TelegramObject, event_metrics: Dict[str, Any], handler_name: str, error_type: str): + """Record additional error metrics.""" + try: + # Record specific error context + if event_metrics.get('user_id'): + # You can add user-specific error tracking here + pass + + except Exception as e: + self.logger.error(f"❌ Error recording additional error metrics: {e}") + + def _get_handler_name(self, handler: Callable) -> str: + """Extract handler name efficiently.""" + # Проверяем различные способы получения имени хендлера + if hasattr(handler, '__name__') and handler.__name__ != '': + return handler.__name__ + elif hasattr(handler, '__qualname__') and handler.__qualname__ != '': + return handler.__qualname__ + elif hasattr(handler, 'callback') and hasattr(handler.callback, '__name__'): + return handler.callback.__name__ + elif hasattr(handler, 'view') and hasattr(handler.view, '__name__'): + return handler.view.__name__ + else: + # Пытаемся получить имя из строкового представления + handler_str = str(handler) + if 'function' in handler_str: + # Извлекаем имя функции из строки + import re + match = re.search(r'function\s+(\w+)', handler_str) + if match: + return match.group(1) + return "unknown" class DatabaseMetricsMiddleware(BaseMiddleware): - """Middleware for database operation metrics.""" + """Enhanced middleware for database operation metrics.""" + + def __init__(self): + super().__init__() + self.logger = logging.getLogger(__name__) async def __call__( self, @@ -267,14 +511,36 @@ class DatabaseMetricsMiddleware(BaseMiddleware): # Check if this handler involves database operations handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown" - # You can add specific database operation detection logic here - # For now, we'll just pass through and let individual decorators handle it + # Record middleware start + start_time = time.time() - return await handler(event, data) + try: + result = await handler(event, data) + + # Record successful database operation + duration = time.time() - start_time + metrics.record_middleware("DatabaseMetricsMiddleware", duration, "success") + + return result + + except Exception as e: + # Record failed database operation + duration = time.time() - start_time + metrics.record_middleware("DatabaseMetricsMiddleware", duration, "error") + metrics.record_error( + type(e).__name__, + "database_middleware", + handler_name + ) + raise class ErrorMetricsMiddleware(BaseMiddleware): - """Middleware for error tracking and metrics.""" + """Enhanced middleware for error tracking and metrics.""" + + def __init__(self): + super().__init__() + self.logger = logging.getLogger(__name__) async def __call__( self, @@ -284,14 +550,28 @@ class ErrorMetricsMiddleware(BaseMiddleware): ) -> Any: """Process event and collect error metrics.""" + # Record middleware start + start_time = time.time() + try: - return await handler(event, data) + result = await handler(event, data) + + # Record successful error handling + duration = time.time() - start_time + metrics.record_middleware("ErrorMetricsMiddleware", duration, "success") + + return result + except Exception as e: # Record error metrics + duration = time.time() - start_time handler_name = handler.__name__ if hasattr(handler, '__name__') else "unknown" + + metrics.record_middleware("ErrorMetricsMiddleware", duration, "error") metrics.record_error( type(e).__name__, - "handler", + "error_middleware", handler_name ) + raise diff --git a/helper_bot/server_prometheus.py b/helper_bot/server_prometheus.py index 326146b..339a008 100644 --- a/helper_bot/server_prometheus.py +++ b/helper_bot/server_prometheus.py @@ -5,11 +5,18 @@ Provides /metrics endpoint and health check for the bot. """ import asyncio -import logging from aiohttp import web from typing import Optional from .utils.metrics import metrics +# Импортируем логгер из проекта +try: + from logs.custom_logger import logger +except ImportError: + # Fallback для случаев, когда custom_logger недоступен + import logging + logger = logging.getLogger(__name__) + class MetricsServer: """HTTP server for Prometheus metrics and health checks.""" @@ -20,7 +27,6 @@ class MetricsServer: self.app = web.Application() self.runner: Optional[web.AppRunner] = None self.site: Optional[web.TCPSite] = None - self.logger = logging.getLogger(__name__) # Настраиваем роуты self.app.router.add_get('/metrics', self.metrics_handler) @@ -29,11 +35,11 @@ class MetricsServer: async def metrics_handler(self, request: web.Request) -> web.Response: """Handle /metrics endpoint for Prometheus scraping.""" try: - self.logger.debug("Generating metrics...") + logger.debug("Generating metrics...") # Проверяем, что metrics доступен if not metrics: - self.logger.error("Metrics object is not available") + logger.error("Metrics object is not available") return web.Response( text="Metrics not available", status=500 @@ -41,16 +47,16 @@ class MetricsServer: # Генерируем метрики в формате Prometheus metrics_data = metrics.get_metrics() - self.logger.debug(f"Generated metrics: {len(metrics_data)} bytes") + logger.debug(f"Generated metrics: {len(metrics_data)} bytes") return web.Response( body=metrics_data, content_type='text/plain; version=0.0.4' ) except Exception as e: - self.logger.error(f"Error generating metrics: {e}") + logger.error(f"Error generating metrics: {e}") import traceback - self.logger.error(f"Traceback: {traceback.format_exc()}") + logger.error(f"Traceback: {traceback.format_exc()}") return web.Response( text=f"Error generating metrics: {e}", status=500 @@ -89,7 +95,7 @@ class MetricsServer: status=200 ) except Exception as e: - self.logger.error(f"Health check failed: {e}") + logger.error(f"Health check failed: {e}") return web.Response( text=f"ERROR: Health check failed: {e}", content_type='text/plain', @@ -105,13 +111,13 @@ class MetricsServer: self.site = web.TCPSite(self.runner, self.host, self.port) await self.site.start() - self.logger.info(f"Metrics server started on {self.host}:{self.port}") - self.logger.info("Available endpoints:") - self.logger.info(f" - /metrics - Prometheus metrics") - self.logger.info(f" - /health - Health check") + logger.info(f"Metrics server started on {self.host}:{self.port}") + logger.info("Available endpoints:") + logger.info(f" - /metrics - Prometheus metrics") + logger.info(f" - /health - Health check") except Exception as e: - self.logger.error(f"Failed to start metrics server: {e}") + logger.error(f"Failed to start metrics server: {e}") raise async def stop(self) -> None: @@ -119,14 +125,14 @@ class MetricsServer: try: if self.site: await self.site.stop() - self.logger.info("Metrics server site stopped") + logger.info("Metrics server site stopped") if self.runner: await self.runner.cleanup() - self.logger.info("Metrics server runner cleaned up") + logger.info("Metrics server runner cleaned up") except Exception as e: - self.logger.error(f"Error stopping metrics server: {e}") + logger.error(f"Error stopping metrics server: {e}") async def __aenter__(self): """Async context manager entry.""" @@ -156,10 +162,8 @@ async def stop_metrics_server() -> None: if metrics_server: try: await metrics_server.stop() - logger = logging.getLogger(__name__) logger.info("Metrics server stopped successfully") except Exception as e: - logger = logging.getLogger(__name__) logger.error(f"Error stopping metrics server: {e}") finally: metrics_server = None diff --git a/helper_bot/utils/metrics_scheduler.py b/helper_bot/utils/metrics_scheduler.py new file mode 100644 index 0000000..a99fa37 --- /dev/null +++ b/helper_bot/utils/metrics_scheduler.py @@ -0,0 +1,206 @@ +""" +Metrics Scheduler for periodic updates of bot metrics. +Automatically updates active users, system metrics, and other periodic data. +""" + +import asyncio +import logging +from datetime import datetime, timezone, timedelta +from typing import Optional +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.triggers.interval import IntervalTrigger + +from .metrics import metrics +from .base_dependency_factory import get_global_instance + + +class MetricsScheduler: + """Scheduler for periodic metrics updates.""" + + def __init__(self): + self.scheduler = AsyncIOScheduler() + self.logger = logging.getLogger(__name__) + self.bdf = None + self.bot_db = None + self.is_running = False + + async def initialize(self): + """Initialize scheduler with database connection.""" + try: + self.bdf = get_global_instance() + self.bot_db = self.bdf.get_db() + self.logger.info("✅ Metrics scheduler initialized successfully") + except Exception as e: + self.logger.error(f"❌ Failed to initialize metrics scheduler: {e}") + + def start_scheduler(self): + """Start the metrics scheduler.""" + if self.is_running: + self.logger.warning("⚠️ Metrics scheduler is already running") + return + + try: + # Update active users every 5 minutes + self.scheduler.add_job( + self._update_active_users_metric, + IntervalTrigger(minutes=5), + id='update_active_users', + name='Update Active Users Metric', + replace_existing=True + ) + + # Update system metrics every minute + self.scheduler.add_job( + self._update_system_metrics, + IntervalTrigger(minutes=1), + id='update_system_metrics', + name='Update System Metrics', + replace_existing=True + ) + + # Daily metrics reset at midnight + self.scheduler.add_job( + self._daily_metrics_reset, + CronTrigger(hour=0, minute=0), + id='daily_metrics_reset', + name='Daily Metrics Reset', + replace_existing=True + ) + + # Start scheduler + self.scheduler.start() + self.is_running = True + self.logger.info("✅ Metrics scheduler started successfully") + + except Exception as e: + self.logger.error(f"❌ Failed to start metrics scheduler: {e}") + + def stop_scheduler(self): + """Stop the metrics scheduler.""" + if not self.is_running: + self.logger.warning("⚠️ Metrics scheduler is not running") + return + + try: + self.scheduler.shutdown() + self.is_running = False + self.logger.info("✅ Metrics scheduler stopped successfully") + except Exception as e: + self.logger.error(f"❌ Failed to stop metrics scheduler: {e}") + + async def _update_active_users_metric(self): + """Update active users metric from database.""" + try: + if not self.bot_db: + await self.initialize() + if not self.bot_db: + self.logger.warning("⚠️ Cannot update active users: no database connection") + return + + self.logger.debug("📊 Updating active users metric...") + + # Count active users (last 24 hours) + import time + current_timestamp = int(time.time()) + one_day_ago = current_timestamp - (24 * 60 * 60) + + active_users_query = """ + SELECT COUNT(DISTINCT user_id) as active_users + FROM our_users + WHERE date_changed > ? + """ + + await self.bot_db.connect() + await self.bot_db.cursor.execute(active_users_query, (one_day_ago,)) + result = await self.bot_db.cursor.fetchone() + active_users = result[0] if result else 0 + await self.bot_db.close() + + # Update metrics + metrics.set_active_users(active_users, "daily") + metrics.set_active_users(active_users, "total") + + self.logger.debug(f"📊 Active users updated: {active_users}") + + except Exception as e: + self.logger.error(f"❌ Failed to update active users metric: {e}") + # Set fallback value + metrics.set_active_users(0, "daily") + metrics.set_active_users(0, "total") + + async def _update_system_metrics(self): + """Update system-related metrics.""" + try: + # You can add system metrics here (CPU, memory, etc.) + # For now, we'll just log that the job is running + self.logger.debug("📊 System metrics update job running...") + + except Exception as e: + self.logger.error(f"❌ Failed to update system metrics: {e}") + + async def _daily_metrics_reset(self): + """Reset daily metrics at midnight.""" + try: + self.logger.info("🔄 Daily metrics reset job running...") + + # You can add daily metrics reset logic here + # For example, reset daily counters, update retention metrics, etc. + + self.logger.info("✅ Daily metrics reset completed") + + except Exception as e: + self.logger.error(f"❌ Failed to reset daily metrics: {e}") + + async def force_update_active_users(self): + """Force update of active users metric (for testing).""" + await self._update_active_users_metric() + + def get_scheduler_status(self) -> dict: + """Get current scheduler status.""" + if not self.is_running: + return {"status": "stopped", "jobs": 0} + + jobs = self.scheduler.get_jobs() + return { + "status": "running", + "jobs": len(jobs), + "job_details": [ + { + "id": job.id, + "name": job.name, + "next_run": job.next_run_time.isoformat() if job.next_run_time else None + } + for job in jobs + ] + } + + +# Global metrics scheduler instance +metrics_scheduler: Optional[MetricsScheduler] = None + + +def get_metrics_scheduler() -> MetricsScheduler: + """Get global metrics scheduler instance.""" + global metrics_scheduler + if metrics_scheduler is None: + metrics_scheduler = MetricsScheduler() + return metrics_scheduler + + +async def start_metrics_scheduler() -> MetricsScheduler: + """Start metrics scheduler and return instance.""" + global metrics_scheduler + if metrics_scheduler is None: + metrics_scheduler = MetricsScheduler() + + await metrics_scheduler.initialize() + metrics_scheduler.start_scheduler() + return metrics_scheduler + + +def stop_metrics_scheduler(): + """Stop metrics scheduler if running.""" + global metrics_scheduler + if metrics_scheduler: + metrics_scheduler.stop_scheduler() diff --git a/run_helper.py b/run_helper.py index 5f0a2c3..de05dd7 100644 --- a/run_helper.py +++ b/run_helper.py @@ -64,6 +64,14 @@ async def main(): logger.info("Останавливаем планировщик автоматического разбана...") auto_unban_scheduler.stop_scheduler() + # Останавливаем планировщик метрик + try: + from helper_bot.utils.metrics_scheduler import stop_metrics_scheduler + stop_metrics_scheduler() + logger.info("Планировщик метрик остановлен") + except Exception as e: + logger.error(f"Ошибка при остановке планировщика метрик: {e}") + # Метрики останавливаются в main.py logger.info("Останавливаем задачи...")