465 lines
21 KiB
Python
465 lines
21 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Тесты для MetricsCollector
|
||
"""
|
||
|
||
import pytest
|
||
import sys
|
||
import os
|
||
import time
|
||
import platform
|
||
from unittest.mock import Mock, patch, MagicMock
|
||
from datetime import datetime
|
||
|
||
# Добавляем путь к модулям мониторинга
|
||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
|
||
|
||
from infra.monitoring.metrics_collector import MetricsCollector
|
||
|
||
|
||
class TestMetricsCollector:
|
||
"""Тесты для класса MetricsCollector"""
|
||
|
||
@pytest.fixture
|
||
def metrics_collector(self):
|
||
"""Создает экземпляр MetricsCollector для тестов"""
|
||
return MetricsCollector()
|
||
|
||
@pytest.fixture
|
||
def mock_psutil(self):
|
||
"""Мок для psutil"""
|
||
mock_psutil = Mock()
|
||
|
||
# Мокаем CPU
|
||
mock_psutil.cpu_percent.return_value = 25.5
|
||
mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0)
|
||
mock_psutil.cpu_count.return_value = 8
|
||
|
||
# Мокаем память
|
||
mock_memory = Mock()
|
||
mock_memory.used = 8 * (1024**3) # 8 GB
|
||
mock_memory.total = 16 * (1024**3) # 16 GB
|
||
mock_psutil.virtual_memory.return_value = mock_memory
|
||
|
||
mock_swap = Mock()
|
||
mock_swap.used = 1 * (1024**3) # 1 GB
|
||
mock_swap.total = 2 * (1024**3) # 2 GB
|
||
mock_swap.percent = 50.0
|
||
mock_psutil.swap_memory.return_value = mock_swap
|
||
|
||
# Мокаем диск
|
||
mock_disk = Mock()
|
||
mock_disk.used = 100 * (1024**3) # 100 GB
|
||
mock_disk.total = 500 * (1024**3) # 500 GB
|
||
mock_disk.free = 400 * (1024**3) # 400 GB
|
||
mock_psutil.disk_usage.return_value = mock_disk
|
||
|
||
# Мокаем disk I/O
|
||
mock_disk_io = Mock()
|
||
mock_disk_io.read_count = 1000
|
||
mock_disk_io.write_count = 500
|
||
mock_disk_io.read_bytes = 1024 * (1024**2) # 1 GB
|
||
mock_disk_io.write_bytes = 512 * (1024**2) # 512 MB
|
||
mock_psutil.disk_io_counters.return_value = mock_disk_io
|
||
|
||
# Мокаем boot time
|
||
mock_psutil.boot_time.return_value = time.time() - 86400 # 1 день назад
|
||
|
||
return mock_psutil
|
||
|
||
def test_init(self, metrics_collector):
|
||
"""Тест инициализации MetricsCollector"""
|
||
assert metrics_collector.threshold == 80.0
|
||
assert metrics_collector.recovery_threshold == 75.0
|
||
assert isinstance(metrics_collector.alert_states, dict)
|
||
assert 'cpu' in metrics_collector.alert_states
|
||
assert 'ram' in metrics_collector.alert_states
|
||
assert 'disk' in metrics_collector.alert_states
|
||
assert metrics_collector.monitor_start_time > 0
|
||
|
||
def test_detect_os_macos(self):
|
||
"""Тест определения macOS"""
|
||
with patch('platform.system', return_value='Darwin'):
|
||
collector = MetricsCollector()
|
||
assert collector.os_type == "macos"
|
||
|
||
def test_detect_os_linux(self):
|
||
"""Тест определения Linux"""
|
||
with patch('platform.system', return_value='Linux'):
|
||
collector = MetricsCollector()
|
||
assert collector.os_type == "ubuntu"
|
||
|
||
def test_detect_os_unknown(self):
|
||
"""Тест определения неизвестной ОС"""
|
||
with patch('platform.system', return_value='Windows'):
|
||
collector = MetricsCollector()
|
||
assert collector.os_type == "unknown"
|
||
|
||
def test_get_disk_path(self, metrics_collector):
|
||
"""Тест получения пути к диску"""
|
||
# Для всех ОС должен возвращаться "/"
|
||
assert metrics_collector._get_disk_path() == "/"
|
||
|
||
@patch('subprocess.run')
|
||
def test_get_macos_disk_usage_success(self, mock_subprocess, metrics_collector):
|
||
"""Тест получения информации о диске macOS через diskutil"""
|
||
# Настраиваем мок для macOS
|
||
metrics_collector.os_type = "macos"
|
||
|
||
# Мокаем успешный вывод diskutil
|
||
mock_result = Mock()
|
||
mock_result.returncode = 0
|
||
mock_result.stdout = """
|
||
Container Total Space: 500.0 GB
|
||
Container Free Space: 400.0 GB
|
||
"""
|
||
mock_subprocess.return_value = mock_result
|
||
|
||
disk_info = metrics_collector._get_macos_disk_usage()
|
||
|
||
assert disk_info is not None
|
||
assert disk_info.total == 500.0 * (1024**3) # В байтах
|
||
assert disk_info.free == 400.0 * (1024**3)
|
||
assert disk_info.used == 100.0 * (1024**3)
|
||
|
||
@patch('subprocess.run')
|
||
def test_get_macos_disk_usage_fallback(self, mock_subprocess, metrics_collector):
|
||
"""Тест fallback к psutil при ошибке diskutil"""
|
||
metrics_collector.os_type = "macos"
|
||
|
||
# Мокаем неуспешный вывод diskutil
|
||
mock_result = Mock()
|
||
mock_result.returncode = 1
|
||
mock_subprocess.return_value = mock_result
|
||
|
||
with patch('metrics_collector.psutil.disk_usage') as mock_psutil_disk:
|
||
mock_disk = Mock()
|
||
mock_disk.used = 100 * (1024**3)
|
||
mock_disk.total = 500 * (1024**3)
|
||
mock_disk.free = 400 * (1024**3)
|
||
mock_psutil_disk.return_value = mock_disk
|
||
|
||
disk_info = metrics_collector._get_macos_disk_usage()
|
||
assert disk_info == mock_disk
|
||
|
||
def test_get_system_uptime(self, metrics_collector):
|
||
"""Тест получения uptime системы"""
|
||
with patch('metrics_collector.psutil.boot_time') as mock_boot_time:
|
||
mock_boot_time.return_value = time.time() - 3600 # 1 час назад
|
||
|
||
uptime = metrics_collector._get_system_uptime()
|
||
assert uptime > 0
|
||
assert uptime <= 3600.1 # Не больше часа (с небольшим допуском)
|
||
|
||
def test_get_monitor_uptime(self, metrics_collector):
|
||
"""Тест получения uptime мониторинга"""
|
||
# Ждем немного, чтобы uptime изменился
|
||
time.sleep(0.1)
|
||
|
||
uptime = metrics_collector.get_monitor_uptime()
|
||
assert isinstance(uptime, str)
|
||
assert 'м' in uptime or 'ч' in uptime or 'д' in uptime
|
||
|
||
def test_get_system_info_success(self, metrics_collector):
|
||
"""Тест получения системной информации"""
|
||
# Мокаем все необходимые функции psutil
|
||
with patch('metrics_collector.psutil.cpu_percent', return_value=25.5) as mock_cpu, \
|
||
patch('metrics_collector.psutil.getloadavg', return_value=(1.2, 1.1, 1.0)) as mock_load, \
|
||
patch('metrics_collector.psutil.cpu_count', return_value=8) as mock_cpu_count, \
|
||
patch('metrics_collector.psutil.cpu_times_percent') as mock_cpu_times, \
|
||
patch('metrics_collector.psutil.virtual_memory') as mock_virtual_memory, \
|
||
patch('metrics_collector.psutil.swap_memory') as mock_swap_memory, \
|
||
patch('metrics_collector.psutil.disk_usage') as mock_disk_usage, \
|
||
patch('metrics_collector.psutil.disk_io_counters') as mock_disk_io, \
|
||
patch('metrics_collector.psutil.boot_time', return_value=time.time() - 86400) as mock_boot_time, \
|
||
patch('os.uname') as mock_uname:
|
||
|
||
# Настраиваем моки для CPU
|
||
mock_cpu_times_obj = Mock()
|
||
mock_cpu_times_obj.iowait = 2.5
|
||
mock_cpu_times.return_value = mock_cpu_times_obj
|
||
|
||
# Настраиваем моки для памяти
|
||
mock_memory = Mock()
|
||
mock_memory.used = 8 * (1024**3)
|
||
mock_memory.total = 16 * (1024**3)
|
||
mock_virtual_memory.return_value = mock_memory
|
||
|
||
# Настраиваем моки для swap
|
||
mock_swap = Mock()
|
||
mock_swap.used = 1 * (1024**3)
|
||
mock_swap.total = 2 * (1024**3)
|
||
mock_swap.percent = 50.0
|
||
mock_swap_memory.return_value = mock_swap
|
||
|
||
# Настраиваем моки для диска
|
||
mock_disk = Mock()
|
||
mock_disk.used = 100 * (1024**3)
|
||
mock_disk.total = 500 * (1024**3)
|
||
mock_disk.free = 400 * (1024**3)
|
||
mock_disk_usage.return_value = mock_disk
|
||
|
||
# Настраиваем моки для disk I/O
|
||
mock_disk_io_obj = Mock()
|
||
mock_disk_io_obj.read_count = 1000
|
||
mock_disk_io_obj.write_count = 500
|
||
mock_disk_io_obj.read_bytes = 1024 * (1024**2)
|
||
mock_disk_io_obj.write_bytes = 512 * (1024**2)
|
||
mock_disk_io.return_value = mock_disk_io_obj
|
||
|
||
# Настраиваем мок для hostname
|
||
mock_uname.return_value.nodename = "test-host"
|
||
|
||
# Мокаем _get_disk_usage чтобы возвращал наш мок
|
||
with patch.object(metrics_collector, '_get_disk_usage', return_value=mock_disk):
|
||
system_info = metrics_collector.get_system_info()
|
||
|
||
assert isinstance(system_info, dict)
|
||
assert 'cpu_percent' in system_info
|
||
assert 'ram_percent' in system_info
|
||
assert 'disk_percent' in system_info
|
||
assert 'io_wait_percent' in system_info
|
||
assert 'server_hostname' in system_info
|
||
|
||
# Проверяем расчеты
|
||
assert system_info['cpu_percent'] == 25.5
|
||
assert system_info['ram_percent'] == 50.0 # 8/16 * 100
|
||
assert system_info['disk_percent'] == 20.0 # 100/500 * 100
|
||
assert system_info['io_wait_percent'] == 2.5
|
||
assert system_info['server_hostname'] == "test-host"
|
||
|
||
def test_get_system_info_error(self, metrics_collector):
|
||
"""Тест получения системной информации при ошибке"""
|
||
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
|
||
system_info = metrics_collector.get_system_info()
|
||
assert system_info == {}
|
||
|
||
def test_format_bytes(self, metrics_collector):
|
||
"""Тест форматирования байтов"""
|
||
assert metrics_collector._format_bytes(0) == "0 B"
|
||
assert metrics_collector._format_bytes(1024) == "1.0 KB"
|
||
assert metrics_collector._format_bytes(1024**2) == "1.0 MB"
|
||
assert metrics_collector._format_bytes(1024**3) == "1.0 GB"
|
||
assert metrics_collector._format_bytes(1024**4) == "1.0 TB"
|
||
|
||
def test_format_uptime(self, metrics_collector):
|
||
"""Тест форматирования uptime"""
|
||
assert metrics_collector._format_uptime(60) == "1м"
|
||
assert metrics_collector._format_uptime(3600) == "1ч 0м"
|
||
assert metrics_collector._format_uptime(86400) == "1д 0ч 0м"
|
||
assert metrics_collector._format_uptime(90000) == "1д 1ч 0м"
|
||
|
||
def test_check_process_status_pid_file(self, metrics_collector, tmp_path):
|
||
"""Тест проверки статуса процесса по PID файлу"""
|
||
# Создаем временный PID файл
|
||
pid_file = tmp_path / "helper_bot.pid"
|
||
pid_file.write_text("12345")
|
||
|
||
# Временно заменяем путь к PID файлу
|
||
original_pid_files = metrics_collector.pid_files.copy()
|
||
metrics_collector.pid_files['helper_bot'] = str(pid_file)
|
||
|
||
with patch('metrics_collector.psutil.pid_exists', return_value=True), \
|
||
patch('metrics_collector.psutil.Process') as mock_process:
|
||
|
||
mock_proc = Mock()
|
||
mock_proc.create_time.return_value = time.time() - 3600
|
||
mock_process.return_value = mock_proc
|
||
|
||
status, uptime = metrics_collector.check_process_status('helper_bot')
|
||
|
||
assert status == "✅"
|
||
assert "Uptime" in uptime
|
||
|
||
# Восстанавливаем оригинальные PID файлы
|
||
metrics_collector.pid_files = original_pid_files
|
||
|
||
def test_check_process_status_not_running(self, metrics_collector):
|
||
"""Тест проверки статуса неработающего процесса"""
|
||
with patch('metrics_collector.psutil.process_iter', return_value=[]):
|
||
status, message = metrics_collector.check_process_status('nonexistent_bot')
|
||
assert status == "❌"
|
||
assert message == "Выключен"
|
||
|
||
def test_calculate_disk_speed(self, metrics_collector):
|
||
"""Тест расчета скорости диска"""
|
||
# Инициализируем базовые значения
|
||
metrics_collector._initialize_disk_io()
|
||
|
||
# Создаем текущую статистику диска
|
||
current_disk_io = Mock()
|
||
current_disk_io.read_bytes = 2048 * (1024**2) # 2 GB
|
||
current_disk_io.write_bytes = 1024 * (1024**2) # 1 GB
|
||
|
||
# Ждем немного для расчета скорости
|
||
time.sleep(0.1)
|
||
|
||
read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io)
|
||
|
||
assert isinstance(read_speed, str)
|
||
assert isinstance(write_speed, str)
|
||
assert "/s" in read_speed
|
||
assert "/s" in write_speed
|
||
|
||
def test_calculate_disk_io_percent(self, metrics_collector):
|
||
"""Тест расчета процента загрузки диска"""
|
||
# Инициализируем базовые значения
|
||
metrics_collector._initialize_disk_io()
|
||
|
||
# Создаем текущую статистику диска
|
||
current_disk_io = Mock()
|
||
current_disk_io.read_count = 2000
|
||
current_disk_io.write_count = 1000
|
||
current_disk_io.read_bytes = 2048 * (1024**2)
|
||
current_disk_io.write_bytes = 1024 * (1024**2)
|
||
|
||
# Ждем немного для расчета
|
||
time.sleep(0.1)
|
||
|
||
io_percent = metrics_collector._calculate_disk_io_percent()
|
||
|
||
assert isinstance(io_percent, int)
|
||
assert 0 <= io_percent <= 100
|
||
|
||
def test_get_metrics_data(self, metrics_collector):
|
||
"""Тест получения данных для метрик Prometheus"""
|
||
with patch.object(metrics_collector, 'get_system_info') as mock_get_system_info:
|
||
mock_get_system_info.return_value = {
|
||
'cpu_percent': 25.5,
|
||
'ram_percent': 60.2,
|
||
'disk_percent': 45.8,
|
||
'load_avg_1m': 1.2,
|
||
'load_avg_5m': 1.1,
|
||
'load_avg_15m': 1.0,
|
||
'swap_percent': 10.5
|
||
}
|
||
|
||
with patch.object(metrics_collector, '_get_system_uptime', return_value=86400.0):
|
||
metrics_data = metrics_collector.get_metrics_data()
|
||
|
||
assert isinstance(metrics_data, dict)
|
||
assert 'cpu_usage_percent' in metrics_data
|
||
assert 'ram_usage_percent' in metrics_data
|
||
assert 'disk_usage_percent' in metrics_data
|
||
assert 'load_average_1m' in metrics_data
|
||
assert 'system_uptime_seconds' in metrics_data
|
||
assert 'monitor_uptime_seconds' in metrics_data
|
||
|
||
def test_check_alerts(self, metrics_collector):
|
||
"""Тест проверки алертов"""
|
||
# Сбрасываем состояния алертов для чистого теста
|
||
metrics_collector.alert_states = {'cpu': False, 'ram': False, 'disk': False}
|
||
metrics_collector.alert_start_times = {'cpu': None, 'ram': None, 'disk': None}
|
||
|
||
# Устанавливаем минимальные задержки для тестов
|
||
metrics_collector.alert_delays = {'cpu': 0, 'ram': 0, 'disk': 0}
|
||
|
||
# Тестируем превышение порога CPU
|
||
system_info = {
|
||
'cpu_percent': 85.0, # Выше порога 80.0
|
||
'ram_percent': 60.0, # Ниже порога
|
||
'disk_percent': 70.0, # Ниже порога
|
||
'load_avg_1m': 2.5,
|
||
'ram_used': 8.0,
|
||
'ram_total': 16.0,
|
||
'disk_free': 300.0
|
||
}
|
||
|
||
alerts, recoveries = metrics_collector.check_alerts(system_info)
|
||
|
||
assert len(alerts) == 1
|
||
assert alerts[0][0] == 'cpu' # Тип алерта
|
||
assert alerts[0][1] == 85.0 # Значение
|
||
assert len(recoveries) == 0
|
||
|
||
# Проверяем, что состояние алерта изменилось
|
||
assert metrics_collector.alert_states['cpu'] is True
|
||
|
||
# Тестируем восстановление
|
||
system_info['cpu_percent'] = 70.0 # Ниже recovery threshold 75.0
|
||
|
||
alerts, recoveries = metrics_collector.check_alerts(system_info)
|
||
|
||
assert len(alerts) == 0
|
||
assert len(recoveries) == 1
|
||
assert recoveries[0][0] == 'cpu'
|
||
assert metrics_collector.alert_states['cpu'] is False
|
||
|
||
def test_environment_variables(self):
|
||
"""Тест работы с переменными окружения"""
|
||
with patch.dict(os.environ, {'THRESHOLD': '90.0', 'RECOVERY_THRESHOLD': '85.0'}):
|
||
collector = MetricsCollector()
|
||
assert collector.threshold == 90.0
|
||
assert collector.recovery_threshold == 85.0
|
||
|
||
def test_metrics_collector_integration(self, metrics_collector):
|
||
"""Интеграционный тест MetricsCollector"""
|
||
# Проверяем, что можем получить системную информацию
|
||
system_info = metrics_collector.get_system_info()
|
||
|
||
# Даже если некоторые метрики недоступны, должны получить словарь
|
||
assert isinstance(system_info, dict)
|
||
|
||
# Проверяем, что можем получить метрики для Prometheus
|
||
metrics_data = metrics_collector.get_metrics_data()
|
||
assert isinstance(metrics_data, dict)
|
||
|
||
# Проверяем, что можем проверить алерты
|
||
alerts, recoveries = metrics_collector.check_alerts(system_info)
|
||
assert isinstance(alerts, list)
|
||
assert isinstance(recoveries, list)
|
||
|
||
|
||
class TestMetricsCollectorEdgeCases:
|
||
"""Тесты граничных случаев для MetricsCollector"""
|
||
|
||
def test_empty_system_info(self):
|
||
"""Тест работы с пустой системной информацией"""
|
||
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
|
||
collector = MetricsCollector()
|
||
system_info = collector.get_system_info()
|
||
assert system_info == {}
|
||
|
||
def test_missing_disk_info(self):
|
||
"""Тест работы при отсутствии информации о диске"""
|
||
collector = MetricsCollector()
|
||
|
||
with patch.object(collector, '_get_disk_usage', return_value=None), \
|
||
patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
|
||
system_info = collector.get_system_info()
|
||
assert system_info == {}
|
||
|
||
def test_disk_io_calculation_without_previous_data(self):
|
||
"""Тест расчета I/O диска без предыдущих данных"""
|
||
collector = MetricsCollector()
|
||
|
||
# Сбрасываем предыдущие данные
|
||
collector.last_disk_io = None
|
||
collector.last_disk_io_time = None
|
||
|
||
current_disk_io = Mock()
|
||
current_disk_io.read_bytes = 1024
|
||
current_disk_io.write_bytes = 512
|
||
|
||
read_speed, write_speed = collector._calculate_disk_speed(current_disk_io)
|
||
|
||
assert read_speed == "0 B/s"
|
||
assert write_speed == "0 B/s"
|
||
|
||
def test_uptime_calculation_edge_cases(self):
|
||
"""Тест расчета uptime для граничных случаев"""
|
||
collector = MetricsCollector()
|
||
|
||
# Тест для очень малого времени
|
||
assert collector._format_uptime(0) == "0м"
|
||
assert collector._format_uptime(30) == "0м"
|
||
|
||
# Тест для очень большого времени
|
||
large_uptime = 365 * 24 * 3600 # 1 год
|
||
uptime_str = collector._format_uptime(large_uptime)
|
||
assert "д" in uptime_str
|
||
|
||
|
||
if __name__ == "__main__":
|
||
pytest.main([__file__, "-v"])
|