refactor: update Docker setup and remove deprecated monitoring components

- Replace curl with wget in healthcheck commands for better reliability.
- Remove server_monitor service and related configurations from docker-compose.
- Update Dockerfile to use a multi-stage build for optimized image size.
- Delete obsolete Dockerfile.optimized and related monitoring scripts.
- Clean up Makefile by removing commands related to the server_monitor service.
- Update README to reflect changes in monitoring services and commands.
This commit is contained in:
2025-09-16 17:49:42 +03:00
parent 8673cb4f55
commit 30830c5bd9
26 changed files with 43 additions and 4668 deletions

View File

@@ -1,318 +0,0 @@
#!/usr/bin/env python3
"""
Общие фикстуры для тестов инфраструктуры
"""
import pytest
import asyncio
import sys
import os
from unittest.mock import Mock, AsyncMock, patch
from pathlib import Path
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
# Настройка pytest-asyncio
pytest_plugins = ('pytest_asyncio',)
@pytest.fixture(scope="session")
def event_loop():
"""Создает event loop для асинхронных тестов"""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
@pytest.fixture
def mock_metrics_data():
"""Создает мок данных метрик для тестов"""
return {
'cpu_usage_percent': 25.5,
'ram_usage_percent': 60.2,
'disk_usage_percent': 45.8,
'load_average_1m': 1.2,
'load_average_5m': 1.1,
'load_average_15m': 1.0,
'swap_usage_percent': 10.5,
'disk_io_percent': 15.3,
'system_uptime_seconds': 86400.0,
'monitor_uptime_seconds': 3600.0
}
@pytest.fixture
def mock_system_info():
"""Создает мок системной информации для тестов"""
return {
'cpu_percent': 25.5,
'load_avg_1m': 1.2,
'load_avg_5m': 1.1,
'load_avg_15m': 1.0,
'cpu_count': 8,
'io_wait_percent': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'ram_percent': 50.0,
'swap_used': 1.0,
'swap_total': 2.0,
'swap_percent': 50.0,
'disk_used': 100.0,
'disk_total': 500.0,
'disk_percent': 20.0,
'disk_free': 400.0,
'disk_read_speed': '1.0 MB/s',
'disk_write_speed': '512.0 KB/s',
'disk_io_percent': 15,
'system_uptime': '1д 0ч 0м',
'monitor_uptime': '1ч 0м',
'server_hostname': 'test-host',
'current_time': '2025-01-01 12:00:00'
}
@pytest.fixture
def mock_psutil():
"""Создает мок для psutil"""
mock_psutil = Mock()
# Мокаем CPU
mock_psutil.cpu_percent.return_value = 25.5
mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0)
mock_psutil.cpu_count.return_value = 8
# Мокаем память
mock_memory = Mock()
mock_memory.used = 8 * (1024**3) # 8 GB
mock_memory.total = 16 * (1024**3) # 16 GB
mock_psutil.virtual_memory.return_value = mock_memory
mock_swap = Mock()
mock_swap.used = 1 * (1024**3) # 1 GB
mock_swap.total = 2 * (1024**3) # 2 GB
mock_swap.percent = 50.0
mock_psutil.swap_memory.return_value = mock_swap
# Мокаем диск
mock_disk = Mock()
mock_disk.used = 100 * (1024**3) # 100 GB
mock_disk.total = 500 * (1024**3) # 500 GB
mock_disk.free = 400 * (1024**3) # 400 GB
mock_psutil.disk_usage.return_value = mock_disk
# Мокаем disk I/O
mock_disk_io = Mock()
mock_disk_io.read_count = 1000
mock_disk_io.write_count = 500
mock_disk_io.read_bytes = 1024 * (1024**2) # 1 GB
mock_disk_io.write_bytes = 512 * (1024**2) # 512 MB
mock_psutil.disk_io_counters.return_value = mock_disk_io
# Мокаем boot time
import time
mock_psutil.boot_time.return_value = time.time() - 86400 # 1 день назад
return mock_psutil
@pytest.fixture
def mock_platform():
"""Создает мок для platform"""
mock_platform = Mock()
mock_platform.system.return_value = 'Linux'
return mock_platform
@pytest.fixture
def mock_subprocess():
"""Создает мок для subprocess"""
mock_subprocess = Mock()
# Мокаем успешный результат diskutil
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = """
Container Total Space: 500.0 GB
Container Free Space: 400.0 GB
"""
mock_subprocess.run.return_value = mock_result
return mock_subprocess
@pytest.fixture
def mock_os():
"""Создает мок для os"""
mock_os = Mock()
mock_os.getenv.side_effect = lambda key, default=None: {
'THRESHOLD': '80.0',
'RECOVERY_THRESHOLD': '75.0'
}.get(key, default)
# Мокаем uname
mock_uname = Mock()
mock_uname.nodename = "test-host"
mock_os.uname.return_value = mock_uname
return mock_os
@pytest.fixture
def prometheus_config_sample():
"""Создает пример конфигурации Prometheus для тестов"""
return {
'global': {
'scrape_interval': '15s',
'evaluation_interval': '15s'
},
'rule_files': [
'# - "first_rules.yml"',
'# - "second_rules.yml"'
],
'scrape_configs': [
{
'job_name': 'prometheus',
'static_configs': [
{
'targets': ['localhost:9090']
}
]
},
{
'job_name': 'infrastructure',
'static_configs': [
{
'targets': ['host.docker.internal:9091']
}
],
'metrics_path': '/metrics',
'scrape_interval': '30s',
'scrape_timeout': '10s',
'honor_labels': True
},
{
'job_name': 'telegram-helper-bot',
'static_configs': [
{
'targets': ['bots_telegram_bot:8080'],
'labels': {
'bot_name': 'telegram-helper-bot',
'environment': 'production',
'service': 'telegram-bot'
}
}
],
'metrics_path': '/metrics',
'scrape_interval': '15s',
'scrape_timeout': '10s',
'honor_labels': True
}
],
'alerting': {
'alertmanagers': [
{
'static_configs': [
{
'targets': [
'# - alertmanager:9093'
]
}
]
}
]
}
}
@pytest.fixture
def mock_aiohttp():
"""Создает мок для aiohttp"""
mock_aiohttp = Mock()
# Мокаем web.Application
mock_app = Mock()
mock_aiohttp.web.Application.return_value = mock_app
# Мокаем web.Response
mock_response = Mock()
mock_response.status = 200
mock_response.content_type = 'text/plain'
mock_response.text = 'Test response'
mock_aiohttp.web.Response.return_value = mock_response
return mock_aiohttp
@pytest.fixture
def mock_request():
"""Создает мок для HTTP запроса"""
request = Mock()
request.method = 'GET'
request.path = '/metrics'
request.headers = {}
return request
@pytest.fixture
def test_environment():
"""Создает тестовое окружение"""
return {
'os_type': 'ubuntu',
'threshold': 80.0,
'recovery_threshold': 75.0,
'host': '127.0.0.1',
'port': 9091
}
# Маркеры для категоризации тестов
def pytest_configure(config):
"""Настройка маркеров pytest"""
config.addinivalue_line(
"markers", "asyncio: mark test as async"
)
config.addinivalue_line(
"markers", "slow: mark test as slow"
)
config.addinivalue_line(
"markers", "integration: mark test as integration test"
)
config.addinivalue_line(
"markers", "unit: mark test as unit test"
)
config.addinivalue_line(
"markers", "prometheus: mark test as prometheus related"
)
config.addinivalue_line(
"markers", "metrics: mark test as metrics related"
)
# Автоматическая маркировка тестов
def pytest_collection_modifyitems(config, items):
"""Автоматически маркирует тесты по их расположению"""
for item in items:
# Маркируем асинхронные тесты
if "async" in item.name or "Async" in item.name:
item.add_marker(pytest.mark.asyncio)
# Маркируем интеграционные тесты
if "integration" in item.name.lower() or "Integration" in str(item.cls):
item.add_marker(pytest.mark.integration)
# Маркируем unit тесты
if "unit" in item.name.lower() or "Unit" in str(item.cls):
item.add_marker(pytest.mark.unit)
# Маркируем медленные тесты
if "slow" in item.name.lower() or "Slow" in str(item.cls):
item.add_marker(pytest.mark.slow)
# Маркируем тесты Prometheus
if "prometheus" in item.name.lower() or "Prometheus" in str(item.cls):
item.add_marker(pytest.mark.prometheus)
# Маркируем тесты метрик
if "metrics" in item.name.lower() or "Metrics" in str(item.cls):
item.add_marker(pytest.mark.metrics)

View File

@@ -1,230 +0,0 @@
import pytest
import time
from unittest.mock import Mock, patch
import sys
import os
# Добавляем путь к модулю для импорта
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', 'infra', 'monitoring'))
from metrics_collector import MetricsCollector
class TestAlertDelays:
"""Тесты для механизма задержки алертов"""
def setup_method(self):
"""Настройка перед каждым тестом"""
# Мокаем переменные окружения
with patch.dict(os.environ, {
'CPU_ALERT_DELAY': '5', # 5 секунд для быстрого тестирования
'RAM_ALERT_DELAY': '7', # 7 секунд для быстрого тестирования
'DISK_ALERT_DELAY': '10' # 10 секунд для быстрого тестирования
}):
self.collector = MetricsCollector()
def test_alert_delays_initialization(self):
"""Тест инициализации задержек алертов"""
assert self.collector.alert_delays['cpu'] == 5
assert self.collector.alert_delays['ram'] == 7
assert self.collector.alert_delays['disk'] == 10
# Проверяем, что время начала превышения инициализировано как None
assert self.collector.alert_start_times['cpu'] is None
assert self.collector.alert_start_times['ram'] is None
assert self.collector.alert_start_times['disk'] is None
def test_cpu_alert_delay_logic(self):
"""Тест логики задержки алерта CPU"""
# Симулируем превышение порога CPU
system_info = {
'cpu_percent': 85.0, # Выше порога 80%
'ram_percent': 70.0, # Нормально
'disk_percent': 75.0, # Нормально
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 25.0
}
# Первая проверка - должно начать отсчет задержки
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0 # Алерт еще не отправлен
assert self.collector.alert_start_times['cpu'] is not None # Время начала установлено
# Проверяем, что состояние алерта не изменилось
assert not self.collector.alert_states['cpu']
# Симулируем время, прошедшее с начала превышения
# Устанавливаем время начала в прошлое (больше задержки)
self.collector.alert_start_times['cpu'] = time.time() - 6 # 6 секунд назад
# Теперь алерт должен сработать
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 1 # Алерт отправлен
assert alerts[0][0] == 'cpu' # Тип алерта
assert alerts[0][1] == 85.0 # Значение CPU
assert self.collector.alert_states['cpu'] # Состояние алерта установлено
def test_alert_reset_on_recovery(self):
"""Тест сброса алерта при восстановлении"""
# Сначала превышаем порог и ждем задержку
system_info_high = {
'cpu_percent': 85.0,
'ram_percent': 70.0,
'disk_percent': 75.0,
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 25.0
}
# Устанавливаем время начала превышения в прошлое
self.collector.alert_start_times['cpu'] = time.time() - 6
# Проверяем - алерт должен сработать
alerts, recoveries = self.collector.check_alerts(system_info_high)
assert len(alerts) == 1 # Алерт отправлен
assert self.collector.alert_states['cpu'] # Состояние установлено
# Теперь симулируем восстановление
system_info_low = {
'cpu_percent': 70.0, # Ниже порога восстановления 75%
'ram_percent': 70.0,
'disk_percent': 75.0,
'load_avg_1m': 1.2,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 25.0
}
alerts, recoveries = self.collector.check_alerts(system_info_low)
assert len(recoveries) == 1 # Сообщение о восстановлении
assert recoveries[0][0] == 'cpu' # Тип восстановления
assert not self.collector.alert_states['cpu'] # Состояние сброшено
assert self.collector.alert_start_times['cpu'] is None # Время сброшено
def test_multiple_metrics_alert(self):
"""Тест алертов по нескольким метрикам одновременно"""
system_info = {
'cpu_percent': 85.0, # Выше порога
'ram_percent': 85.0, # Выше порога
'disk_percent': 75.0, # Нормально
'load_avg_1m': 2.5,
'ram_used': 13.0,
'ram_total': 16.0,
'disk_free': 25.0
}
# Устанавливаем время начала превышения для CPU и RAM в прошлое
self.collector.alert_start_times['cpu'] = time.time() - 6 # Больше CPU_ALERT_DELAY (5 сек)
self.collector.alert_start_times['ram'] = time.time() - 8 # Больше RAM_ALERT_DELAY (7 сек)
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 2 # Два алерта: CPU и RAM
# Проверяем типы алертов
alert_types = [alert[0] for alert in alerts]
assert 'cpu' in alert_types
assert 'ram' in alert_types
# Проверяем состояния
assert self.collector.alert_states['cpu']
assert self.collector.alert_states['ram']
assert not self.collector.alert_states['disk']
def test_alert_delay_customization(self):
"""Тест настройки пользовательских задержек"""
# Тестируем с другими значениями задержек
with patch.dict(os.environ, {
'CPU_ALERT_DELAY': '2',
'RAM_ALERT_DELAY': '3',
'DISK_ALERT_DELAY': '4'
}):
collector = MetricsCollector()
assert collector.alert_delays['cpu'] == 2
assert collector.alert_delays['ram'] == 3
assert collector.alert_delays['disk'] == 4
def test_no_false_alerts(self):
"""Тест отсутствия ложных алертов при кратковременных пиках"""
system_info = {
'cpu_percent': 85.0,
'ram_percent': 70.0,
'disk_percent': 75.0,
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 25.0
}
# Проверяем сразу после превышения порога
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0 # Алерт не должен сработать сразу
# Проверяем, что время начала установлено
assert self.collector.alert_start_times['cpu'] is not None
# Проверяем через короткое время (до истечения задержки)
# Устанавливаем время начала в прошлое, но меньше задержки
self.collector.alert_start_times['cpu'] = time.time() - 2 # 2 секунды назад
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0 # Алерт все еще не должен сработать
def test_alert_state_persistence(self):
"""Тест сохранения состояния алерта между проверками"""
system_info = {
'cpu_percent': 85.0,
'ram_percent': 70.0,
'disk_percent': 75.0,
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 25.0
}
# Первая проверка - начинаем отсчет
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0
initial_time = self.collector.alert_start_times['cpu']
assert initial_time is not None
# Проверяем еще раз - время начала должно сохраниться
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0
assert self.collector.alert_start_times['cpu'] == initial_time # Время не изменилось
def test_disk_alert_delay(self):
"""Тест задержки алерта для диска"""
system_info = {
'cpu_percent': 70.0,
'ram_percent': 70.0,
'disk_percent': 85.0, # Выше порога
'load_avg_1m': 1.2,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 15.0
}
# Первая проверка
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0
assert self.collector.alert_start_times['disk'] is not None
# Устанавливаем время начала превышения в прошлое, но меньше задержки
self.collector.alert_start_times['disk'] = time.time() - 5 # 5 секунд назад (меньше DISK_ALERT_DELAY)
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 0 # Алерт не должен сработать
# Устанавливаем время начала превышения в прошлое, больше задержки
self.collector.alert_start_times['disk'] = time.time() - 11 # 11 секунд назад (больше DISK_ALERT_DELAY)
alerts, recoveries = self.collector.check_alerts(system_info)
assert len(alerts) == 1 # Алерт должен сработать
assert alerts[0][0] == 'disk'
if __name__ == '__main__':
pytest.main([__file__])

View File

@@ -1,102 +0,0 @@
#!/usr/bin/env python3
"""
Тесты для инфраструктуры мониторинга
"""
import pytest
import sys
import os
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
def test_imports():
"""Тест импорта основных модулей"""
try:
from metrics_collector import MetricsCollector
from message_sender import MessageSender
from prometheus_server import PrometheusServer
from server_monitor import ServerMonitor
assert True
except ImportError as e:
pytest.fail(f"Failed to import modules: {e}")
def test_metrics_collector_creation():
"""Тест создания MetricsCollector"""
try:
from metrics_collector import MetricsCollector
collector = MetricsCollector()
assert collector is not None
assert hasattr(collector, 'get_system_info')
assert hasattr(collector, 'get_metrics_data')
except Exception as e:
pytest.fail(f"Failed to create MetricsCollector: {e}")
def test_message_sender_creation():
"""Тест создания MessageSender"""
try:
from message_sender import MessageSender
sender = MessageSender()
assert sender is not None
except Exception as e:
pytest.fail(f"Failed to create MessageSender: {e}")
def test_prometheus_server_creation():
"""Тест создания PrometheusServer"""
try:
from prometheus_server import PrometheusServer
server = PrometheusServer()
assert server is not None
assert hasattr(server, 'host')
assert hasattr(server, 'port')
except Exception as e:
pytest.fail(f"Failed to create PrometheusServer: {e}")
def test_server_monitor_creation():
"""Тест создания ServerMonitor"""
try:
from server_monitor import ServerMonitor
monitor = ServerMonitor()
assert monitor is not None
assert hasattr(monitor, 'metrics_collector')
assert hasattr(monitor, 'message_sender')
assert hasattr(monitor, 'prometheus_server')
except Exception as e:
pytest.fail(f"Failed to create ServerMonitor: {e}")
def test_system_info_structure():
"""Тест структуры системной информации"""
try:
from metrics_collector import MetricsCollector
collector = MetricsCollector()
system_info = collector.get_system_info()
# Проверяем, что system_info это словарь
assert isinstance(system_info, dict)
# Проверяем наличие основных ключей
expected_keys = ['cpu_percent', 'ram_percent', 'disk_percent', 'server_hostname']
for key in expected_keys:
assert key in system_info, f"Missing key: {key}"
except Exception as e:
pytest.fail(f"Failed to get system info: {e}")
def test_metrics_data_structure():
"""Тест структуры метрик"""
try:
from metrics_collector import MetricsCollector
collector = MetricsCollector()
metrics = collector.get_metrics_data()
# Проверяем, что metrics это словарь
assert isinstance(metrics, dict)
# Проверяем, что есть хотя бы одна метрика
assert len(metrics) > 0, "Metrics should not be empty"
except Exception as e:
pytest.fail(f"Failed to get metrics data: {e}")
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,92 +0,0 @@
#!/usr/bin/env python3
"""
Тесты для MessageSender
"""
import pytest
import sys
import os
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from infra.monitoring.message_sender import MessageSender
class TestMessageSender:
"""Тесты для класса MessageSender"""
@pytest.fixture
def message_sender(self):
"""Создает экземпляр MessageSender для тестов"""
return MessageSender()
def test_get_cpu_emoji(self, message_sender):
"""Тест получения эмодзи для CPU"""
# Тест зеленого уровня (нормальная нагрузка)
assert message_sender._get_cpu_emoji(25.0) == "🟢"
assert message_sender._get_cpu_emoji(49.9) == "🟢"
# Тест желтого уровня (средняя нагрузка)
assert message_sender._get_cpu_emoji(50.0) == "⚠️"
assert message_sender._get_cpu_emoji(79.9) == "⚠️"
# Тест красного уровня (высокая нагрузка)
assert message_sender._get_cpu_emoji(80.0) == "🚨"
assert message_sender._get_cpu_emoji(95.0) == "🚨"
def test_get_memory_emoji(self, message_sender):
"""Тест получения эмодзи для памяти"""
# Тест зеленого уровня (нормальное использование)
assert message_sender._get_memory_emoji(30.0) == "🟢"
assert message_sender._get_memory_emoji(59.9) == "🟢"
# Тест желтого уровня (среднее использование)
assert message_sender._get_memory_emoji(60.0) == "⚠️"
assert message_sender._get_memory_emoji(84.9) == "⚠️"
# Тест красного уровня (высокое использование)
assert message_sender._get_memory_emoji(85.0) == "🚨"
assert message_sender._get_memory_emoji(95.0) == "🚨"
def test_get_load_average_emoji(self, message_sender):
"""Тест получения эмодзи для Load Average"""
# Тест зеленого уровня (нормальная нагрузка)
assert message_sender._get_load_average_emoji(4.0, 8) == "🟢" # 0.5 на ядро
assert message_sender._get_load_average_emoji(7.9, 8) == "🟢" # 0.9875 на ядро
# Тест желтого уровня (средняя нагрузка)
assert message_sender._get_load_average_emoji(8.0, 8) == "⚠️" # 1.0 на ядро
assert message_sender._get_load_average_emoji(15.9, 8) == "⚠️" # 1.9875 на ядро
# Тест красного уровня (высокая нагрузка)
assert message_sender._get_load_average_emoji(16.0, 8) == "🚨" # 2.0 на ядро
assert message_sender._get_load_average_emoji(24.0, 8) == "🚨" # 3.0 на ядро
def test_get_io_wait_emoji(self, message_sender):
"""Тест получения эмодзи для IO Wait"""
# Тест зеленого уровня (нормальный IO Wait)
assert message_sender._get_io_wait_emoji(2.0) == "🟢"
assert message_sender._get_io_wait_emoji(4.9) == "🟢"
# Тест желтого уровня (средний IO Wait)
assert message_sender._get_io_wait_emoji(5.0) == "⚠️"
assert message_sender._get_io_wait_emoji(19.9) == "⚠️"
# Тест красного уровня (высокий IO Wait)
assert message_sender._get_io_wait_emoji(20.0) == "🚨"
assert message_sender._get_io_wait_emoji(35.0) == "🚨"
def test_get_disk_space_emoji(self, message_sender):
"""Тест получения эмодзи для дискового пространства"""
# Тест зеленого уровня (нормальное использование)
assert message_sender._get_disk_space_emoji(30.0) == "🟢"
assert message_sender._get_disk_space_emoji(59.9) == "🟢"
# Тест желтого уровня (среднее использование)
assert message_sender._get_disk_space_emoji(60.0) == "⚠️"
assert message_sender._get_disk_space_emoji(89.9) == "⚠️"
# Тест красного уровня (высокое использование)
assert message_sender._get_disk_space_emoji(90.0) == "🚨"
assert message_sender._get_disk_space_emoji(95.0) == "🚨"

View File

@@ -1,464 +0,0 @@
#!/usr/bin/env python3
"""
Тесты для MetricsCollector
"""
import pytest
import sys
import os
import time
import platform
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from infra.monitoring.metrics_collector import MetricsCollector
class TestMetricsCollector:
"""Тесты для класса MetricsCollector"""
@pytest.fixture
def metrics_collector(self):
"""Создает экземпляр MetricsCollector для тестов"""
return MetricsCollector()
@pytest.fixture
def mock_psutil(self):
"""Мок для psutil"""
mock_psutil = Mock()
# Мокаем CPU
mock_psutil.cpu_percent.return_value = 25.5
mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0)
mock_psutil.cpu_count.return_value = 8
# Мокаем память
mock_memory = Mock()
mock_memory.used = 8 * (1024**3) # 8 GB
mock_memory.total = 16 * (1024**3) # 16 GB
mock_psutil.virtual_memory.return_value = mock_memory
mock_swap = Mock()
mock_swap.used = 1 * (1024**3) # 1 GB
mock_swap.total = 2 * (1024**3) # 2 GB
mock_swap.percent = 50.0
mock_psutil.swap_memory.return_value = mock_swap
# Мокаем диск
mock_disk = Mock()
mock_disk.used = 100 * (1024**3) # 100 GB
mock_disk.total = 500 * (1024**3) # 500 GB
mock_disk.free = 400 * (1024**3) # 400 GB
mock_psutil.disk_usage.return_value = mock_disk
# Мокаем disk I/O
mock_disk_io = Mock()
mock_disk_io.read_count = 1000
mock_disk_io.write_count = 500
mock_disk_io.read_bytes = 1024 * (1024**2) # 1 GB
mock_disk_io.write_bytes = 512 * (1024**2) # 512 MB
mock_psutil.disk_io_counters.return_value = mock_disk_io
# Мокаем boot time
mock_psutil.boot_time.return_value = time.time() - 86400 # 1 день назад
return mock_psutil
def test_init(self, metrics_collector):
"""Тест инициализации MetricsCollector"""
assert metrics_collector.threshold == 80.0
assert metrics_collector.recovery_threshold == 75.0
assert isinstance(metrics_collector.alert_states, dict)
assert 'cpu' in metrics_collector.alert_states
assert 'ram' in metrics_collector.alert_states
assert 'disk' in metrics_collector.alert_states
assert metrics_collector.monitor_start_time > 0
def test_detect_os_macos(self):
"""Тест определения macOS"""
with patch('platform.system', return_value='Darwin'):
collector = MetricsCollector()
assert collector.os_type == "macos"
def test_detect_os_linux(self):
"""Тест определения Linux"""
with patch('platform.system', return_value='Linux'):
collector = MetricsCollector()
assert collector.os_type == "ubuntu"
def test_detect_os_unknown(self):
"""Тест определения неизвестной ОС"""
with patch('platform.system', return_value='Windows'):
collector = MetricsCollector()
assert collector.os_type == "unknown"
def test_get_disk_path(self, metrics_collector):
"""Тест получения пути к диску"""
# Для всех ОС должен возвращаться "/"
assert metrics_collector._get_disk_path() == "/"
@patch('subprocess.run')
def test_get_macos_disk_usage_success(self, mock_subprocess, metrics_collector):
"""Тест получения информации о диске macOS через diskutil"""
# Настраиваем мок для macOS
metrics_collector.os_type = "macos"
# Мокаем успешный вывод diskutil
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = """
Container Total Space: 500.0 GB
Container Free Space: 400.0 GB
"""
mock_subprocess.return_value = mock_result
disk_info = metrics_collector._get_macos_disk_usage()
assert disk_info is not None
assert disk_info.total == 500.0 * (1024**3) # В байтах
assert disk_info.free == 400.0 * (1024**3)
assert disk_info.used == 100.0 * (1024**3)
@patch('subprocess.run')
def test_get_macos_disk_usage_fallback(self, mock_subprocess, metrics_collector):
"""Тест fallback к psutil при ошибке diskutil"""
metrics_collector.os_type = "macos"
# Мокаем неуспешный вывод diskutil
mock_result = Mock()
mock_result.returncode = 1
mock_subprocess.return_value = mock_result
with patch('metrics_collector.psutil.disk_usage') as mock_psutil_disk:
mock_disk = Mock()
mock_disk.used = 100 * (1024**3)
mock_disk.total = 500 * (1024**3)
mock_disk.free = 400 * (1024**3)
mock_psutil_disk.return_value = mock_disk
disk_info = metrics_collector._get_macos_disk_usage()
assert disk_info == mock_disk
def test_get_system_uptime(self, metrics_collector):
"""Тест получения uptime системы"""
with patch('metrics_collector.psutil.boot_time') as mock_boot_time:
mock_boot_time.return_value = time.time() - 3600 # 1 час назад
uptime = metrics_collector._get_system_uptime()
assert uptime > 0
assert uptime <= 3600.1 # Не больше часа (с небольшим допуском)
def test_get_monitor_uptime(self, metrics_collector):
"""Тест получения uptime мониторинга"""
# Ждем немного, чтобы uptime изменился
time.sleep(0.1)
uptime = metrics_collector.get_monitor_uptime()
assert isinstance(uptime, str)
assert 'м' in uptime or 'ч' in uptime or 'д' in uptime
def test_get_system_info_success(self, metrics_collector):
"""Тест получения системной информации"""
# Мокаем все необходимые функции psutil
with patch('metrics_collector.psutil.cpu_percent', return_value=25.5) as mock_cpu, \
patch('metrics_collector.psutil.getloadavg', return_value=(1.2, 1.1, 1.0)) as mock_load, \
patch('metrics_collector.psutil.cpu_count', return_value=8) as mock_cpu_count, \
patch('metrics_collector.psutil.cpu_times_percent') as mock_cpu_times, \
patch('metrics_collector.psutil.virtual_memory') as mock_virtual_memory, \
patch('metrics_collector.psutil.swap_memory') as mock_swap_memory, \
patch('metrics_collector.psutil.disk_usage') as mock_disk_usage, \
patch('metrics_collector.psutil.disk_io_counters') as mock_disk_io, \
patch('metrics_collector.psutil.boot_time', return_value=time.time() - 86400) as mock_boot_time, \
patch('os.uname') as mock_uname:
# Настраиваем моки для CPU
mock_cpu_times_obj = Mock()
mock_cpu_times_obj.iowait = 2.5
mock_cpu_times.return_value = mock_cpu_times_obj
# Настраиваем моки для памяти
mock_memory = Mock()
mock_memory.used = 8 * (1024**3)
mock_memory.total = 16 * (1024**3)
mock_virtual_memory.return_value = mock_memory
# Настраиваем моки для swap
mock_swap = Mock()
mock_swap.used = 1 * (1024**3)
mock_swap.total = 2 * (1024**3)
mock_swap.percent = 50.0
mock_swap_memory.return_value = mock_swap
# Настраиваем моки для диска
mock_disk = Mock()
mock_disk.used = 100 * (1024**3)
mock_disk.total = 500 * (1024**3)
mock_disk.free = 400 * (1024**3)
mock_disk_usage.return_value = mock_disk
# Настраиваем моки для disk I/O
mock_disk_io_obj = Mock()
mock_disk_io_obj.read_count = 1000
mock_disk_io_obj.write_count = 500
mock_disk_io_obj.read_bytes = 1024 * (1024**2)
mock_disk_io_obj.write_bytes = 512 * (1024**2)
mock_disk_io.return_value = mock_disk_io_obj
# Настраиваем мок для hostname
mock_uname.return_value.nodename = "test-host"
# Мокаем _get_disk_usage чтобы возвращал наш мок
with patch.object(metrics_collector, '_get_disk_usage', return_value=mock_disk):
system_info = metrics_collector.get_system_info()
assert isinstance(system_info, dict)
assert 'cpu_percent' in system_info
assert 'ram_percent' in system_info
assert 'disk_percent' in system_info
assert 'io_wait_percent' in system_info
assert 'server_hostname' in system_info
# Проверяем расчеты
assert system_info['cpu_percent'] == 25.5
assert system_info['ram_percent'] == 50.0 # 8/16 * 100
assert system_info['disk_percent'] == 20.0 # 100/500 * 100
assert system_info['io_wait_percent'] == 2.5
assert system_info['server_hostname'] == "test-host"
def test_get_system_info_error(self, metrics_collector):
"""Тест получения системной информации при ошибке"""
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
system_info = metrics_collector.get_system_info()
assert system_info == {}
def test_format_bytes(self, metrics_collector):
"""Тест форматирования байтов"""
assert metrics_collector._format_bytes(0) == "0 B"
assert metrics_collector._format_bytes(1024) == "1.0 KB"
assert metrics_collector._format_bytes(1024**2) == "1.0 MB"
assert metrics_collector._format_bytes(1024**3) == "1.0 GB"
assert metrics_collector._format_bytes(1024**4) == "1.0 TB"
def test_format_uptime(self, metrics_collector):
"""Тест форматирования uptime"""
assert metrics_collector._format_uptime(60) == ""
assert metrics_collector._format_uptime(3600) == "1ч 0м"
assert metrics_collector._format_uptime(86400) == "1д 0ч 0м"
assert metrics_collector._format_uptime(90000) == "1д 1ч 0м"
def test_check_process_status_pid_file(self, metrics_collector, tmp_path):
"""Тест проверки статуса процесса по PID файлу"""
# Создаем временный PID файл
pid_file = tmp_path / "test_bot.pid"
pid_file.write_text("12345")
# Временно заменяем путь к PID файлу
original_pid_files = metrics_collector.pid_files.copy()
metrics_collector.pid_files['test_bot'] = str(pid_file)
with patch('infra.monitoring.metrics_collector.psutil.pid_exists', return_value=True), \
patch('infra.monitoring.metrics_collector.psutil.Process') as mock_process:
mock_proc = Mock()
mock_proc.create_time.return_value = time.time() - 3600
mock_process.return_value = mock_proc
status, uptime = metrics_collector.check_process_status('test_bot')
assert status == ""
assert "Uptime" in uptime
# Восстанавливаем оригинальные PID файлы
metrics_collector.pid_files = original_pid_files
def test_check_process_status_not_running(self, metrics_collector):
"""Тест проверки статуса неработающего процесса"""
with patch('metrics_collector.psutil.process_iter', return_value=[]):
status, message = metrics_collector.check_process_status('nonexistent_bot')
assert status == ""
assert message == "Выключен"
def test_calculate_disk_speed(self, metrics_collector):
"""Тест расчета скорости диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Создаем текущую статистику диска
current_disk_io = Mock()
current_disk_io.read_bytes = 2048 * (1024**2) # 2 GB
current_disk_io.write_bytes = 1024 * (1024**2) # 1 GB
# Ждем немного для расчета скорости
time.sleep(0.1)
read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io)
assert isinstance(read_speed, str)
assert isinstance(write_speed, str)
assert "/s" in read_speed
assert "/s" in write_speed
def test_calculate_disk_io_percent(self, metrics_collector):
"""Тест расчета процента загрузки диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Создаем текущую статистику диска
current_disk_io = Mock()
current_disk_io.read_count = 2000
current_disk_io.write_count = 1000
current_disk_io.read_bytes = 2048 * (1024**2)
current_disk_io.write_bytes = 1024 * (1024**2)
# Ждем немного для расчета
time.sleep(0.1)
io_percent = metrics_collector._calculate_disk_io_percent()
assert isinstance(io_percent, int)
assert 0 <= io_percent <= 100
def test_get_metrics_data(self, metrics_collector):
"""Тест получения данных для метрик Prometheus"""
with patch.object(metrics_collector, 'get_system_info') as mock_get_system_info:
mock_get_system_info.return_value = {
'cpu_percent': 25.5,
'ram_percent': 60.2,
'disk_percent': 45.8,
'load_avg_1m': 1.2,
'load_avg_5m': 1.1,
'load_avg_15m': 1.0,
'swap_percent': 10.5
}
with patch.object(metrics_collector, '_get_system_uptime', return_value=86400.0):
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
assert 'cpu_usage_percent' in metrics_data
assert 'ram_usage_percent' in metrics_data
assert 'disk_usage_percent' in metrics_data
assert 'load_average_1m' in metrics_data
assert 'system_uptime_seconds' in metrics_data
assert 'monitor_uptime_seconds' in metrics_data
def test_check_alerts(self, metrics_collector):
"""Тест проверки алертов"""
# Сбрасываем состояния алертов для чистого теста
metrics_collector.alert_states = {'cpu': False, 'ram': False, 'disk': False}
metrics_collector.alert_start_times = {'cpu': None, 'ram': None, 'disk': None}
# Устанавливаем минимальные задержки для тестов
metrics_collector.alert_delays = {'cpu': 0, 'ram': 0, 'disk': 0}
# Тестируем превышение порога CPU
system_info = {
'cpu_percent': 85.0, # Выше порога 80.0
'ram_percent': 60.0, # Ниже порога
'disk_percent': 70.0, # Ниже порога
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 300.0
}
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert len(alerts) == 1
assert alerts[0][0] == 'cpu' # Тип алерта
assert alerts[0][1] == 85.0 # Значение
assert len(recoveries) == 0
# Проверяем, что состояние алерта изменилось
assert metrics_collector.alert_states['cpu'] is True
# Тестируем восстановление
system_info['cpu_percent'] = 70.0 # Ниже recovery threshold 75.0
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert len(alerts) == 0
assert len(recoveries) == 1
assert recoveries[0][0] == 'cpu'
assert metrics_collector.alert_states['cpu'] is False
def test_environment_variables(self):
"""Тест работы с переменными окружения"""
with patch.dict(os.environ, {'THRESHOLD': '90.0', 'RECOVERY_THRESHOLD': '85.0'}):
collector = MetricsCollector()
assert collector.threshold == 90.0
assert collector.recovery_threshold == 85.0
def test_metrics_collector_integration(self, metrics_collector):
"""Интеграционный тест MetricsCollector"""
# Проверяем, что можем получить системную информацию
system_info = metrics_collector.get_system_info()
# Даже если некоторые метрики недоступны, должны получить словарь
assert isinstance(system_info, dict)
# Проверяем, что можем получить метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# Проверяем, что можем проверить алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
class TestMetricsCollectorEdgeCases:
"""Тесты граничных случаев для MetricsCollector"""
def test_empty_system_info(self):
"""Тест работы с пустой системной информацией"""
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
collector = MetricsCollector()
system_info = collector.get_system_info()
assert system_info == {}
def test_missing_disk_info(self):
"""Тест работы при отсутствии информации о диске"""
collector = MetricsCollector()
with patch.object(collector, '_get_disk_usage', return_value=None), \
patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
system_info = collector.get_system_info()
assert system_info == {}
def test_disk_io_calculation_without_previous_data(self):
"""Тест расчета I/O диска без предыдущих данных"""
collector = MetricsCollector()
# Сбрасываем предыдущие данные
collector.last_disk_io = None
collector.last_disk_io_time = None
current_disk_io = Mock()
current_disk_io.read_bytes = 1024
current_disk_io.write_bytes = 512
read_speed, write_speed = collector._calculate_disk_speed(current_disk_io)
assert read_speed == "0 B/s"
assert write_speed == "0 B/s"
def test_uptime_calculation_edge_cases(self):
"""Тест расчета uptime для граничных случаев"""
collector = MetricsCollector()
# Тест для очень малого времени
assert collector._format_uptime(0) == ""
assert collector._format_uptime(30) == ""
# Тест для очень большого времени
large_uptime = 365 * 24 * 3600 # 1 год
uptime_str = collector._format_uptime(large_uptime)
assert "д" in uptime_str
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -9,8 +9,6 @@ import sys
import os
from pathlib import Path
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
class TestPrometheusConfig:
@@ -80,38 +78,6 @@ class TestPrometheusConfig:
targets = static_configs[0].get('targets', [])
assert 'localhost:9090' in targets, "Prometheus should scrape localhost:9090"
def test_infrastructure_job(self, prometheus_config):
"""Тест job для инфраструктуры"""
scrape_configs = prometheus_config['scrape_configs']
# Ищем job для infrastructure
infra_job = None
for job in scrape_configs:
if job.get('job_name') == 'infrastructure':
infra_job = job
break
assert infra_job is not None, "Should have infrastructure job"
# Проверяем основные параметры
assert 'static_configs' in infra_job, "Infrastructure job should have static_configs"
assert 'metrics_path' in infra_job, "Infrastructure job should have metrics_path"
assert 'scrape_interval' in infra_job, "Infrastructure job should have scrape_interval"
assert 'scrape_timeout' in infra_job, "Infrastructure job should have scrape_timeout"
assert 'honor_labels' in infra_job, "Infrastructure job should have honor_labels"
# Проверяем значения
assert infra_job['metrics_path'] == '/metrics', "Metrics path should be /metrics"
assert infra_job['scrape_interval'] == '30s', "Scrape interval should be 30s"
assert infra_job['scrape_timeout'] == '10s', "Scrape timeout should be 10s"
assert infra_job['honor_labels'] is True, "honor_labels should be True"
# Проверяем targets
static_configs = infra_job['static_configs']
assert len(static_configs) > 0, "Should have at least one static config"
targets = static_configs[0].get('targets', [])
assert 'bots_server_monitor:9091' in targets, "Should scrape bots_server_monitor:9091"
def test_telegram_bot_job(self, prometheus_config):
"""Тест job для telegram-helper-bot"""

View File

@@ -1,437 +0,0 @@
#!/usr/bin/env python3
"""
Интеграционные тесты для Prometheus и связанных компонентов
"""
import pytest
import pytest_asyncio
import asyncio
import sys
import os
import tempfile
import yaml
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from pathlib import Path
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from prometheus_server import PrometheusServer
from metrics_collector import MetricsCollector
class TestPrometheusIntegration:
"""Интеграционные тесты для Prometheus"""
@pytest_asyncio.fixture
async def prometheus_server(self):
"""Создает экземпляр PrometheusServer для интеграционных тестов"""
server = PrometheusServer(host='127.0.0.1', port=0)
return server
@pytest.fixture
def metrics_collector(self):
"""Создает экземпляр MetricsCollector для интеграционных тестов"""
return MetricsCollector()
@pytest.fixture
def sample_prometheus_config(self):
"""Создает пример конфигурации Prometheus для тестов"""
return {
'global': {
'scrape_interval': '15s',
'evaluation_interval': '15s'
},
'scrape_configs': [
{
'job_name': 'test-infrastructure',
'static_configs': [
{
'targets': ['127.0.0.1:9091'],
'labels': {
'environment': 'test',
'service': 'test-monitoring'
}
}
],
'metrics_path': '/metrics',
'scrape_interval': '30s',
'scrape_timeout': '10s',
'honor_labels': True
}
]
}
@pytest.mark.integration
@pytest.mark.asyncio
async def test_prometheus_server_with_real_metrics_collector(self, prometheus_server):
"""Тест интеграции PrometheusServer с реальным MetricsCollector"""
# Получаем реальные метрики
metrics_data = prometheus_server.metrics_collector.get_metrics_data()
# Проверяем, что можем получить метрики
assert isinstance(metrics_data, dict)
# Форматируем метрики в Prometheus формат
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
# Проверяем, что метрики содержат системную информацию
assert '# HELP system_info System information' in prometheus_metrics
assert '# TYPE system_info gauge' in prometheus_metrics
# Проверяем, что есть хотя бы одна метрика
lines = prometheus_metrics.split('\n')
assert len(lines) >= 3 # system_info help, type, value
@pytest.mark.integration
def test_metrics_collector_system_integration(self, metrics_collector):
"""Тест интеграции MetricsCollector с системой"""
# Получаем системную информацию
system_info = metrics_collector.get_system_info()
# Проверяем, что получили словарь
assert isinstance(system_info, dict)
# Проверяем, что можем получить метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# Проверяем, что можем проверить алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
@pytest.mark.integration
def test_prometheus_metrics_format_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции форматирования метрик Prometheus"""
# Получаем реальные метрики
metrics_data = metrics_collector.get_metrics_data()
# Форматируем в Prometheus формат
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
# Проверяем структуру метрик
lines = prometheus_metrics.split('\n')
# Должна быть системная информация
system_info_lines = [line for line in lines if 'system_info' in line]
assert len(system_info_lines) >= 3 # help, type, value
# Проверяем, что метрики содержат правильные типы
type_lines = [line for line in lines if '# TYPE' in line]
assert len(type_lines) > 0
# Проверяем, что все метрики имеют правильный формат
metric_lines = [line for line in lines if line and not line.startswith('#')]
for line in metric_lines:
# Проверяем, что строка метрики содержит имя и значение
assert ' ' in line
parts = line.split(' ')
assert len(parts) >= 2
@pytest.mark.integration
def test_os_detection_integration(self):
"""Тест интеграции определения ОС"""
# Создаем коллектор с реальным определением ОС
collector = MetricsCollector()
# Проверяем, что ОС определена
assert collector.os_type in ["macos", "ubuntu", "unknown"]
# Проверяем, что можем получить информацию о диске
disk_info = collector._get_disk_usage()
if disk_info is not None:
assert hasattr(disk_info, 'total')
assert hasattr(disk_info, 'used')
assert hasattr(disk_info, 'free')
@pytest.mark.integration
def test_disk_io_calculation_integration(self, metrics_collector):
"""Тест интеграции расчета I/O диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Получаем текущую статистику диска
current_disk_io = metrics_collector._get_disk_io_counters()
if current_disk_io is not None:
# Рассчитываем скорость
read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io)
# Проверяем, что получили строки с единицами измерения
assert isinstance(read_speed, str)
assert isinstance(write_speed, str)
assert "/s" in read_speed
assert "/s" in write_speed
# Рассчитываем процент загрузки
io_percent = metrics_collector._calculate_disk_io_percent()
assert isinstance(io_percent, int)
assert 0 <= io_percent <= 100
@pytest.mark.integration
def test_process_monitoring_integration(self, metrics_collector):
"""Тест интеграции мониторинга процессов"""
# Проверяем статус процессов
for process_name in ['helper_bot']:
status, message = metrics_collector.check_process_status(process_name)
# Статус должен быть либо ✅, либо ❌
assert status in ["", ""]
# Сообщение должно быть строкой
assert isinstance(message, str)
@pytest.mark.integration
def test_alert_system_integration(self, metrics_collector):
"""Тест интеграции системы алертов"""
# Сбрасываем состояния алертов для чистого теста
metrics_collector.alert_states = {'cpu': False, 'ram': False, 'disk': False}
metrics_collector.alert_start_times = {'cpu': None, 'ram': None, 'disk': None}
# Устанавливаем минимальные задержки для тестов
metrics_collector.alert_delays = {'cpu': 0, 'ram': 0, 'disk': 0}
# Создаем тестовые данные
test_system_info = {
'cpu_percent': 85.0, # Выше порога
'ram_percent': 60.0, # Ниже порога
'disk_percent': 70.0, # Ниже порога
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 300.0
}
# Проверяем алерты
alerts, recoveries = metrics_collector.check_alerts(test_system_info)
# Должен быть хотя бы один алерт для CPU
assert len(alerts) >= 1
assert any(alert[0] == 'cpu' for alert in alerts)
# Проверяем, что состояние алерта изменилось
assert metrics_collector.alert_states['cpu'] is True
# Тестируем восстановление
test_system_info['cpu_percent'] = 70.0 # Ниже recovery threshold
alerts, recoveries = metrics_collector.check_alerts(test_system_info)
# Должно быть восстановление
assert len(recoveries) >= 1
assert any(recovery[0] == 'cpu' for recovery in recoveries)
assert metrics_collector.alert_states['cpu'] is False
@pytest.mark.integration
def test_uptime_calculation_integration(self, metrics_collector):
"""Тест интеграции расчета uptime"""
# Получаем uptime системы
system_uptime = metrics_collector._get_system_uptime()
assert system_uptime > 0
# Получаем uptime мониторинга
monitor_uptime = metrics_collector.get_monitor_uptime()
assert isinstance(monitor_uptime, str)
assert len(monitor_uptime) > 0
# Форматируем uptime
formatted_uptime = metrics_collector._format_uptime(system_uptime)
assert isinstance(formatted_uptime, str)
assert len(formatted_uptime) > 0
@pytest.mark.integration
def test_environment_variables_integration(self):
"""Тест интеграции с переменными окружения"""
# Тестируем с пользовательскими значениями
test_threshold = '90.0'
test_recovery_threshold = '85.0'
with patch.dict(os.environ, {
'THRESHOLD': test_threshold,
'RECOVERY_THRESHOLD': test_recovery_threshold
}):
collector = MetricsCollector()
# Проверяем, что значения установлены
assert collector.threshold == float(test_threshold)
assert collector.recovery_threshold == float(test_recovery_threshold)
@pytest.mark.integration
def test_prometheus_config_validation_integration(self, sample_prometheus_config):
"""Тест интеграции валидации конфигурации Prometheus"""
# Проверяем структуру конфигурации
assert 'global' in sample_prometheus_config
assert 'scrape_configs' in sample_prometheus_config
global_config = sample_prometheus_config['global']
assert 'scrape_interval' in global_config
assert 'evaluation_interval' in global_config
scrape_configs = sample_prometheus_config['scrape_configs']
assert len(scrape_configs) > 0
# Проверяем каждый job
for job in scrape_configs:
assert 'job_name' in job
assert 'static_configs' in job
static_configs = job['static_configs']
assert len(static_configs) > 0
for static_config in static_configs:
assert 'targets' in static_config
targets = static_config['targets']
assert len(targets) > 0
@pytest.mark.integration
def test_metrics_data_consistency_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции консистентности данных метрик"""
# Получаем метрики разными способами
system_info = metrics_collector.get_system_info()
metrics_data = metrics_collector.get_metrics_data()
# Проверяем консистентность между system_info и metrics_data
# Реальные метрики могут значительно отличаться из-за времени между вызовами
# и системной нагрузки, поэтому используем более широкие допуски
if 'cpu_percent' in system_info and 'cpu_usage_percent' in metrics_data:
# CPU метрики могут сильно колебаться, используем допуск 50%
# Это связано с тем, что CPU измеряется в разные моменты времени
cpu_diff = abs(system_info['cpu_percent'] - metrics_data['cpu_usage_percent'])
assert cpu_diff < 50.0, f"CPU metrics difference too large: {cpu_diff}% (system: {system_info['cpu_percent']}%, metrics: {metrics_data['cpu_usage_percent']}%)"
if 'ram_percent' in system_info and 'ram_usage_percent' in metrics_data:
# RAM метрики более стабильны, но все же используем допуск 15%
ram_diff = abs(system_info['ram_percent'] - metrics_data['ram_usage_percent'])
assert ram_diff < 15.0, f"RAM metrics difference too large: {ram_diff}% (system: {system_info['ram_percent']}%, metrics: {metrics_data['ram_usage_percent']}%)"
if 'disk_percent' in system_info and 'disk_usage_percent' in metrics_data:
# Disk метрики должны быть очень стабильными, допуск 10%
disk_diff = abs(system_info['disk_percent'] - metrics_data['disk_usage_percent'])
assert disk_diff < 10.0, f"Disk metrics difference too large: {disk_diff}% (system: {system_info['disk_percent']}%, metrics: {metrics_data['disk_usage_percent']}%)"
# Проверяем, что все метрики имеют разумные значения
for metric_name, value in system_info.items():
if isinstance(value, (int, float)):
assert value >= 0, f"Metric {metric_name} should be non-negative: {value}"
for metric_name, value in metrics_data.items():
if isinstance(value, (int, float)):
assert value >= 0, f"Metric {metric_name} should be non-negative: {value}"
@pytest.mark.integration
def test_error_handling_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции обработки ошибок"""
# Тестируем обработку ошибок в PrometheusServer
with patch.object(metrics_collector, 'get_metrics_data', side_effect=Exception("Test error")):
prometheus_server.metrics_collector = metrics_collector
# Создаем мок запрос
request = Mock()
# Обрабатываем запрос метрик
response = asyncio.run(prometheus_server.metrics_handler(request))
# Должен вернуться ответ с ошибкой
assert response.status == 500
assert 'Error: Test error' in response.text
@pytest.mark.integration
def test_performance_integration(self, prometheus_server, metrics_collector):
"""Тест интеграции производительности"""
import time
# Измеряем время получения системной информации
start_time = time.time()
system_info = metrics_collector.get_system_info()
system_info_time = time.time() - start_time
# Измеряем время получения метрик
start_time = time.time()
metrics_data = metrics_collector.get_metrics_data()
metrics_time = time.time() - start_time
# Измеряем время форматирования Prometheus метрик
start_time = time.time()
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
formatting_time = time.time() - start_time
# Проверяем, что операции выполняются в разумное время
assert system_info_time < 5.0, f"System info collection took too long: {system_info_time}s"
assert metrics_time < 3.0, f"Metrics collection took too long: {metrics_time}s"
assert formatting_time < 0.1, f"Metrics formatting took too long: {formatting_time}s"
# Проверяем, что получили данные
assert isinstance(system_info, dict)
assert isinstance(metrics_data, dict)
assert isinstance(prometheus_metrics, str)
assert len(prometheus_metrics) > 0
class TestPrometheusEndToEnd:
"""End-to-end тесты для Prometheus"""
@pytest.mark.integration
@pytest.mark.slow
def test_full_metrics_pipeline(self):
"""Тест полного пайплайна метрик"""
# Создаем все компоненты
metrics_collector = MetricsCollector()
prometheus_server = PrometheusServer()
# 1. Собираем системную информацию
system_info = metrics_collector.get_system_info()
assert isinstance(system_info, dict)
# 2. Получаем метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# 3. Форматируем метрики в Prometheus формат
prometheus_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
assert isinstance(prometheus_metrics, str)
# 4. Проверяем, что метрики содержат необходимую информацию
lines = prometheus_metrics.split('\n')
# Должна быть системная информация
assert any('system_info' in line for line in lines)
# Должны быть метрики системы
assert any('cpu_usage_percent' in line for line in lines) or any('ram_usage_percent' in line for line in lines)
# 5. Проверяем алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
@pytest.mark.integration
@pytest.mark.slow
def test_metrics_stability(self):
"""Тест стабильности метрик"""
import time
metrics_collector = MetricsCollector()
# Получаем метрики несколько раз подряд
metrics_list = []
for _ in range(3):
metrics = metrics_collector.get_metrics_data()
metrics_list.append(metrics)
time.sleep(0.1) # Небольшая пауза
# Проверяем, что структура метрик не изменилась
for metrics in metrics_list:
assert isinstance(metrics, dict)
assert len(metrics) > 0
# Проверяем, что ключи метрик не изменились
first_keys = set(metrics_list[0].keys())
for metrics in metrics_list[1:]:
current_keys = set(metrics.keys())
# Некоторые метрики могут отсутствовать, но структура должна быть похожей
assert len(current_keys.intersection(first_keys)) > 0
if __name__ == "__main__":
pytest.main([__file__, "-v", "-m", "integration"])

View File

@@ -1,309 +0,0 @@
#!/usr/bin/env python3
"""
Тесты для PrometheusServer
"""
import pytest
import asyncio
import sys
import os
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from aiohttp import web
from aiohttp.test_utils import TestClient
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from prometheus_server import PrometheusServer
class TestPrometheusServer:
"""Тесты для класса PrometheusServer"""
@pytest.fixture
def prometheus_server(self):
"""Создает экземпляр PrometheusServer для тестов"""
return PrometheusServer(host='127.0.0.1', port=9091)
@pytest.fixture
def mock_metrics_collector(self):
"""Создает мок MetricsCollector"""
mock_collector = Mock()
mock_collector.os_type = "ubuntu"
mock_collector.get_metrics_data.return_value = {
'cpu_usage_percent': 25.5,
'ram_usage_percent': 60.2,
'disk_usage_percent': 45.8,
'load_average_1m': 1.2,
'load_average_5m': 1.1,
'load_average_15m': 1.0,
'swap_usage_percent': 10.5,
'disk_io_percent': 15.3,
'system_uptime_seconds': 86400.0,
'monitor_uptime_seconds': 3600.0
}
return mock_collector
def test_init(self, prometheus_server):
"""Тест инициализации PrometheusServer"""
assert prometheus_server.host == '127.0.0.1'
assert prometheus_server.port == 9091
assert prometheus_server.metrics_collector is not None
assert isinstance(prometheus_server.app, web.Application)
def test_setup_routes(self, prometheus_server):
"""Тест настройки маршрутов"""
routes = list(prometheus_server.app.router.routes())
# aiohttp создает по 2 маршрута для каждого эндпоинта (GET и HEAD)
assert len(routes) == 6
# Проверяем наличие всех маршрутов
route_paths = [route.resource.canonical for route in routes]
assert '/' in route_paths
assert '/metrics' in route_paths
assert '/health' in route_paths
@pytest.mark.asyncio
async def test_root_handler(self, prometheus_server):
"""Тест главного обработчика"""
request = Mock()
response = await prometheus_server.root_handler(request)
assert isinstance(response, web.Response)
assert response.status == 200
assert response.content_type == 'text/plain'
assert 'Prometheus Metrics Server' in response.text
assert '/metrics' in response.text
assert '/health' in response.text
@pytest.mark.asyncio
async def test_health_handler(self, prometheus_server):
"""Тест health check обработчика"""
request = Mock()
response = await prometheus_server.health_handler(request)
assert isinstance(response, web.Response)
assert response.status == 200
assert response.content_type == 'text/plain'
assert response.text == 'OK'
@pytest.mark.asyncio
async def test_metrics_handler_success(self, prometheus_server, mock_metrics_collector):
"""Тест обработчика метрик при успешном получении данных"""
# Заменяем metrics_collector на мок
prometheus_server.metrics_collector = mock_metrics_collector
request = Mock()
response = await prometheus_server.metrics_handler(request)
assert isinstance(response, web.Response)
assert response.status == 200
assert response.content_type == 'text/plain'
# Проверяем, что метрики содержат ожидаемые данные
metrics_text = response.text
assert '# HELP system_info System information' in metrics_text
assert '# TYPE system_info gauge' in metrics_text
assert 'system_info{os="ubuntu"}' in metrics_text
assert '# HELP cpu_usage_percent CPU usage percentage' in metrics_text
assert 'cpu_usage_percent 25.5' in metrics_text
@pytest.mark.asyncio
async def test_metrics_handler_error(self, prometheus_server, mock_metrics_collector):
"""Тест обработчика метрик при ошибке"""
# Настраиваем мок для вызова исключения
mock_metrics_collector.get_metrics_data.side_effect = Exception("Test error")
prometheus_server.metrics_collector = mock_metrics_collector
request = Mock()
response = await prometheus_server.metrics_handler(request)
assert isinstance(response, web.Response)
assert response.status == 500
assert response.content_type == 'text/plain'
assert 'Error: Test error' in response.text
def test_format_prometheus_metrics(self, prometheus_server, mock_metrics_collector):
"""Тест форматирования метрик в Prometheus формат"""
prometheus_server.metrics_collector = mock_metrics_collector
metrics_data = mock_metrics_collector.get_metrics_data()
formatted_metrics = prometheus_server._format_prometheus_metrics(metrics_data)
# Проверяем структуру метрик
lines = formatted_metrics.split('\n')
# Проверяем наличие системной информации
assert any('system_info' in line for line in lines)
assert any('os="ubuntu"' in line for line in lines)
# Проверяем наличие CPU метрик
assert any('cpu_usage_percent' in line for line in lines)
assert any('25.5' in line for line in lines)
# Проверяем наличие RAM метрик
assert any('ram_usage_percent' in line for line in lines)
assert any('60.2' in line for line in lines)
# Проверяем наличие disk метрик
assert any('disk_usage_percent' in line for line in lines)
assert any('45.8' in line for line in lines)
# Проверяем наличие load average метрик
assert any('load_average_1m' in line for line in lines)
assert any('1.2' in line for line in lines)
def test_format_prometheus_metrics_empty_data(self, prometheus_server):
"""Тест форматирования метрик с пустыми данными"""
empty_metrics = {}
formatted_metrics = prometheus_server._format_prometheus_metrics(empty_metrics)
# Должна быть только системная информация
lines = formatted_metrics.split('\n')
assert len(lines) == 3 # system_info help, type, value
assert any('system_info' in line for line in lines)
def test_format_prometheus_metrics_partial_data(self, prometheus_server, mock_metrics_collector):
"""Тест форматирования метрик с частичными данными"""
prometheus_server.metrics_collector = mock_metrics_collector
# Только CPU метрики
partial_metrics = {
'cpu_usage_percent': 50.0,
'load_average_1m': 2.5
}
formatted_metrics = prometheus_server._format_prometheus_metrics(partial_metrics)
lines = formatted_metrics.split('\n')
# Проверяем, что есть системная информация + CPU + load average
assert any('system_info' in line for line in lines)
assert any('cpu_usage_percent' in line for line in lines)
assert any('load_average_1m' in line for line in lines)
assert any('50.0' in line for line in lines)
assert any('2.5' in line for line in lines)
# Проверяем, что нет RAM метрик
assert not any('ram_usage_percent' in line for line in lines)
@pytest.mark.asyncio
async def test_start_and_stop(self, prometheus_server):
"""Тест запуска и остановки сервера"""
# Мокаем web.AppRunner и TCPSite
with patch('prometheus_server.web.AppRunner') as mock_runner_class, \
patch('prometheus_server.web.TCPSite') as mock_site_class:
mock_runner = Mock()
mock_runner.setup = AsyncMock()
mock_runner.cleanup = AsyncMock()
mock_runner_class.return_value = mock_runner
mock_site = Mock()
mock_site.start = AsyncMock()
mock_site_class.return_value = mock_site
# Запускаем сервер
runner = await prometheus_server.start()
# Проверяем, что методы были вызваны
mock_runner.setup.assert_called_once()
mock_site.start.assert_called_once()
assert runner == mock_runner
# Останавливаем сервер
await prometheus_server.stop(runner)
mock_runner.cleanup.assert_called_once()
def test_different_os_types(self):
"""Тест работы с разными типами ОС"""
# Тестируем macOS
with patch('platform.system', return_value='Darwin'):
server_macos = PrometheusServer()
assert server_macos.metrics_collector.os_type == "macos"
# Тестируем Linux
with patch('platform.system', return_value='Linux'):
server_linux = PrometheusServer()
assert server_linux.metrics_collector.os_type == "ubuntu"
# Тестируем неизвестную ОС
with patch('platform.system', return_value='Windows'):
server_unknown = PrometheusServer()
assert server_unknown.metrics_collector.os_type == "unknown"
def test_custom_host_port(self):
"""Тест создания сервера с пользовательскими параметрами"""
server = PrometheusServer(host='192.168.1.100', port=9092)
assert server.host == '192.168.1.100'
assert server.port == 9092
def test_metrics_collector_integration(self, prometheus_server):
"""Тест интеграции с MetricsCollector"""
# Проверяем, что metrics_collector имеет необходимые методы
collector = prometheus_server.metrics_collector
assert hasattr(collector, 'get_metrics_data')
assert hasattr(collector, 'os_type')
# Проверяем, что можем получить данные
metrics_data = collector.get_metrics_data()
assert isinstance(metrics_data, dict)
class TestPrometheusServerIntegration:
"""Интеграционные тесты для PrometheusServer"""
@pytest.mark.asyncio
async def test_server_creation_integration(self):
"""Интеграционный тест создания сервера"""
server = PrometheusServer(host='127.0.0.1', port=0)
# Проверяем, что сервер создался
assert server is not None
assert server.host == '127.0.0.1'
assert server.port == 0
# Проверяем, что приложение создалось
assert server.app is not None
# Проверяем, что маршруты настроены
routes = list(server.app.router.routes())
assert len(routes) > 0
@pytest.mark.asyncio
async def test_metrics_collector_integration(self):
"""Интеграционный тест с MetricsCollector"""
server = PrometheusServer(host='127.0.0.1', port=0)
# Проверяем, что можем получить метрики
metrics_data = server.metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# Проверяем, что можем отформатировать метрики
prometheus_metrics = server._format_prometheus_metrics(metrics_data)
assert isinstance(prometheus_metrics, str)
assert len(prometheus_metrics) > 0
@pytest.mark.asyncio
async def test_endpoint_handlers_integration(self):
"""Интеграционный тест обработчиков эндпоинтов"""
server = PrometheusServer(host='127.0.0.1', port=0)
# Тестируем корневой обработчик
request = Mock()
response = await server.root_handler(request)
assert response.status == 200
assert 'Prometheus Metrics Server' in response.text
# Тестируем health обработчик
response = await server.health_handler(request)
assert response.status == 200
assert response.text == 'OK'
# Тестируем metrics обработчик
response = await server.metrics_handler(request)
assert response.status == 200
assert '# HELP system_info' in response.text
if __name__ == "__main__":
pytest.main([__file__, "-v"])