261 lines
11 KiB
Python
261 lines
11 KiB
Python
"""
|
|
HTTP сервер для эндпоинтов метрик и health check
|
|
"""
|
|
import asyncio
|
|
import time
|
|
from typing import Optional
|
|
|
|
from aiohttp import ClientSession, web
|
|
from aiohttp.web import Request, Response, json_response
|
|
from loguru import logger
|
|
|
|
from config.constants import DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, APP_VERSION, HTTP_STATUS_OK, HTTP_STATUS_SERVICE_UNAVAILABLE, HTTP_STATUS_INTERNAL_SERVER_ERROR
|
|
from dependencies import get_database_service
|
|
from .metrics import get_metrics_service
|
|
|
|
|
|
class HTTPServer:
|
|
"""HTTP сервер для метрик и health check"""
|
|
|
|
def __init__(self, host: str = DEFAULT_HTTP_HOST, port: int = DEFAULT_HTTP_PORT):
|
|
self.host = host
|
|
self.port = port
|
|
self.app = web.Application()
|
|
self.metrics_service = get_metrics_service()
|
|
self.database_service = get_database_service()
|
|
self.start_time = time.time()
|
|
self._setup_routes()
|
|
|
|
def _setup_routes(self):
|
|
"""Настройка маршрутов"""
|
|
self.app.router.add_get('/metrics', self.metrics_handler)
|
|
self.app.router.add_get('/health', self.health_handler)
|
|
self.app.router.add_get('/ready', self.ready_handler)
|
|
self.app.router.add_get('/', self.root_handler)
|
|
|
|
async def metrics_handler(self, request: Request) -> Response:
|
|
"""Обработчик эндпоинта /metrics"""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Получаем метрики
|
|
metrics_data = self.metrics_service.get_metrics()
|
|
if isinstance(metrics_data, bytes):
|
|
metrics_data = metrics_data.decode('utf-8')
|
|
|
|
# Записываем метрику HTTP запроса
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/metrics", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/metrics", HTTP_STATUS_OK)
|
|
|
|
return Response(
|
|
text=metrics_data,
|
|
content_type='text/plain; version=0.0.4',
|
|
status=HTTP_STATUS_OK
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in metrics handler: {e}")
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/metrics", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/metrics", HTTP_STATUS_INTERNAL_SERVER_ERROR)
|
|
self.metrics_service.increment_errors(type(e).__name__, "metrics_handler")
|
|
|
|
return Response(
|
|
text="Internal Server Error",
|
|
status=HTTP_STATUS_INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
async def health_handler(self, request: Request) -> Response:
|
|
"""Обработчик эндпоинта /health"""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Проверяем состояние сервисов
|
|
health_status = {
|
|
"status": "healthy",
|
|
"timestamp": time.time(),
|
|
"uptime": time.time() - self.start_time,
|
|
"version": APP_VERSION,
|
|
"services": {}
|
|
}
|
|
|
|
# Проверяем базу данных
|
|
try:
|
|
await self.database_service.check_connection()
|
|
health_status["services"]["database"] = "healthy"
|
|
except Exception as e:
|
|
health_status["services"]["database"] = f"unhealthy: {str(e)}"
|
|
health_status["status"] = "unhealthy"
|
|
|
|
# Проверяем метрики
|
|
try:
|
|
self.metrics_service.get_metrics()
|
|
health_status["services"]["metrics"] = "healthy"
|
|
except Exception as e:
|
|
health_status["services"]["metrics"] = f"unhealthy: {str(e)}"
|
|
health_status["status"] = "unhealthy"
|
|
|
|
# Определяем HTTP статус
|
|
http_status = HTTP_STATUS_OK if health_status["status"] == "healthy" else HTTP_STATUS_SERVICE_UNAVAILABLE
|
|
|
|
# Записываем метрику HTTP запроса
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/health", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/health", http_status)
|
|
|
|
return json_response(
|
|
health_status,
|
|
status=http_status
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in health handler: {e}")
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/health", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/health", 500)
|
|
self.metrics_service.increment_errors(type(e).__name__, "health_handler")
|
|
|
|
return json_response(
|
|
{"status": "error", "message": str(e)},
|
|
status=HTTP_STATUS_INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
async def ready_handler(self, request: Request) -> Response:
|
|
"""Обработчик эндпоинта /ready (readiness probe)"""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Проверяем готовность сервисов
|
|
ready_status = {
|
|
"status": "ready",
|
|
"timestamp": time.time(),
|
|
"services": {}
|
|
}
|
|
|
|
# Проверяем базу данных
|
|
try:
|
|
await self.database_service.check_connection()
|
|
ready_status["services"]["database"] = "ready"
|
|
except Exception as e:
|
|
ready_status["services"]["database"] = f"not_ready: {str(e)}"
|
|
ready_status["status"] = "not_ready"
|
|
|
|
# Определяем HTTP статус
|
|
http_status = HTTP_STATUS_OK if ready_status["status"] == "ready" else HTTP_STATUS_SERVICE_UNAVAILABLE
|
|
|
|
# Записываем метрику HTTP запроса
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/ready", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/ready", http_status)
|
|
|
|
return json_response(
|
|
ready_status,
|
|
status=http_status
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in ready handler: {e}")
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/ready", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/ready", 500)
|
|
self.metrics_service.increment_errors(type(e).__name__, "ready_handler")
|
|
|
|
return json_response(
|
|
{"status": "error", "message": str(e)},
|
|
status=HTTP_STATUS_INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
|
|
async def root_handler(self, request: Request) -> Response:
|
|
"""Обработчик корневого эндпоинта"""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
info = {
|
|
"service": "AnonBot",
|
|
"version": APP_VERSION,
|
|
"endpoints": {
|
|
"metrics": "/metrics",
|
|
"health": "/health",
|
|
"ready": "/ready"
|
|
},
|
|
"uptime": time.time() - self.start_time
|
|
}
|
|
|
|
# Записываем метрику HTTP запроса
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/", 200)
|
|
|
|
return json_response(
|
|
info,
|
|
status=HTTP_STATUS_OK
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in root handler: {e}")
|
|
duration = time.time() - start_time
|
|
self.metrics_service.record_http_request_duration("GET", "/", duration)
|
|
self.metrics_service.increment_http_requests("GET", "/", 500)
|
|
self.metrics_service.increment_errors(type(e).__name__, "root_handler")
|
|
|
|
return json_response(
|
|
{"error": str(e)},
|
|
status=HTTP_STATUS_INTERNAL_SERVER_ERROR
|
|
)
|
|
|
|
async def start(self):
|
|
"""Запуск HTTP сервера"""
|
|
try:
|
|
runner = web.AppRunner(self.app)
|
|
await runner.setup()
|
|
site = web.TCPSite(runner, self.host, self.port)
|
|
await site.start()
|
|
|
|
logger.info(f"HTTP server started on {self.host}:{self.port}")
|
|
logger.info(f"Metrics endpoint: http://{self.host}:{self.port}/metrics")
|
|
logger.info(f"Health endpoint: http://{self.host}:{self.port}/health")
|
|
logger.info(f"Ready endpoint: http://{self.host}:{self.port}/ready")
|
|
logger.info(f"Status endpoint: http://{self.host}:{self.port}/status")
|
|
|
|
return runner
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start HTTP server: {e}")
|
|
self.metrics_service.increment_errors(type(e).__name__, "http_server")
|
|
raise
|
|
|
|
async def stop(self, runner: web.AppRunner):
|
|
"""Остановка HTTP сервера"""
|
|
try:
|
|
await runner.cleanup()
|
|
logger.info("HTTP server stopped")
|
|
except Exception as e:
|
|
logger.error(f"Error stopping HTTP server: {e}")
|
|
self.metrics_service.increment_errors(type(e).__name__, "http_server")
|
|
|
|
|
|
# Глобальный экземпляр HTTP сервера
|
|
_http_server: Optional[HTTPServer] = None
|
|
|
|
|
|
def get_http_server(host: str = DEFAULT_HTTP_HOST, port: int = DEFAULT_HTTP_PORT) -> HTTPServer:
|
|
"""Получить экземпляр HTTP сервера"""
|
|
global _http_server
|
|
if _http_server is None:
|
|
_http_server = HTTPServer(host, port)
|
|
return _http_server
|
|
|
|
|
|
async def start_http_server(host: str = DEFAULT_HTTP_HOST, port: int = DEFAULT_HTTP_PORT) -> web.AppRunner:
|
|
"""Запустить HTTP сервер"""
|
|
server = get_http_server(host, port)
|
|
return await server.start()
|
|
|
|
|
|
async def stop_http_server(runner: web.AppRunner):
|
|
"""Остановить HTTP сервер"""
|
|
server = get_http_server()
|
|
await server.stop(runner)
|