Files
telegram-helper-bot/helper_bot/utils/metrics_exporter.py
Andrey 1c6a37bc12 Enhance bot functionality and refactor database interactions
- Added `ca-certificates` installation to Dockerfile for improved network security.
- Updated health check command in Dockerfile to include better timeout handling.
- Refactored `run_helper.py` to implement proper signal handling and logging during shutdown.
- Transitioned database operations to an asynchronous model in `async_db.py`, improving performance and responsiveness.
- Updated database schema to support new foreign key relationships and optimized indexing for better query performance.
- Enhanced various bot handlers to utilize async database methods, improving overall efficiency and user experience.
- Removed obsolete database and fix scripts to streamline the project structure.
2025-09-02 18:22:02 +03:00

291 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Metrics exporter for Prometheus.
Provides HTTP endpoint for metrics collection and background metrics collection.
"""
import asyncio
import logging
from aiohttp import web
from typing import Optional, Dict, Any, Protocol
from .metrics import metrics
import time
class DatabaseProvider(Protocol):
    """Structural interface for asynchronous database access."""

    async def fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
        """Run *query* with *params* and return the first row as a dict, or None."""
        ...
class MetricsCollector(Protocol):
    """Structural interface for anything that gathers metrics."""

    async def collect_user_metrics(self, db: DatabaseProvider) -> None:
        """Gather user-related metrics, reading from *db*."""
        ...
class UserMetricsCollector:
    """Concrete implementation of user metrics collection.

    Supports both the async DB layer (objects exposing ``fetch_one``) and
    the legacy synchronous ``BotDB`` (objects exposing ``connect`` and
    ``cursor``); the latter is executed off the event loop in the loop's
    default thread pool so it never blocks other coroutines.
    """

    # Counts distinct users whose date_changed is newer than the bound
    # (UNIX timestamp, bound positionally via "?").
    _ACTIVE_USERS_QUERY = """
            SELECT COUNT(DISTINCT user_id) as active_users
            FROM our_users
            WHERE date_changed > ?
        """

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    async def collect_user_metrics(self, db: "DatabaseProvider") -> None:
        """Collect user-related metrics from database.

        Never raises: on any failure the active-users gauge is reset to 0
        so one DB hiccup cannot kill the periodic collection loop.
        """
        try:
            # "Active" means activity within the last 24 hours.
            one_day_ago = int(time.time()) - 24 * 60 * 60
            if hasattr(db, 'fetch_one'):
                count = await self._count_async(db, one_day_ago)
            elif hasattr(db, 'connect') and hasattr(db, 'cursor'):
                count = await self._count_sync(db, one_day_ago)
            else:
                self.logger.warning("Database doesn't support fetch_one or connect methods")
                count = 0
            metrics.set_active_users(count, 'daily')
            self.logger.debug(f"Updated active users: {count}")
        except Exception as e:
            self.logger.error(f"Error collecting user metrics: {e}")
            metrics.set_active_users(0, 'daily')

    async def _count_async(self, db: "DatabaseProvider", since: int) -> int:
        """Count users active since *since* via the async DB layer."""
        result = await db.fetch_one(self._ACTIVE_USERS_QUERY, (since,))
        return result['active_users'] if result else 0

    async def _count_sync(self, db, since: int) -> int:
        """Count users active since *since* via the legacy sync BotDB."""
        def query() -> int:
            try:
                db.connect()
                db.cursor.execute(self._ACTIVE_USERS_QUERY, (since,))
                row = db.cursor.fetchone()
                return row[0] if row else 0
            finally:
                # Always release the connection, even on query failure.
                db.close()
        # run_in_executor(None, ...) reuses the loop's default thread pool
        # instead of building a fresh ThreadPoolExecutor per call;
        # get_running_loop() replaces the deprecated get_event_loop()
        # inside a coroutine.
        return await asyncio.get_running_loop().run_in_executor(None, query)
class DependencyProvider(Protocol):
    """Structural interface for resolving shared runtime dependencies."""

    def get_db(self) -> DatabaseProvider:
        """Return the database instance metrics should be read from."""
        ...
class BackgroundMetricsCollector:
    """Background service for collecting periodic metrics using dependency injection.

    ``start()`` loops until ``stop()`` is requested. Stopping is prompt:
    the inter-round wait is interruptible, so shutdown no longer lags by
    up to ``interval`` seconds.
    """

    def __init__(
        self,
        dependency_provider: "DependencyProvider",
        metrics_collector: "MetricsCollector",
        interval: int = 60
    ):
        # interval: seconds between collection rounds.
        self.dependency_provider = dependency_provider
        self.metrics_collector = metrics_collector
        self.interval = interval
        self.running = False
        # Set by stop() to wake the wait in start() immediately.
        self._stop_event: Optional[asyncio.Event] = None
        self.logger = logging.getLogger(__name__)

    async def start(self):
        """Run the collection loop until stop() is requested."""
        self.running = True
        # Created here (not in __init__) so the Event belongs to the loop
        # that actually runs the collector.
        self._stop_event = asyncio.Event()
        self.logger.info("Background metrics collector started")
        while self.running:
            try:
                await self._collect_metrics()
            except Exception as e:
                self.logger.error(f"Error in background metrics collection: {e}")
            # Wait out the interval, but return early if stop() fires.
            try:
                await asyncio.wait_for(self._stop_event.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                pass

    async def stop(self):
        """Request the collection loop to exit at the next opportunity."""
        self.running = False
        if self._stop_event is not None:
            self._stop_event.set()
        self.logger.info("Background metrics collector stopped")

    async def _collect_metrics(self):
        """One collection round: resolve the DB and delegate to the collector."""
        try:
            db = self.dependency_provider.get_db()
            if db:
                await self.metrics_collector.collect_user_metrics(db)
            else:
                self.logger.warning("Database not available for metrics collection")
        except Exception as e:
            self.logger.error(f"Error collecting metrics: {e}")
class MetricsExporter:
    """HTTP server for exposing Prometheus metrics.

    Serves /metrics (Prometheus scrape target), /health (liveness probe)
    and / (service info) on the given host/port.
    """

    # Full Prometheus text exposition content type. aiohttp's
    # content_type= argument rejects a charset parameter, so this is
    # applied via an explicit Content-Type header instead.
    _METRICS_CONTENT_TYPE = 'text/plain; version=0.0.4; charset=utf-8'

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        self.host = host
        self.port = port
        self.app = web.Application()
        self.runner: Optional[web.AppRunner] = None
        self.site: Optional[web.TCPSite] = None
        self.logger = logging.getLogger(__name__)
        # Setup routes
        self.app.router.add_get('/metrics', self.metrics_handler)
        self.app.router.add_get('/health', self.health_handler)
        self.app.router.add_get('/', self.root_handler)

    async def start(self):
        """Start the metrics server."""
        try:
            self.runner = web.AppRunner(self.app)
            await self.runner.setup()
            self.site = web.TCPSite(self.runner, self.host, self.port)
            await self.site.start()
            self.logger.info(f"Metrics server started on {self.host}:{self.port}")
        except Exception as e:
            self.logger.error(f"Failed to start metrics server: {e}")
            raise

    async def stop(self):
        """Stop the metrics server and release its resources."""
        try:
            if self.site:
                await self.site.stop()
                self.logger.info("Metrics server site stopped")
            if self.runner:
                await self.runner.cleanup()
                self.logger.info("Metrics server runner cleaned up")
        except Exception as e:
            self.logger.error(f"Error stopping metrics server: {e}")
        finally:
            # Clear references so a stopped exporter cannot be reused stale.
            self.site = None
            self.runner = None
            # Give in-flight connections a moment to close.
            await asyncio.sleep(0.1)
            self.logger.info("Metrics server stopped")

    async def metrics_handler(self, request: web.Request) -> web.Response:
        """Handle /metrics endpoint for Prometheus."""
        try:
            metrics_data = metrics.get_metrics()
            self.logger.debug(f"Generated metrics: {len(metrics_data)} bytes")
            return web.Response(
                body=metrics_data,
                headers={'Content-Type': self._METRICS_CONTENT_TYPE}
            )
        except Exception as e:
            self.logger.error(f"Error generating metrics: {e}")
            return web.Response(
                text=f"Error generating metrics: {e}",
                status=500
            )

    async def health_handler(self, request: web.Request) -> web.Response:
        """Handle /health endpoint for health checks."""
        return web.json_response({
            "status": "healthy",
            "service": "telegram-bot-metrics"
        })

    async def root_handler(self, request: web.Request) -> web.Response:
        """Handle root endpoint with basic info."""
        return web.json_response({
            "service": "Telegram Bot Metrics Exporter",
            "endpoints": {
                "/metrics": "Prometheus metrics",
                "/health": "Health check",
                "/": "This info"
            }
        })
class MetricsManager:
    """Main class for managing metrics collection and export.

    Owns a MetricsExporter (HTTP endpoint) and a BackgroundMetricsCollector
    (periodic gathering) and manages their joint lifecycle.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        self.exporter = MetricsExporter(host, port)
        # Dependency injection setup (function-scope import keeps module
        # load free of the factory's import cycle).
        from helper_bot.utils.base_dependency_factory import get_global_instance
        dependency_provider = get_global_instance()
        metrics_collector = UserMetricsCollector(logging.getLogger(__name__))
        self.collector = BackgroundMetricsCollector(
            dependency_provider=dependency_provider,
            metrics_collector=metrics_collector
        )
        # Strong reference to the background task: the event loop keeps
        # only weak references to tasks, so an unreferenced task can be
        # garbage-collected mid-run.
        self._collector_task: Optional[asyncio.Task] = None
        self.logger = logging.getLogger(__name__)

    async def start(self):
        """Start metrics collection and export."""
        try:
            # Start metrics exporter
            await self.exporter.start()
            # Start background collector, retaining the task reference.
            self._collector_task = asyncio.create_task(self.collector.start())
            self.logger.info("Metrics manager started successfully")
        except Exception as e:
            self.logger.error(f"Failed to start metrics manager: {e}")
            raise

    async def stop(self):
        """Stop metrics collection and export. Safe to call more than once."""
        try:
            collector = getattr(self, 'collector', None)
            if collector is not None:
                await collector.stop()
                self.logger.info("Background metrics collector stopped")
            # Make sure the background loop has really finished.
            task = getattr(self, '_collector_task', None)
            if task is not None:
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass
                except Exception:
                    # Loop errors are already logged by the collector.
                    pass
            exporter = getattr(self, 'exporter', None)
            if exporter is not None:
                await exporter.stop()
                self.logger.info("Metrics exporter stopped")
        except Exception as e:
            self.logger.error(f"Error stopping metrics manager: {e}")
            # Deliberately not re-raised: shutdown must proceed regardless.
        finally:
            # Drop references so repeated stop() calls are no-ops.
            self.collector = None
            self.exporter = None
            self._collector_task = None
            self.logger.info("Metrics manager stopped successfully")