""" Metrics exporter for Prometheus. Provides HTTP endpoint for metrics collection and background metrics collection. """ import asyncio import logging from aiohttp import web from typing import Optional, Dict, Any, Protocol from .metrics import metrics class DatabaseProvider(Protocol): """Protocol for database operations.""" async def fetch_one(self, query: str) -> Optional[Dict[str, Any]]: """Execute query and return single result.""" ... class MetricsCollector(Protocol): """Protocol for metrics collection operations.""" async def collect_user_metrics(self, db: DatabaseProvider) -> None: """Collect user-related metrics.""" ... class UserMetricsCollector: """Concrete implementation of user metrics collection.""" def __init__(self, logger: logging.Logger): self.logger = logger async def collect_user_metrics(self, db: DatabaseProvider) -> None: """Collect user-related metrics from database.""" try: # Проверяем, есть ли метод fetch_one (асинхронная БД) if hasattr(db, 'fetch_one'): active_users_query = """ SELECT COUNT(DISTINCT user_id) as active_users FROM our_users WHERE date_changed > datetime('now', '-1 day') """ result = await db.fetch_one(active_users_query) if result: metrics.set_active_users(result['active_users'], 'daily') self.logger.debug(f"Updated active users: {result['active_users']}") else: metrics.set_active_users(0, 'daily') self.logger.debug("Updated active users: 0") # Проверяем синхронную БД BotDB elif hasattr(db, 'connect') and hasattr(db, 'cursor'): # Используем синхронный запрос для BotDB в отдельном потоке import asyncio from concurrent.futures import ThreadPoolExecutor active_users_query = """ SELECT COUNT(DISTINCT user_id) as active_users FROM our_users WHERE date_changed > datetime('now', '-1 day') """ def sync_db_query(): try: db.connect() db.cursor.execute(active_users_query) result = db.cursor.fetchone() return result[0] if result else 0 finally: db.close() # Выполняем синхронный запрос в отдельном потоке loop = asyncio.get_event_loop() with ThreadPoolExecutor() as executor: result = await loop.run_in_executor(executor, sync_db_query) metrics.set_active_users(result, 'daily') self.logger.debug(f"Updated active users: {result}") else: metrics.set_active_users(0, 'daily') self.logger.warning("Database doesn't support fetch_one or connect methods") except Exception as e: self.logger.error(f"Error collecting user metrics: {e}") metrics.set_active_users(0, 'daily') class DependencyProvider(Protocol): """Protocol for dependency injection.""" def get_db(self) -> DatabaseProvider: """Get database instance.""" ... class BackgroundMetricsCollector: """Background service for collecting periodic metrics using dependency injection.""" def __init__( self, dependency_provider: DependencyProvider, metrics_collector: MetricsCollector, interval: int = 60 ): self.dependency_provider = dependency_provider self.metrics_collector = metrics_collector self.interval = interval self.running = False self.logger = logging.getLogger(__name__) async def start(self): """Start background metrics collection.""" self.running = True self.logger.info("Background metrics collector started") while self.running: try: await self._collect_metrics() await asyncio.sleep(self.interval) except Exception as e: self.logger.error(f"Error in background metrics collection: {e}") await asyncio.sleep(self.interval) async def stop(self): """Stop background metrics collection.""" self.running = False self.logger.info("Background metrics collector stopped") async def _collect_metrics(self): """Collect periodic metrics using dependency injection.""" try: db = self.dependency_provider.get_db() if db: await self.metrics_collector.collect_user_metrics(db) else: self.logger.warning("Database not available for metrics collection") except Exception as e: self.logger.error(f"Error collecting metrics: {e}") class MetricsExporter: """HTTP server for exposing Prometheus metrics.""" def __init__(self, host: str = "0.0.0.0", port: int = 8000): self.host = host self.port = port self.app = web.Application() self.runner: Optional[web.AppRunner] = None self.site: Optional[web.TCPSite] = None self.logger = logging.getLogger(__name__) # Setup routes self.app.router.add_get('/metrics', self.metrics_handler) self.app.router.add_get('/health', self.health_handler) self.app.router.add_get('/', self.root_handler) async def start(self): """Start the metrics server.""" try: self.runner = web.AppRunner(self.app) await self.runner.setup() self.site = web.TCPSite(self.runner, self.host, self.port) await self.site.start() self.logger.info(f"Metrics server started on {self.host}:{self.port}") except Exception as e: self.logger.error(f"Failed to start metrics server: {e}") raise async def stop(self): """Stop the metrics server.""" if self.site: await self.site.stop() if self.runner: await self.runner.cleanup() self.logger.info("Metrics server stopped") async def metrics_handler(self, request: web.Request) -> web.Response: """Handle /metrics endpoint for Prometheus.""" try: metrics_data = metrics.get_metrics() self.logger.debug(f"Generated metrics: {len(metrics_data)} bytes") return web.Response( body=metrics_data, content_type='text/plain; version=0.0.4' ) except Exception as e: self.logger.error(f"Error generating metrics: {e}") return web.Response( text=f"Error generating metrics: {e}", status=500 ) async def health_handler(self, request: web.Request) -> web.Response: """Handle /health endpoint for health checks.""" return web.json_response({ "status": "healthy", "service": "telegram-bot-metrics" }) async def root_handler(self, request: web.Request) -> web.Response: """Handle root endpoint with basic info.""" return web.json_response({ "service": "Telegram Bot Metrics Exporter", "endpoints": { "/metrics": "Prometheus metrics", "/health": "Health check", "/": "This info" } }) class MetricsManager: """Main class for managing metrics collection and export.""" def __init__(self, host: str = "0.0.0.0", port: int = 8000): self.exporter = MetricsExporter(host, port) # Dependency injection setup from helper_bot.utils.base_dependency_factory import get_global_instance dependency_provider = get_global_instance() metrics_collector = UserMetricsCollector(logging.getLogger(__name__)) self.collector = BackgroundMetricsCollector( dependency_provider=dependency_provider, metrics_collector=metrics_collector ) self.logger = logging.getLogger(__name__) async def start(self): """Start metrics collection and export.""" try: # Start metrics exporter await self.exporter.start() # Start background collector asyncio.create_task(self.collector.start()) self.logger.info("Metrics manager started successfully") except Exception as e: self.logger.error(f"Failed to start metrics manager: {e}") raise async def stop(self): """Stop metrics collection and export.""" try: await self.collector.stop() await self.exporter.stop() self.logger.info("Metrics manager stopped successfully") except Exception as e: self.logger.error(f"Error stopping metrics manager: {e}") raise