"""
Metrics exporter for Prometheus.

Provides HTTP endpoint for metrics collection and background metrics collection.
"""

import asyncio
import logging
from typing import Any, Dict, Optional

from aiohttp import web

from .metrics import metrics


class MetricsExporter:
    """HTTP server for exposing Prometheus metrics.

    Routes registered on the aiohttp application:
        ``/metrics``  output of ``metrics.get_metrics()``
        ``/health``   static JSON health document
        ``/``         static JSON description of the endpoints
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        """Create the aiohttp application and register the routes.

        Args:
            host: Interface the TCP site binds to.
            port: TCP port the TCP site binds to.
        """
        self.host = host
        self.port = port
        self.app = web.Application()
        # Both stay None until start() succeeds and are reset by stop().
        self.runner: Optional[web.AppRunner] = None
        self.site: Optional[web.TCPSite] = None
        self.logger = logging.getLogger(__name__)

        # Setup routes
        self.app.router.add_get('/metrics', self.metrics_handler)
        self.app.router.add_get('/health', self.health_handler)
        self.app.router.add_get('/', self.root_handler)

    async def start(self) -> None:
        """Start the metrics server.

        Raises:
            Exception: Whatever aiohttp raised while setting up the runner or
                binding the site. The error is logged and re-raised after the
                partially initialised runner has been cleaned up.
        """
        runner = web.AppRunner(self.app)
        try:
            await runner.setup()
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except Exception as e:
            self.logger.error("Failed to start metrics server: %s", e)
            # Do not leak a set-up runner when binding fails (e.g. port in use).
            try:
                await runner.cleanup()
            except Exception as cleanup_error:
                self.logger.warning(
                    "Failed to clean up metrics server runner: %s", cleanup_error
                )
            raise

        # Publish the handles only once the server is actually listening.
        self.runner = runner
        self.site = site
        self.logger.info("Metrics server started on %s:%s", self.host, self.port)

    async def stop(self) -> None:
        """Stop the metrics server.

        Safe to call when the server was never started and safe to call more
        than once: the stored site/runner are cleared before shutdown begins.
        """
        site, runner = self.site, self.runner
        self.site = None
        self.runner = None
        try:
            if site is not None:
                await site.stop()
        finally:
            # Always release the runner, even if stopping the site failed.
            if runner is not None:
                await runner.cleanup()
        self.logger.info("Metrics server stopped")

    async def metrics_handler(self, request: web.Request) -> web.Response:
        """Handle /metrics endpoint for Prometheus.

        Returns:
            The payload from ``metrics.get_metrics()`` with content type
            ``text/plain; version=0.0.4``, or a 500 response carrying the
            error text when generating the payload fails.
        """
        try:
            # Log request for debugging
            self.logger.info(
                "Metrics request from %s: %s",
                request.remote,
                request.headers.get('User-Agent', 'Unknown'),
            )

            metrics_data = metrics.get_metrics()
            self.logger.debug("Generated metrics: %s bytes", len(metrics_data))

            return web.Response(
                body=metrics_data,
                content_type='text/plain; version=0.0.4'
            )
        except Exception as e:
            self.logger.error("Error generating metrics: %s", e)
            return web.Response(
                text=f"Error generating metrics: {e}",
                status=500
            )

    async def health_handler(self, request: web.Request) -> web.Response:
        """Handle /health endpoint for health checks."""
        return web.json_response({
            "status": "healthy",
            "service": "telegram-bot-metrics"
        })

    async def root_handler(self, request: web.Request) -> web.Response:
        """Handle root endpoint with basic info."""
        return web.json_response({
            "service": "Telegram Bot Metrics Exporter",
            "endpoints": {
                "/metrics": "Prometheus metrics",
                "/health": "Health check",
                "/": "This info"
            }
        })


class BackgroundMetricsCollector:
    """Background service for collecting periodic metrics."""

    # Counts distinct users added during the last day. The date arithmetic
    # uses SQLite's datetime() syntax -- NOTE(review): confirm the backend.
    _ACTIVE_USERS_QUERY = """
        SELECT COUNT(DISTINCT user_id) as active_users
        FROM our_users
        WHERE date_added > datetime('now', '-1 day')
    """

    def __init__(self, db: Optional[Any] = None, interval: int = 60):
        """Store the collection settings.

        Args:
            db: Optional database handle. User metrics are queried only when
                it exposes an awaitable ``fetch_one(query)`` method.
            interval: Seconds to sleep between collection passes.
        """
        self.db = db
        self.interval = interval
        self.running = False
        self.logger = logging.getLogger(__name__)

    async def start(self) -> None:
        """Start background metrics collection.

        Loops until ``stop()`` clears ``running`` or the surrounding task is
        cancelled. A failing collection pass is logged and does not end the
        loop.
        """
        self.running = True
        self.logger.info("Background metrics collector started")

        try:
            while self.running:
                try:
                    await self._collect_metrics()
                except Exception as e:
                    self.logger.error(
                        "Error in background metrics collection: %s", e
                    )
                # Sleep once per pass, whether or not the pass succeeded.
                await asyncio.sleep(self.interval)
        finally:
            # Keep the flag truthful when the loop ends through cancellation.
            self.running = False

    async def stop(self) -> None:
        """Stop background metrics collection.

        Only clears the ``running`` flag; a loop that is currently sleeping
        notices it on its next wake-up unless its task is cancelled.
        """
        self.running = False
        self.logger.info("Background metrics collector stopped")

    async def _collect_metrics(self) -> None:
        """Collect periodic metrics; errors are logged, not propagated."""
        try:
            # Collect active users count if database is available
            if self.db:
                await self._collect_user_metrics()

            # Collect system metrics
            await self._collect_system_metrics()

        except Exception as e:
            self.logger.error("Error collecting metrics: %s", e)

    async def _collect_user_metrics(self) -> None:
        """Collect user-related metrics from database.

        Publishes the daily active-user count through
        ``metrics.set_active_users``. The value falls back to 0 when the
        database has no ``fetch_one`` method, the query fails, or the query
        returns no row.
        """
        try:
            active_users = 0  # placeholder used by every fallback path
            if hasattr(self.db, 'fetch_one'):
                try:
                    result = await self.db.fetch_one(self._ACTIVE_USERS_QUERY)
                    if result:
                        active_users = result['active_users']
                except Exception as db_error:
                    self.logger.warning(
                        "Database query failed, using placeholder: %s", db_error
                    )
            metrics.set_active_users(active_users, 'daily')

        except Exception as e:
            self.logger.error("Error collecting user metrics: %s", e)
            metrics.set_active_users(0, 'daily')

    async def _collect_system_metrics(self) -> None:
        """Collect system-level metrics.

        Placeholder: nothing is collected yet. Extend it with memory usage,
        CPU usage, etc. as needed.
        """


class MetricsManager:
    """Main class for managing metrics collection and export."""

    def __init__(
        self,
        host: str = "0.0.0.0",
        port: int = 8000,
        db: Optional[Any] = None,
        interval: int = 60,
    ):
        """Build the exporter and the background collector.

        Args:
            host: Interface the metrics HTTP server binds to.
            port: Port the metrics HTTP server binds to.
            db: Optional database handle passed to the collector.
            interval: Seconds between background collection passes.
        """
        self.exporter = MetricsExporter(host, port)
        self.collector = BackgroundMetricsCollector(db, interval)
        self.logger = logging.getLogger(__name__)
        # Strong reference to the collector task: the event loop only keeps a
        # weak one, so an unreferenced task may be garbage-collected mid-run.
        self._collector_task: Optional["asyncio.Task[None]"] = None

    async def start(self) -> None:
        """Start metrics collection and export.

        Raises:
            Exception: Re-raised (after logging) when the exporter cannot be
                started; the collector is not launched in that case.
        """
        try:
            # Start metrics exporter
            await self.exporter.start()

            # Start background collector (at most one live task at a time)
            if self._collector_task is None or self._collector_task.done():
                self._collector_task = asyncio.create_task(self.collector.start())

            self.logger.info("Metrics manager started successfully")

        except Exception as e:
            self.logger.error("Failed to start metrics manager: %s", e)
            raise

    async def stop(self) -> None:
        """Stop metrics collection and export.

        Cancels the collector task and waits for it to finish so that no
        pending task outlives the manager, then stops the HTTP server.

        Raises:
            Exception: Re-raised (after logging) when shutdown fails.
        """
        try:
            await self.collector.stop()

            task, self._collector_task = self._collector_task, None
            if task is not None:
                # The loop is most likely asleep; cancel instead of waiting
                # out the remainder of the interval.
                task.cancel()
                # return_exceptions=True turns the task's own CancelledError
                # into a result while still letting a cancellation of stop()
                # itself propagate.
                await asyncio.gather(task, return_exceptions=True)

            await self.exporter.stop()
            self.logger.info("Metrics manager stopped successfully")
        except Exception as e:
            self.logger.error("Error stopping metrics manager: %s", e)
            raise