- Added a new `/status` endpoint in `server_prometheus.py` to provide process status information, including uptime and resource usage metrics.
- Implemented a PID manager in `run_helper.py` to track the bot's process, improving monitoring capabilities.
- Introduced a method to delete audio moderation records in `audio_repository.py`, enhancing database management.
- Updated voice message handling in callback handlers to ensure proper deletion of audio moderation records.
- Improved error handling and logging in various services, ensuring better tracking of media processing and file downloads.
- Refactored media handling functions to streamline operations and improve code readability.
- Enhanced metrics tracking for file downloads and media processing, providing better insights into bot performance.
262 lines
9.6 KiB
Python
262 lines
9.6 KiB
Python
|
||
"""
|
||
HTTP server for metrics endpoint integration with centralized Prometheus monitoring.
|
||
Provides the /metrics endpoint for Prometheus scraping, plus /health and /status endpoints for the bot.
|
||
"""
|
||
|
||
import asyncio
|
||
from aiohttp import web
|
||
from typing import Optional
|
||
from .utils.metrics import metrics
|
||
|
||
# Импортируем логгер из проекта
|
||
try:
|
||
from logs.custom_logger import logger
|
||
except ImportError:
|
||
# Fallback для случаев, когда custom_logger недоступен
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MetricsServer:
    """HTTP server for Prometheus metrics, health checks and process status.

    Routes:
        /metrics -- output of ``metrics.get_metrics()`` in Prometheus text format.
        /health  -- plain-text "OK" (200) when metrics can be generated, 503/500 otherwise.
        /status  -- JSON with PID, uptime, memory and CPU usage of this process.
    """

    def __init__(self, host: str = '0.0.0.0', port: int = 8080):
        """Build the aiohttp application; nothing listens until ``start()`` is awaited.

        Args:
            host: Interface to bind (default: all interfaces).
            port: TCP port to listen on.
        """
        self.host = host
        self.port = port
        self.app = web.Application()
        self.runner: Optional[web.AppRunner] = None
        self.site: Optional[web.TCPSite] = None
        # Cached psutil.Process for this process, created by _get_process().
        # One object must be reused across requests: cpu_percent() without an
        # interval measures CPU time since the previous call on the same object,
        # so a fresh Process per request would always report 0.0.
        self._process = None

        # Register routes
        self.app.router.add_get('/metrics', self.metrics_handler)
        self.app.router.add_get('/health', self.health_handler)
        self.app.router.add_get('/status', self.status_handler)

    async def metrics_handler(self, request: web.Request) -> web.Response:
        """Handle /metrics endpoint for Prometheus scraping.

        Returns the output of ``metrics.get_metrics()`` as the response body, or a
        500 plain-text response if the metrics object is unavailable or
        generation raises.
        """
        try:
            logger.debug("Generating metrics...")

            # Make sure the metrics object is available
            if not metrics:
                logger.error("Metrics object is not available")
                return web.Response(text="Metrics not available", status=500)

            # Render metrics in the Prometheus text format
            metrics_data = metrics.get_metrics()
            logger.debug(f"Generated metrics: {len(metrics_data)} bytes")

            return web.Response(
                body=metrics_data,
                content_type='text/plain; version=0.0.4'
            )
        except Exception as e:
            import traceback

            logger.error(f"Error generating metrics: {e}")
            logger.error(f"Traceback: {traceback.format_exc()}")
            return web.Response(text=f"Error generating metrics: {e}", status=500)

    async def health_handler(self, request: web.Request) -> web.Response:
        """Handle /health endpoint for health checks.

        Returns "OK" (200) when metrics can be generated and are non-empty,
        503 when the metrics object is missing, empty or fails to render, and
        500 if the check itself fails unexpectedly.
        """
        try:
            # The metrics object must exist
            if not metrics:
                return web.Response(
                    text="ERROR: Metrics not available",
                    content_type='text/plain',
                    status=503
                )

            # ...and must be able to produce non-empty output
            try:
                metrics_data = metrics.get_metrics()
                if not metrics_data:
                    return web.Response(
                        text="ERROR: Empty metrics",
                        content_type='text/plain',
                        status=503
                    )
            except Exception as e:
                return web.Response(
                    text=f"ERROR: Metrics generation failed: {e}",
                    content_type='text/plain',
                    status=503
                )

            return web.Response(text="OK", content_type='text/plain', status=200)
        except Exception as e:
            logger.error(f"Health check failed: {e}")
            return web.Response(
                text=f"ERROR: Health check failed: {e}",
                content_type='text/plain',
                status=500
            )

    def _get_process(self, pid: int):
        """Return a cached ``psutil.Process`` for ``pid``, creating and priming it on first use.

        The cache is rebuilt if the PID changes (e.g. after a fork).
        Raises ImportError if psutil is not installed.
        """
        import psutil

        process = self._process
        if process is None or process.pid != pid:
            process = psutil.Process(pid)
            # The first interval-less cpu_percent() call only records a baseline
            # and returns 0.0; prime it so later calls report real usage.
            process.cpu_percent()
            self._process = process
        return process

    @staticmethod
    def _format_uptime(uptime_seconds: float) -> str:
        """Format an uptime in seconds as a short string with Russian unit letters.

        Seconds below a minute ("45с"), whole minutes below an hour ("12м"),
        hours and minutes below a day ("3ч 5м"), otherwise days and hours ("2д 4ч").
        """
        if uptime_seconds < 60:
            return f"{int(uptime_seconds)}с"
        if uptime_seconds < 3600:
            minutes = int(uptime_seconds // 60)
            return f"{minutes}м"
        if uptime_seconds < 86400:
            hours = int(uptime_seconds // 3600)
            minutes = int((uptime_seconds % 3600) // 60)
            return f"{hours}ч {minutes}м"
        days = int(uptime_seconds // 86400)
        hours = int((uptime_seconds % 86400) // 3600)
        return f"{days}д {hours}ч"

    @staticmethod
    def _json_response(payload: dict, status: int) -> web.Response:
        """Serialize ``payload`` as an application/json response with the given status."""
        import json

        # ensure_ascii=False keeps the Cyrillic uptime units readable in the body
        return web.Response(
            text=json.dumps(payload, ensure_ascii=False),
            content_type='application/json',
            status=status
        )

    async def status_handler(self, request: web.Request) -> web.Response:
        """Handle /status endpoint for process status information.

        Responds 200 with JSON keys ``status``, ``pid``, ``uptime``,
        ``memory_usage_mb`` (RSS in MiB), ``cpu_percent`` and ``timestamp``.
        ``cpu_percent`` is the process CPU usage since the previous /status
        request (0.0 on the first request). Responds 404 if psutil reports the
        process as missing, and 500 with the error text on any other failure
        (including psutil not being installed).
        """
        import datetime
        import os
        import time

        try:
            # Imported here rather than at module level, so a missing psutil only
            # breaks this endpoint (handled by the outer except below).
            import psutil

            current_pid = os.getpid()
            try:
                process = self._get_process(current_pid)
                create_time = process.create_time()
                uptime_seconds = time.time() - create_time

                # Diagnostic detail; debug level so frequent /status polling does not flood logs
                create_time_str = datetime.datetime.fromtimestamp(create_time).strftime('%Y-%m-%d %H:%M:%S')
                current_time_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                logger.debug(f"Process PID {current_pid}: created at {create_time_str}, current time {current_time_str}, uptime {uptime_seconds:.1f}s")

                status = "running" if process.is_running() else "stopped"

                response_data = {
                    "status": status,
                    "pid": current_pid,
                    "uptime": self._format_uptime(uptime_seconds),
                    "memory_usage_mb": round(process.memory_info().rss / 1024 / 1024, 2),
                    "cpu_percent": process.cpu_percent(),
                    "timestamp": time.time()
                }
                return self._json_response(response_data, 200)

            except psutil.NoSuchProcess:
                # Drop the stale handle so the next request starts fresh
                self._process = None
                response_data = {
                    "status": "not_found",
                    "error": "Process not found",
                    "timestamp": time.time()
                }
                return self._json_response(response_data, 404)

        except Exception as e:
            logger.error(f"Status check failed: {e}")
            response_data = {
                "status": "error",
                "error": str(e),
                "timestamp": time.time()
            }
            return self._json_response(response_data, 500)

    async def start(self) -> None:
        """Start the HTTP server on ``self.host:self.port``.

        On failure the error is logged, any partially set-up runner is cleaned
        up (so a failed bind does not leak it), and the exception is re-raised.
        """
        runner: Optional[web.AppRunner] = None
        try:
            runner = web.AppRunner(self.app)
            await runner.setup()

            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except Exception as e:
            logger.error(f"Failed to start metrics server: {e}")
            if runner is not None:
                try:
                    await runner.cleanup()
                except Exception as cleanup_error:
                    logger.error(f"Error cleaning up metrics server runner: {cleanup_error}")
            raise

        self.runner = runner
        self.site = site

        logger.info(f"Metrics server started on {self.host}:{self.port}")
        logger.info("Available endpoints:")
        logger.info(" - /metrics - Prometheus metrics")
        logger.info(" - /health - Health check")
        logger.info(" - /status - Process status")

    async def stop(self) -> None:
        """Stop the HTTP server.

        Errors are logged, not raised. The runner is cleaned up even if stopping
        the site fails, and both references are cleared, so calling ``stop()``
        again is a no-op.
        """
        site, self.site = self.site, None
        runner, self.runner = self.runner, None

        if site:
            try:
                await site.stop()
                logger.info("Metrics server site stopped")
            except Exception as e:
                logger.error(f"Error stopping metrics server: {e}")

        if runner:
            try:
                await runner.cleanup()
                logger.info("Metrics server runner cleaned up")
            except Exception as e:
                logger.error(f"Error stopping metrics server: {e}")

    async def __aenter__(self):
        """Async context manager entry: start the server and return it."""
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: stop the server (exceptions are not suppressed)."""
        await self.stop()
|
||
|
||
|
||
# Module-level handle to the running server, set by start_metrics_server()
# and reset by stop_metrics_server(); intended for use from main.py.
metrics_server: Optional[MetricsServer] = None
|
||
|
||
|
||
async def start_metrics_server(host: str = '0.0.0.0', port: int = 8080) -> MetricsServer:
    """Start a MetricsServer on ``host:port``, store it globally and return it.

    If a server from a previous call is still registered, it is stopped first;
    otherwise it would be orphaned (still listening, but unreachable from
    stop_metrics_server()). The global handle is only updated after the new
    server has started. If start-up raises, the exception propagates and the
    handle stays None.
    """
    global metrics_server

    if metrics_server is not None:
        logger.warning("Metrics server already started; stopping previous instance")
        await stop_metrics_server()

    server = MetricsServer(host, port)
    await server.start()
    # Publish only once the server is actually listening
    metrics_server = server
    return server
|
||
|
||
|
||
async def stop_metrics_server() -> None:
    """Shut down the global metrics server, if one is registered.

    Any exception raised while stopping is logged instead of propagated. Once
    a registered server has been handled, the global handle is cleared
    regardless of outcome.
    """
    global metrics_server

    server = metrics_server
    if not server:
        return

    try:
        await server.stop()
        logger.info("Metrics server stopped successfully")
    except Exception as err:
        logger.error(f"Error stopping metrics server: {err}")
    finally:
        metrics_server = None
|