- Updated `.dockerignore` to include additional development and temporary files, enhancing build efficiency.
- Modified `.gitignore` to remove unnecessary entries and streamline ignored files.
- Enhanced `docker-compose.yml` with health checks, resource limits, and improved environment variable handling for better service management.
- Refactored `Dockerfile.bot` to use a multi-stage build for optimized image size and security.
- Improved `Makefile` with new commands for deployment, migration, and backup, along with enhanced help documentation.
- Updated `requirements.txt` to include new dependencies for environment variable management.
- Refactored metrics handling in the bot to ensure proper initialization and collection.
259 lines
9.4 KiB
Python
"""
|
|
Metrics exporter for Prometheus.
|
|
Provides HTTP endpoint for metrics collection and background metrics collection.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from aiohttp import web
|
|
from typing import Optional, Dict, Any, Protocol
|
|
from .metrics import metrics
|
|
|
|
|
|
class DatabaseProvider(Protocol):
    """Structural interface for an async database backend.

    Any object that exposes a matching ``fetch_one`` coroutine satisfies
    this protocol; no explicit inheritance is required.
    """

    async def fetch_one(self, query: str) -> Optional[Dict[str, Any]]:
        """Run *query* and return the first row as a mapping, or ``None``."""
        ...
|
|
|
|
|
|
class MetricsCollector(Protocol):
    """Structural interface for objects that gather metrics.

    Implementations pull data from a :class:`DatabaseProvider` and publish
    the results to the process-wide metrics registry.
    """

    async def collect_user_metrics(self, db: DatabaseProvider) -> None:
        """Gather user-related metrics from *db*."""
        ...
|
|
|
|
|
|
class UserMetricsCollector:
    """Concrete implementation of user metrics collection.

    Supports two database flavors:

    * an async database exposing ``fetch_one`` (awaited directly);
    * the synchronous ``BotDB`` exposing ``connect``/``cursor``/``close``,
      whose blocking query is pushed to the event loop's default thread
      pool so the loop is never blocked.

    Never raises: any failure is logged and the gauge is reset to 0 so a
    stale value is not reported.
    """

    # Single query shared by both code paths: distinct users whose records
    # changed within the last 24 hours (SQLite datetime syntax).
    _ACTIVE_USERS_QUERY = """
        SELECT COUNT(DISTINCT user_id) as active_users
        FROM our_users
        WHERE date_changed > datetime('now', '-1 day')
    """

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    async def collect_user_metrics(self, db: DatabaseProvider) -> None:
        """Collect user-related metrics from database."""
        try:
            if hasattr(db, 'fetch_one'):
                # Async database: await the query directly.
                result = await db.fetch_one(self._ACTIVE_USERS_QUERY)
                count = result['active_users'] if result else 0
                metrics.set_active_users(count, 'daily')
                self.logger.debug(f"Updated active users: {count}")
            elif hasattr(db, 'connect') and hasattr(db, 'cursor'):
                # Synchronous BotDB: run the blocking query in the loop's
                # default executor.  get_running_loop() is the supported way
                # to obtain the loop inside a coroutine (get_event_loop() is
                # deprecated here), and reusing the default executor avoids
                # creating a ThreadPoolExecutor on every collection pass.
                loop = asyncio.get_running_loop()
                count = await loop.run_in_executor(None, self._query_sync, db)
                metrics.set_active_users(count, 'daily')
                self.logger.debug(f"Updated active users: {count}")
            else:
                # Unknown database shape: report zero rather than a stale value.
                metrics.set_active_users(0, 'daily')
                self.logger.warning("Database doesn't support fetch_one or connect methods")

        except Exception as e:
            self.logger.error(f"Error collecting user metrics: {e}")
            metrics.set_active_users(0, 'daily')

    def _query_sync(self, db) -> int:
        """Run the active-users query on a synchronous BotDB connection.

        Executed in a worker thread; always closes the connection.
        """
        try:
            db.connect()
            db.cursor.execute(self._ACTIVE_USERS_QUERY)
            row = db.cursor.fetchone()
            return row[0] if row else 0
        finally:
            db.close()
|
|
|
|
|
|
class DependencyProvider(Protocol):
    """Structural interface for a dependency-injection container."""

    def get_db(self) -> DatabaseProvider:
        """Return the database instance to run metric queries against."""
        ...
|
|
|
|
|
|
class BackgroundMetricsCollector:
    """Background service for collecting periodic metrics using dependency injection.

    Call :meth:`start` from a running event loop; it loops until
    :meth:`stop` is called.  Shutdown is prompt: ``stop()`` wakes the loop
    immediately instead of letting it sleep out the remainder of the
    current interval.
    """

    def __init__(
        self,
        dependency_provider: DependencyProvider,
        metrics_collector: MetricsCollector,
        interval: int = 60
    ):
        self.dependency_provider = dependency_provider
        self.metrics_collector = metrics_collector
        self.interval = interval  # seconds between collection passes
        self.running = False      # public flag, kept for compatibility
        # Created in start() so the Event is bound to the running loop.
        self._stop_event: Optional[asyncio.Event] = None
        self.logger = logging.getLogger(__name__)

    async def start(self):
        """Start background metrics collection; runs until stop() is called."""
        self.running = True
        self._stop_event = asyncio.Event()
        self.logger.info("Background metrics collector started")

        while self.running:
            try:
                await self._collect_metrics()
            except Exception as e:
                self.logger.error(f"Error in background metrics collection: {e}")
            # Wait out the interval, but wake immediately if stop() is
            # called — a plain asyncio.sleep() here would delay shutdown by
            # up to `interval` seconds.
            try:
                await asyncio.wait_for(self._stop_event.wait(), timeout=self.interval)
            except asyncio.TimeoutError:
                pass

    async def stop(self):
        """Stop background metrics collection (wakes the sleeping loop)."""
        self.running = False
        if self._stop_event is not None:
            self._stop_event.set()
        self.logger.info("Background metrics collector stopped")

    async def _collect_metrics(self):
        """Collect periodic metrics using dependency injection.

        Errors are logged, never propagated, so one failed pass does not
        kill the background loop.
        """
        try:
            db = self.dependency_provider.get_db()
            if db:
                await self.metrics_collector.collect_user_metrics(db)
            else:
                self.logger.warning("Database not available for metrics collection")

        except Exception as e:
            self.logger.error(f"Error collecting metrics: {e}")
|
|
|
|
|
|
class MetricsExporter:
    """HTTP server for exposing Prometheus metrics.

    Endpoints:
      * ``/metrics`` — Prometheus exposition format
      * ``/health``  — liveness probe
      * ``/``        — human-readable service info
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        self.host = host
        self.port = port
        self.app = web.Application()
        self.runner: Optional[web.AppRunner] = None
        self.site: Optional[web.TCPSite] = None
        self.logger = logging.getLogger(__name__)

        # Setup routes
        self.app.router.add_get('/metrics', self.metrics_handler)
        self.app.router.add_get('/health', self.health_handler)
        self.app.router.add_get('/', self.root_handler)

    async def start(self):
        """Start the metrics server."""
        try:
            self.runner = web.AppRunner(self.app)
            await self.runner.setup()

            self.site = web.TCPSite(self.runner, self.host, self.port)
            await self.site.start()

            self.logger.info(f"Metrics server started on {self.host}:{self.port}")
        except Exception as e:
            self.logger.error(f"Failed to start metrics server: {e}")
            raise

    async def stop(self):
        """Stop the metrics server (safe to call even if start() failed midway)."""
        if self.site:
            await self.site.stop()
        if self.runner:
            await self.runner.cleanup()
        self.logger.info("Metrics server stopped")

    async def metrics_handler(self, request: web.Request) -> web.Response:
        """Handle /metrics endpoint for Prometheus."""
        try:
            metrics_data = metrics.get_metrics()
            self.logger.debug(f"Generated metrics: {len(metrics_data)} bytes")

            # The Prometheus exposition content type carries parameters
            # (version/charset); aiohttp's ``content_type`` argument expects
            # a bare MIME type, so pass the full value as an explicit header.
            return web.Response(
                body=metrics_data,
                headers={'Content-Type': 'text/plain; version=0.0.4; charset=utf-8'}
            )
        except Exception as e:
            self.logger.error(f"Error generating metrics: {e}")
            return web.Response(
                text=f"Error generating metrics: {e}",
                status=500
            )

    async def health_handler(self, request: web.Request) -> web.Response:
        """Handle /health endpoint for health checks."""
        return web.json_response({
            "status": "healthy",
            "service": "telegram-bot-metrics"
        })

    async def root_handler(self, request: web.Request) -> web.Response:
        """Handle root endpoint with basic info."""
        return web.json_response({
            "service": "Telegram Bot Metrics Exporter",
            "endpoints": {
                "/metrics": "Prometheus metrics",
                "/health": "Health check",
                "/": "This info"
            }
        })
|
|
|
|
|
|
class MetricsManager:
    """Main class for managing metrics collection and export.

    Wires the HTTP exporter and the background collector together and owns
    their lifecycles.
    """

    def __init__(self, host: str = "0.0.0.0", port: int = 8000):
        self.exporter = MetricsExporter(host, port)

        # Dependency injection setup.  Imported locally, presumably to
        # avoid a circular import at module load time — TODO confirm
        # against the package layout.
        from helper_bot.utils.base_dependency_factory import get_global_instance
        dependency_provider = get_global_instance()
        metrics_collector = UserMetricsCollector(logging.getLogger(__name__))

        self.collector = BackgroundMetricsCollector(
            dependency_provider=dependency_provider,
            metrics_collector=metrics_collector
        )
        # Strong reference to the collector task: the event loop keeps only
        # weak references to tasks, so a fire-and-forget create_task() may
        # be garbage-collected mid-flight.
        self._collector_task: Optional[asyncio.Task] = None
        self.logger = logging.getLogger(__name__)

    async def start(self):
        """Start metrics collection and export."""
        try:
            # Start metrics exporter
            await self.exporter.start()

            # Start background collector, holding a reference to the task.
            self._collector_task = asyncio.create_task(self.collector.start())

            self.logger.info("Metrics manager started successfully")

        except Exception as e:
            self.logger.error(f"Failed to start metrics manager: {e}")
            raise

    async def stop(self):
        """Stop metrics collection and export."""
        try:
            await self.collector.stop()
            # Cancel and reap the background task so nothing is left running.
            if self._collector_task is not None:
                self._collector_task.cancel()
                try:
                    await self._collector_task
                except asyncio.CancelledError:
                    pass
                self._collector_task = None
            await self.exporter.stop()
            self.logger.info("Metrics manager stopped successfully")

        except Exception as e:
            self.logger.error(f"Error stopping metrics manager: {e}")
            raise
|