#!/usr/bin/env python3 """ Тесты для MetricsCollector """ import pytest import sys import os import time import platform from unittest.mock import Mock, patch, MagicMock from datetime import datetime # Добавляем путь к модулям мониторинга sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring')) from infra.monitoring.metrics_collector import MetricsCollector class TestMetricsCollector: """Тесты для класса MetricsCollector""" @pytest.fixture def metrics_collector(self): """Создает экземпляр MetricsCollector для тестов""" return MetricsCollector() @pytest.fixture def mock_psutil(self): """Мок для psutil""" mock_psutil = Mock() # Мокаем CPU mock_psutil.cpu_percent.return_value = 25.5 mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0) mock_psutil.cpu_count.return_value = 8 # Мокаем память mock_memory = Mock() mock_memory.used = 8 * (1024**3) # 8 GB mock_memory.total = 16 * (1024**3) # 16 GB mock_psutil.virtual_memory.return_value = mock_memory mock_swap = Mock() mock_swap.used = 1 * (1024**3) # 1 GB mock_swap.total = 2 * (1024**3) # 2 GB mock_swap.percent = 50.0 mock_psutil.swap_memory.return_value = mock_swap # Мокаем диск mock_disk = Mock() mock_disk.used = 100 * (1024**3) # 100 GB mock_disk.total = 500 * (1024**3) # 500 GB mock_disk.free = 400 * (1024**3) # 400 GB mock_psutil.disk_usage.return_value = mock_disk # Мокаем disk I/O mock_disk_io = Mock() mock_disk_io.read_count = 1000 mock_disk_io.write_count = 500 mock_disk_io.read_bytes = 1024 * (1024**2) # 1 GB mock_disk_io.write_bytes = 512 * (1024**2) # 512 MB mock_psutil.disk_io_counters.return_value = mock_disk_io # Мокаем boot time mock_psutil.boot_time.return_value = time.time() - 86400 # 1 день назад return mock_psutil def test_init(self, metrics_collector): """Тест инициализации MetricsCollector""" assert metrics_collector.threshold == 80.0 assert metrics_collector.recovery_threshold == 75.0 assert isinstance(metrics_collector.alert_states, dict) assert 'cpu' in metrics_collector.alert_states assert 'ram' in metrics_collector.alert_states assert 'disk' in metrics_collector.alert_states assert metrics_collector.monitor_start_time > 0 def test_detect_os_macos(self): """Тест определения macOS""" with patch('platform.system', return_value='Darwin'): collector = MetricsCollector() assert collector.os_type == "macos" def test_detect_os_linux(self): """Тест определения Linux""" with patch('platform.system', return_value='Linux'): collector = MetricsCollector() assert collector.os_type == "ubuntu" def test_detect_os_unknown(self): """Тест определения неизвестной ОС""" with patch('platform.system', return_value='Windows'): collector = MetricsCollector() assert collector.os_type == "unknown" def test_get_disk_path(self, metrics_collector): """Тест получения пути к диску""" # Для всех ОС должен возвращаться "/" assert metrics_collector._get_disk_path() == "/" @patch('subprocess.run') def test_get_macos_disk_usage_success(self, mock_subprocess, metrics_collector): """Тест получения информации о диске macOS через diskutil""" # Настраиваем мок для macOS metrics_collector.os_type = "macos" # Мокаем успешный вывод diskutil mock_result = Mock() mock_result.returncode = 0 mock_result.stdout = """ Container Total Space: 500.0 GB Container Free Space: 400.0 GB """ mock_subprocess.return_value = mock_result disk_info = metrics_collector._get_macos_disk_usage() assert disk_info is not None assert disk_info.total == 500.0 * (1024**3) # В байтах assert disk_info.free == 400.0 * (1024**3) assert disk_info.used == 100.0 * (1024**3) @patch('subprocess.run') def test_get_macos_disk_usage_fallback(self, mock_subprocess, metrics_collector): """Тест fallback к psutil при ошибке diskutil""" metrics_collector.os_type = "macos" # Мокаем неуспешный вывод diskutil mock_result = Mock() mock_result.returncode = 1 mock_subprocess.return_value = mock_result with patch('metrics_collector.psutil.disk_usage') as mock_psutil_disk: mock_disk = Mock() mock_disk.used = 100 * (1024**3) mock_disk.total = 500 * (1024**3) mock_disk.free = 400 * (1024**3) mock_psutil_disk.return_value = mock_disk disk_info = metrics_collector._get_macos_disk_usage() assert disk_info == mock_disk def test_get_system_uptime(self, metrics_collector): """Тест получения uptime системы""" with patch('metrics_collector.psutil.boot_time') as mock_boot_time: mock_boot_time.return_value = time.time() - 3600 # 1 час назад uptime = metrics_collector._get_system_uptime() assert uptime > 0 assert uptime <= 3600.1 # Не больше часа (с небольшим допуском) def test_get_monitor_uptime(self, metrics_collector): """Тест получения uptime мониторинга""" # Ждем немного, чтобы uptime изменился time.sleep(0.1) uptime = metrics_collector.get_monitor_uptime() assert isinstance(uptime, str) assert 'м' in uptime or 'ч' in uptime or 'д' in uptime def test_get_system_info_success(self, metrics_collector): """Тест получения системной информации""" # Мокаем все необходимые функции psutil with patch('metrics_collector.psutil.cpu_percent', return_value=25.5) as mock_cpu, \ patch('metrics_collector.psutil.getloadavg', return_value=(1.2, 1.1, 1.0)) as mock_load, \ patch('metrics_collector.psutil.cpu_count', return_value=8) as mock_cpu_count, \ patch('metrics_collector.psutil.cpu_times_percent') as mock_cpu_times, \ patch('metrics_collector.psutil.virtual_memory') as mock_virtual_memory, \ patch('metrics_collector.psutil.swap_memory') as mock_swap_memory, \ patch('metrics_collector.psutil.disk_usage') as mock_disk_usage, \ patch('metrics_collector.psutil.disk_io_counters') as mock_disk_io, \ patch('metrics_collector.psutil.boot_time', return_value=time.time() - 86400) as mock_boot_time, \ patch('os.uname') as mock_uname: # Настраиваем моки для CPU mock_cpu_times_obj = Mock() mock_cpu_times_obj.iowait = 2.5 mock_cpu_times.return_value = mock_cpu_times_obj # Настраиваем моки для памяти mock_memory = Mock() mock_memory.used = 8 * (1024**3) mock_memory.total = 16 * (1024**3) mock_virtual_memory.return_value = mock_memory # Настраиваем моки для swap mock_swap = Mock() mock_swap.used = 1 * (1024**3) mock_swap.total = 2 * (1024**3) mock_swap.percent = 50.0 mock_swap_memory.return_value = mock_swap # Настраиваем моки для диска mock_disk = Mock() mock_disk.used = 100 * (1024**3) mock_disk.total = 500 * (1024**3) mock_disk.free = 400 * (1024**3) mock_disk_usage.return_value = mock_disk # Настраиваем моки для disk I/O mock_disk_io_obj = Mock() mock_disk_io_obj.read_count = 1000 mock_disk_io_obj.write_count = 500 mock_disk_io_obj.read_bytes = 1024 * (1024**2) mock_disk_io_obj.write_bytes = 512 * (1024**2) mock_disk_io.return_value = mock_disk_io_obj # Настраиваем мок для hostname mock_uname.return_value.nodename = "test-host" # Мокаем _get_disk_usage чтобы возвращал наш мок with patch.object(metrics_collector, '_get_disk_usage', return_value=mock_disk): system_info = metrics_collector.get_system_info() assert isinstance(system_info, dict) assert 'cpu_percent' in system_info assert 'ram_percent' in system_info assert 'disk_percent' in system_info assert 'io_wait_percent' in system_info assert 'server_hostname' in system_info # Проверяем расчеты assert system_info['cpu_percent'] == 25.5 assert system_info['ram_percent'] == 50.0 # 8/16 * 100 assert system_info['disk_percent'] == 20.0 # 100/500 * 100 assert system_info['io_wait_percent'] == 2.5 assert system_info['server_hostname'] == "test-host" def test_get_system_info_error(self, metrics_collector): """Тест получения системной информации при ошибке""" with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")): system_info = metrics_collector.get_system_info() assert system_info == {} def test_format_bytes(self, metrics_collector): """Тест форматирования байтов""" assert metrics_collector._format_bytes(0) == "0 B" assert metrics_collector._format_bytes(1024) == "1.0 KB" assert metrics_collector._format_bytes(1024**2) == "1.0 MB" assert metrics_collector._format_bytes(1024**3) == "1.0 GB" assert metrics_collector._format_bytes(1024**4) == "1.0 TB" def test_format_uptime(self, metrics_collector): """Тест форматирования uptime""" assert metrics_collector._format_uptime(60) == "1м" assert metrics_collector._format_uptime(3600) == "1ч 0м" assert metrics_collector._format_uptime(86400) == "1д 0ч 0м" assert metrics_collector._format_uptime(90000) == "1д 1ч 0м" def test_check_process_status_pid_file(self, metrics_collector, tmp_path): """Тест проверки статуса процесса по PID файлу""" # Создаем временный PID файл pid_file = tmp_path / "helper_bot.pid" pid_file.write_text("12345") # Временно заменяем путь к PID файлу original_pid_files = metrics_collector.pid_files.copy() metrics_collector.pid_files['helper_bot'] = str(pid_file) with patch('metrics_collector.psutil.pid_exists', return_value=True), \ patch('metrics_collector.psutil.Process') as mock_process: mock_proc = Mock() mock_proc.create_time.return_value = time.time() - 3600 mock_process.return_value = mock_proc status, uptime = metrics_collector.check_process_status('helper_bot') assert status == "✅" assert "Uptime" in uptime # Восстанавливаем оригинальные PID файлы metrics_collector.pid_files = original_pid_files def test_check_process_status_not_running(self, metrics_collector): """Тест проверки статуса неработающего процесса""" with patch('metrics_collector.psutil.process_iter', return_value=[]): status, message = metrics_collector.check_process_status('nonexistent_bot') assert status == "❌" assert message == "Выключен" def test_calculate_disk_speed(self, metrics_collector): """Тест расчета скорости диска""" # Инициализируем базовые значения metrics_collector._initialize_disk_io() # Создаем текущую статистику диска current_disk_io = Mock() current_disk_io.read_bytes = 2048 * (1024**2) # 2 GB current_disk_io.write_bytes = 1024 * (1024**2) # 1 GB # Ждем немного для расчета скорости time.sleep(0.1) read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io) assert isinstance(read_speed, str) assert isinstance(write_speed, str) assert "/s" in read_speed assert "/s" in write_speed def test_calculate_disk_io_percent(self, metrics_collector): """Тест расчета процента загрузки диска""" # Инициализируем базовые значения metrics_collector._initialize_disk_io() # Создаем текущую статистику диска current_disk_io = Mock() current_disk_io.read_count = 2000 current_disk_io.write_count = 1000 current_disk_io.read_bytes = 2048 * (1024**2) current_disk_io.write_bytes = 1024 * (1024**2) # Ждем немного для расчета time.sleep(0.1) io_percent = metrics_collector._calculate_disk_io_percent() assert isinstance(io_percent, int) assert 0 <= io_percent <= 100 def test_get_metrics_data(self, metrics_collector): """Тест получения данных для метрик Prometheus""" with patch.object(metrics_collector, 'get_system_info') as mock_get_system_info: mock_get_system_info.return_value = { 'cpu_percent': 25.5, 'ram_percent': 60.2, 'disk_percent': 45.8, 'load_avg_1m': 1.2, 'load_avg_5m': 1.1, 'load_avg_15m': 1.0, 'swap_percent': 10.5 } with patch.object(metrics_collector, '_get_system_uptime', return_value=86400.0): metrics_data = metrics_collector.get_metrics_data() assert isinstance(metrics_data, dict) assert 'cpu_usage_percent' in metrics_data assert 'ram_usage_percent' in metrics_data assert 'disk_usage_percent' in metrics_data assert 'load_average_1m' in metrics_data assert 'system_uptime_seconds' in metrics_data assert 'monitor_uptime_seconds' in metrics_data def test_check_alerts(self, metrics_collector): """Тест проверки алертов""" # Сбрасываем состояния алертов для чистого теста metrics_collector.alert_states = {'cpu': False, 'ram': False, 'disk': False} metrics_collector.alert_start_times = {'cpu': None, 'ram': None, 'disk': None} # Устанавливаем минимальные задержки для тестов metrics_collector.alert_delays = {'cpu': 0, 'ram': 0, 'disk': 0} # Тестируем превышение порога CPU system_info = { 'cpu_percent': 85.0, # Выше порога 80.0 'ram_percent': 60.0, # Ниже порога 'disk_percent': 70.0, # Ниже порога 'load_avg_1m': 2.5, 'ram_used': 8.0, 'ram_total': 16.0, 'disk_free': 300.0 } alerts, recoveries = metrics_collector.check_alerts(system_info) assert len(alerts) == 1 assert alerts[0][0] == 'cpu' # Тип алерта assert alerts[0][1] == 85.0 # Значение assert len(recoveries) == 0 # Проверяем, что состояние алерта изменилось assert metrics_collector.alert_states['cpu'] is True # Тестируем восстановление system_info['cpu_percent'] = 70.0 # Ниже recovery threshold 75.0 alerts, recoveries = metrics_collector.check_alerts(system_info) assert len(alerts) == 0 assert len(recoveries) == 1 assert recoveries[0][0] == 'cpu' assert metrics_collector.alert_states['cpu'] is False def test_environment_variables(self): """Тест работы с переменными окружения""" with patch.dict(os.environ, {'THRESHOLD': '90.0', 'RECOVERY_THRESHOLD': '85.0'}): collector = MetricsCollector() assert collector.threshold == 90.0 assert collector.recovery_threshold == 85.0 def test_metrics_collector_integration(self, metrics_collector): """Интеграционный тест MetricsCollector""" # Проверяем, что можем получить системную информацию system_info = metrics_collector.get_system_info() # Даже если некоторые метрики недоступны, должны получить словарь assert isinstance(system_info, dict) # Проверяем, что можем получить метрики для Prometheus metrics_data = metrics_collector.get_metrics_data() assert isinstance(metrics_data, dict) # Проверяем, что можем проверить алерты alerts, recoveries = metrics_collector.check_alerts(system_info) assert isinstance(alerts, list) assert isinstance(recoveries, list) class TestMetricsCollectorEdgeCases: """Тесты граничных случаев для MetricsCollector""" def test_empty_system_info(self): """Тест работы с пустой системной информацией""" with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")): collector = MetricsCollector() system_info = collector.get_system_info() assert system_info == {} def test_missing_disk_info(self): """Тест работы при отсутствии информации о диске""" collector = MetricsCollector() with patch.object(collector, '_get_disk_usage', return_value=None), \ patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")): system_info = collector.get_system_info() assert system_info == {} def test_disk_io_calculation_without_previous_data(self): """Тест расчета I/O диска без предыдущих данных""" collector = MetricsCollector() # Сбрасываем предыдущие данные collector.last_disk_io = None collector.last_disk_io_time = None current_disk_io = Mock() current_disk_io.read_bytes = 1024 current_disk_io.write_bytes = 512 read_speed, write_speed = collector._calculate_disk_speed(current_disk_io) assert read_speed == "0 B/s" assert write_speed == "0 B/s" def test_uptime_calculation_edge_cases(self): """Тест расчета uptime для граничных случаев""" collector = MetricsCollector() # Тест для очень малого времени assert collector._format_uptime(0) == "0м" assert collector._format_uptime(30) == "0м" # Тест для очень большого времени large_uptime = 365 * 24 * 3600 # 1 год uptime_str = collector._format_uptime(large_uptime) assert "д" in uptime_str if __name__ == "__main__": pytest.main([__file__, "-v"])