Files
prod/tests/infra/test_prometheus_integration.py

438 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Интеграционные тесты для Prometheus и связанных компонентов
"""
import pytest
import pytest_asyncio
import asyncio
import sys
import os
import tempfile
import yaml
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from pathlib import Path
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from prometheus_server import PrometheusServer
from metrics_collector import MetricsCollector
class TestPrometheusIntegration:
"""Интеграционные тесты для Prometheus"""
@pytest_asyncio.fixture
async def prometheus_server(self):
"""Создает экземпляр PrometheusServer для интеграционных тестов"""
server = PrometheusServer(host='127.0.0.1', port=0)
return server
@pytest.fixture
def metrics_collector(self):
"""Создает экземпляр MetricsCollector для интеграционных тестов"""
return MetricsCollector()
@pytest.fixture
def sample_prometheus_config(self):
"""Создает пример конфигурации Prometheus для тестов"""
return {
'global': {
'scrape_interval': '15s',
'evaluation_interval': '15s'
},
'scrape_configs': [
{
'job_name': 'test-infrastructure',
'static_configs': [
{
'targets': ['127.0.0.1:9091'],
'labels': {
'environment': 'test',
'service': 'test-monitoring'
}
}
],
'metrics_path': '/metrics',
'scrape_interval': '30s',
'scrape_timeout': '10s',
'honor_labels': True
}
]
}
@pytest.mark.integration
@pytest.mark.asyncio
async def test_prometheus_server_with_real_metrics_collector(self, prometheus_server):
"""Тест интеграции PrometheusServer с реальным MetricsCollector"""
# Получаем реальные метрики
metrics_data = prometheus_server.metrics_collector.get_metrics_data()
# Проверяем, что можем получить метрики
assert isinstance(metrics_data, dict)
# Форматируем метрики в Prometheus формат
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
# Проверяем, что метрики содержат системную информацию
assert '# HELP system_info System information' in prometheus_metrics
assert '# TYPE system_info gauge' in prometheus_metrics
# Проверяем, что есть хотя бы одна метрика
lines = prometheus_metrics.split('\n')
assert len(lines) >= 3 # system_info help, type, value
@pytest.mark.integration
def test_metrics_collector_system_integration(self, metrics_collector):
"""Тест интеграции MetricsCollector с системой"""
# Получаем системную информацию
system_info = metrics_collector.get_system_info()
# Проверяем, что получили словарь
assert isinstance(system_info, dict)
# Проверяем, что можем получить метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# Проверяем, что можем проверить алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
@pytest.mark.integration
def test_prometheus_metrics_format_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции форматирования метрик Prometheus"""
# Получаем реальные метрики
metrics_data = metrics_collector.get_metrics_data()
# Форматируем в Prometheus формат
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
# Проверяем структуру метрик
lines = prometheus_metrics.split('\n')
# Должна быть системная информация
system_info_lines = [line for line in lines if 'system_info' in line]
assert len(system_info_lines) >= 3 # help, type, value
# Проверяем, что метрики содержат правильные типы
type_lines = [line for line in lines if '# TYPE' in line]
assert len(type_lines) > 0
# Проверяем, что все метрики имеют правильный формат
metric_lines = [line for line in lines if line and not line.startswith('#')]
for line in metric_lines:
# Проверяем, что строка метрики содержит имя и значение
assert ' ' in line
parts = line.split(' ')
assert len(parts) >= 2
@pytest.mark.integration
def test_os_detection_integration(self):
"""Тест интеграции определения ОС"""
# Создаем коллектор с реальным определением ОС
collector = MetricsCollector()
# Проверяем, что ОС определена
assert collector.os_type in ["macos", "ubuntu", "unknown"]
# Проверяем, что можем получить информацию о диске
disk_info = collector._get_disk_usage()
if disk_info is not None:
assert hasattr(disk_info, 'total')
assert hasattr(disk_info, 'used')
assert hasattr(disk_info, 'free')
@pytest.mark.integration
def test_disk_io_calculation_integration(self, metrics_collector):
"""Тест интеграции расчета I/O диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Получаем текущую статистику диска
current_disk_io = metrics_collector._get_disk_io_counters()
if current_disk_io is not None:
# Рассчитываем скорость
read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io)
# Проверяем, что получили строки с единицами измерения
assert isinstance(read_speed, str)
assert isinstance(write_speed, str)
assert "/s" in read_speed
assert "/s" in write_speed
# Рассчитываем процент загрузки
io_percent = metrics_collector._calculate_disk_io_percent()
assert isinstance(io_percent, int)
assert 0 <= io_percent <= 100
@pytest.mark.integration
def test_process_monitoring_integration(self, metrics_collector):
"""Тест интеграции мониторинга процессов"""
# Проверяем статус процессов
for process_name in ['helper_bot']:
status, message = metrics_collector.check_process_status(process_name)
# Статус должен быть либо ✅, либо ❌
assert status in ["", ""]
# Сообщение должно быть строкой
assert isinstance(message, str)
@pytest.mark.integration
def test_alert_system_integration(self, metrics_collector):
"""Тест интеграции системы алертов"""
# Сбрасываем состояния алертов для чистого теста
metrics_collector.alert_states = {'cpu': False, 'ram': False, 'disk': False}
metrics_collector.alert_start_times = {'cpu': None, 'ram': None, 'disk': None}
# Устанавливаем минимальные задержки для тестов
metrics_collector.alert_delays = {'cpu': 0, 'ram': 0, 'disk': 0}
# Создаем тестовые данные
test_system_info = {
'cpu_percent': 85.0, # Выше порога
'ram_percent': 60.0, # Ниже порога
'disk_percent': 70.0, # Ниже порога
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 300.0
}
# Проверяем алерты
alerts, recoveries = metrics_collector.check_alerts(test_system_info)
# Должен быть хотя бы один алерт для CPU
assert len(alerts) >= 1
assert any(alert[0] == 'cpu' for alert in alerts)
# Проверяем, что состояние алерта изменилось
assert metrics_collector.alert_states['cpu'] is True
# Тестируем восстановление
test_system_info['cpu_percent'] = 70.0 # Ниже recovery threshold
alerts, recoveries = metrics_collector.check_alerts(test_system_info)
# Должно быть восстановление
assert len(recoveries) >= 1
assert any(recovery[0] == 'cpu' for recovery in recoveries)
assert metrics_collector.alert_states['cpu'] is False
@pytest.mark.integration
def test_uptime_calculation_integration(self, metrics_collector):
"""Тест интеграции расчета uptime"""
# Получаем uptime системы
system_uptime = metrics_collector._get_system_uptime()
assert system_uptime > 0
# Получаем uptime мониторинга
monitor_uptime = metrics_collector.get_monitor_uptime()
assert isinstance(monitor_uptime, str)
assert len(monitor_uptime) > 0
# Форматируем uptime
formatted_uptime = metrics_collector._format_uptime(system_uptime)
assert isinstance(formatted_uptime, str)
assert len(formatted_uptime) > 0
@pytest.mark.integration
def test_environment_variables_integration(self):
"""Тест интеграции с переменными окружения"""
# Тестируем с пользовательскими значениями
test_threshold = '90.0'
test_recovery_threshold = '85.0'
with patch.dict(os.environ, {
'THRESHOLD': test_threshold,
'RECOVERY_THRESHOLD': test_recovery_threshold
}):
collector = MetricsCollector()
# Проверяем, что значения установлены
assert collector.threshold == float(test_threshold)
assert collector.recovery_threshold == float(test_recovery_threshold)
@pytest.mark.integration
def test_prometheus_config_validation_integration(self, sample_prometheus_config):
"""Тест интеграции валидации конфигурации Prometheus"""
# Проверяем структуру конфигурации
assert 'global' in sample_prometheus_config
assert 'scrape_configs' in sample_prometheus_config
global_config = sample_prometheus_config['global']
assert 'scrape_interval' in global_config
assert 'evaluation_interval' in global_config
scrape_configs = sample_prometheus_config['scrape_configs']
assert len(scrape_configs) > 0
# Проверяем каждый job
for job in scrape_configs:
assert 'job_name' in job
assert 'static_configs' in job
static_configs = job['static_configs']
assert len(static_configs) > 0
for static_config in static_configs:
assert 'targets' in static_config
targets = static_config['targets']
assert len(targets) > 0
@pytest.mark.integration
def test_metrics_data_consistency_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции консистентности данных метрик"""
# Получаем метрики разными способами
system_info = metrics_collector.get_system_info()
metrics_data = metrics_collector.get_metrics_data()
# Проверяем консистентность между system_info и metrics_data
# Реальные метрики могут значительно отличаться из-за времени между вызовами
# и системной нагрузки, поэтому используем более широкие допуски
if 'cpu_percent' in system_info and 'cpu_usage_percent' in metrics_data:
# CPU метрики могут сильно колебаться, используем допуск 50%
# Это связано с тем, что CPU измеряется в разные моменты времени
cpu_diff = abs(system_info['cpu_percent'] - metrics_data['cpu_usage_percent'])
assert cpu_diff < 50.0, f"CPU metrics difference too large: {cpu_diff}% (system: {system_info['cpu_percent']}%, metrics: {metrics_data['cpu_usage_percent']}%)"
if 'ram_percent' in system_info and 'ram_usage_percent' in metrics_data:
# RAM метрики более стабильны, но все же используем допуск 15%
ram_diff = abs(system_info['ram_percent'] - metrics_data['ram_usage_percent'])
assert ram_diff < 15.0, f"RAM metrics difference too large: {ram_diff}% (system: {system_info['ram_percent']}%, metrics: {metrics_data['ram_usage_percent']}%)"
if 'disk_percent' in system_info and 'disk_usage_percent' in metrics_data:
# Disk метрики должны быть очень стабильными, допуск 10%
disk_diff = abs(system_info['disk_percent'] - metrics_data['disk_usage_percent'])
assert disk_diff < 10.0, f"Disk metrics difference too large: {disk_diff}% (system: {system_info['disk_percent']}%, metrics: {metrics_data['disk_usage_percent']}%)"
# Проверяем, что все метрики имеют разумные значения
for metric_name, value in system_info.items():
if isinstance(value, (int, float)):
assert value >= 0, f"Metric {metric_name} should be non-negative: {value}"
for metric_name, value in metrics_data.items():
if isinstance(value, (int, float)):
assert value >= 0, f"Metric {metric_name} should be non-negative: {value}"
@pytest.mark.integration
def test_error_handling_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции обработки ошибок"""
# Тестируем обработку ошибок в PrometheusServer
with patch.object(metrics_collector, 'get_metrics_data', side_effect=Exception("Test error")):
prometheus_server.metrics_collector = metrics_collector
# Создаем мок запрос
request = Mock()
# Обрабатываем запрос метрик
response = asyncio.run(prometheus_server.metrics_handler(request))
# Должен вернуться ответ с ошибкой
assert response.status == 500
assert 'Error: Test error' in response.text
@pytest.mark.integration
def test_performance_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции производительности"""
import time
# Измеряем время получения системной информации
start_time = time.time()
system_info = metrics_collector.get_system_info()
system_info_time = time.time() - start_time
# Измеряем время получения метрик
start_time = time.time()
metrics_data = metrics_collector.get_metrics_data()
metrics_time = time.time() - start_time
# Измеряем время форматирования Prometheus метрик
start_time = time.time()
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
formatting_time = time.time() - start_time
# Проверяем, что операции выполняются в разумное время
assert system_info_time < 5.0, f"System info collection took too long: {system_info_time}s"
assert metrics_time < 3.0, f"Metrics collection took too long: {metrics_time}s"
assert formatting_time < 0.1, f"Metrics formatting took too long: {formatting_time}s"
# Проверяем, что получили данные
assert isinstance(system_info, dict)
assert isinstance(metrics_data, dict)
assert isinstance(prometheus_metrics, str)
assert len(prometheus_metrics) > 0
class TestPrometheusEndToEnd:
"""End-to-end тесты для Prometheus"""
@pytest.mark.integration
@pytest.mark.slow
def test_full_metrics_pipeline(self):
"""Тест полного пайплайна метрик"""
# Создаем все компоненты
metrics_collector = MetricsCollector()
prometheus_server = PrometheusServer()
# 1. Собираем системную информацию
system_info = metrics_collector.get_system_info()
assert isinstance(system_info, dict)
# 2. Получаем метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# 3. Форматируем метрики в Prometheus формат
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
assert isinstance(prometheus_metrics, str)
# 4. Проверяем, что метрики содержат необходимую информацию
lines = prometheus_metrics.split('\n')
# Должна быть системная информация
assert any('system_info' in line for line in lines)
# Должны быть метрики системы
assert any('cpu_usage_percent' in line for line in lines) or any('ram_usage_percent' in line for line in lines)
# 5. Проверяем алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
@pytest.mark.integration
@pytest.mark.slow
def test_metrics_stability(self):
"""Тест стабильности метрик"""
import time
metrics_collector = MetricsCollector()
# Получаем метрики несколько раз подряд
metrics_list = []
for _ in range(3):
metrics = metrics_collector.get_metrics_data()
metrics_list.append(metrics)
time.sleep(0.1) # Небольшая пауза
# Проверяем, что структура метрик не изменилась
for metrics in metrics_list:
assert isinstance(metrics, dict)
assert len(metrics) > 0
# Проверяем, что ключи метрик не изменились
first_keys = set(metrics_list[0].keys())
for metrics in metrics_list[1:]:
current_keys = set(metrics.keys())
# Некоторые метрики могут отсутствовать, но структура должна быть похожей
assert len(current_keys.intersection(first_keys)) > 0
if __name__ == "__main__":
pytest.main([__file__, "-v", "-m", "integration"])