""" Сервис для периодического обновления метрик """ import asyncio import time from typing import Optional from .metrics import get_metrics_service from .database import DatabaseService from .logger import get_logger class MetricsUpdater: """Сервис для периодического обновления метрик""" def __init__(self, update_interval: int = 30, db_path: str = None): self.update_interval = update_interval self.metrics_service = get_metrics_service() self.database_service: Optional[DatabaseService] = None self.db_path = db_path self._running = False self._task: Optional[asyncio.Task] = None self.logger = get_logger(__name__) async def start(self): """Запустить обновление метрик""" if self._running: self.logger.warning("MetricsUpdater уже запущен") return # Создаем DatabaseService если путь к БД указан if self.db_path: self.database_service = DatabaseService(self.db_path) await self.database_service.init() self._running = True self._task = asyncio.create_task(self._update_loop()) self.logger.info(f"📊 MetricsUpdater запущен с интервалом {self.update_interval} секунд") async def stop(self): """Остановить обновление метрик""" if not self._running: return self._running = False if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass # Логгер недоступен в stop, так как объект может быть уже уничтожен pass async def _update_loop(self): """Основной цикл обновления метрик""" while self._running: try: await self._update_metrics() await asyncio.sleep(self.update_interval) except asyncio.CancelledError: break except Exception as e: self.logger.error(f"Ошибка при обновлении метрик: {e}") await asyncio.sleep(self.update_interval) async def _update_metrics(self): """Обновление всех метрик""" try: # Обновляем активных пользователей await self._update_active_users() # Обновляем активные вопросы await self._update_active_questions() # Обновляем метрики БД await self._update_database_metrics() except Exception as e: self.logger.error(f"Ошибка при обновлении метрик: {e}") self.metrics_service.increment_errors(type(e).__name__, "metrics_updater") async def _update_active_users(self): """Обновление количества активных пользователей""" try: if not self.database_service: return # Подсчитываем активных пользователей за последние 24 часа async with self.database_service.get_connection() as conn: cursor = await conn.execute(""" SELECT COUNT(*) FROM users WHERE is_active = 1 AND updated_at > datetime('now', '-24 hours') """) result = await cursor.fetchone() active_users_count = result[0] if result else 0 self.metrics_service.set_active_users(active_users_count) self.logger.debug(f"Обновлено количество активных пользователей: {active_users_count}") except Exception as e: self.logger.error(f"Ошибка при обновлении активных пользователей: {e}") async def _update_active_questions(self): """Обновление количества активных вопросов""" try: if not self.database_service: return # Подсчитываем активные вопросы (pending и processing) async with self.database_service.get_connection() as conn: cursor = await conn.execute(""" SELECT COUNT(*) FROM questions WHERE status IN ('pending', 'processing') """) result = await cursor.fetchone() active_questions_count = result[0] if result else 0 self.metrics_service.set_active_questions(active_questions_count) self.logger.debug(f"Обновлено количество активных вопросов: {active_questions_count}") except Exception as e: self.logger.error(f"Ошибка при обновлении активных вопросов: {e}") async def _update_database_metrics(self): """Обновление метрик базы данных""" try: if not self.database_service: return # Проверяем соединение с БД start_time = time.time() try: await self.database_service.check_connection() duration = time.time() - start_time # Записываем успешное соединение (только для статистики, не для активных соединений) self.metrics_service.record_db_query("health_check", "connection", "success", duration) # Обновляем метрики пула соединений await self._update_pool_metrics() except Exception as e: duration = time.time() - start_time # Записываем неудачное соединение (только для статистики) self.metrics_service.record_db_query("health_check", "connection", "error", duration) self.metrics_service.increment_errors(type(e).__name__, "database_health_check") except Exception as e: self.logger.error(f"Ошибка при обновлении метрик БД: {e}") async def _update_pool_metrics(self): """Обновление метрик пула соединений""" try: from database.crud import get_connection_pool pool = get_connection_pool(self.database_service.db_path) pool_stats = pool.get_pool_stats() self.metrics_service.update_db_pool_metrics(pool_stats) # Обновляем реальное количество активных соединений из пула created_connections = pool_stats.get("created_connections", 0) self.metrics_service.update_db_connections_from_pool(created_connections) # Логируем предупреждение если утилизация пула превышает 80% if pool_stats.get("utilization_percent", 0) > 80: self.logger.warning(f"Высокая утилизация пула соединений: {pool_stats}") except Exception as e: self.logger.error(f"Ошибка при обновлении метрик пула: {e}") # Глобальный экземпляр _metrics_updater: Optional[MetricsUpdater] = None def get_metrics_updater(update_interval: int = 30, db_path: str = None) -> MetricsUpdater: """Получить экземпляр MetricsUpdater""" global _metrics_updater if _metrics_updater is None: _metrics_updater = MetricsUpdater(update_interval, db_path) return _metrics_updater async def start_metrics_updater(update_interval: int = 30, db_path: str = None): """Запустить обновление метрик""" updater = get_metrics_updater(update_interval, db_path) await updater.start() async def stop_metrics_updater(): """Остановить обновление метрик""" global _metrics_updater if _metrics_updater: await _metrics_updater.stop() _metrics_updater = None