Files
prod/tests/infra/test_metrics_collector.py
Andrey 40968dd075 WIP: Development changes moved from master
- Modified Grafana dashboards
- Updated message sender and metrics collector
- Added new rate limiting dashboard
- Removed count_tests.py
2025-09-05 01:29:28 +03:00

465 lines
21 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Тесты для MetricsCollector
"""
import pytest
import sys
import os
import time
import platform
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from infra.monitoring.metrics_collector import MetricsCollector
class TestMetricsCollector:
"""Тесты для класса MetricsCollector"""
@pytest.fixture
def metrics_collector(self):
"""Создает экземпляр MetricsCollector для тестов"""
return MetricsCollector()
@pytest.fixture
def mock_psutil(self):
"""Мок для psutil"""
mock_psutil = Mock()
# Мокаем CPU
mock_psutil.cpu_percent.return_value = 25.5
mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0)
mock_psutil.cpu_count.return_value = 8
# Мокаем память
mock_memory = Mock()
mock_memory.used = 8 * (1024**3) # 8 GB
mock_memory.total = 16 * (1024**3) # 16 GB
mock_psutil.virtual_memory.return_value = mock_memory
mock_swap = Mock()
mock_swap.used = 1 * (1024**3) # 1 GB
mock_swap.total = 2 * (1024**3) # 2 GB
mock_swap.percent = 50.0
mock_psutil.swap_memory.return_value = mock_swap
# Мокаем диск
mock_disk = Mock()
mock_disk.used = 100 * (1024**3) # 100 GB
mock_disk.total = 500 * (1024**3) # 500 GB
mock_disk.free = 400 * (1024**3) # 400 GB
mock_psutil.disk_usage.return_value = mock_disk
# Мокаем disk I/O
mock_disk_io = Mock()
mock_disk_io.read_count = 1000
mock_disk_io.write_count = 500
mock_disk_io.read_bytes = 1024 * (1024**2) # 1 GB
mock_disk_io.write_bytes = 512 * (1024**2) # 512 MB
mock_psutil.disk_io_counters.return_value = mock_disk_io
# Мокаем boot time
mock_psutil.boot_time.return_value = time.time() - 86400 # 1 день назад
return mock_psutil
def test_init(self, metrics_collector):
"""Тест инициализации MetricsCollector"""
assert metrics_collector.threshold == 80.0
assert metrics_collector.recovery_threshold == 75.0
assert isinstance(metrics_collector.alert_states, dict)
assert 'cpu' in metrics_collector.alert_states
assert 'ram' in metrics_collector.alert_states
assert 'disk' in metrics_collector.alert_states
assert metrics_collector.monitor_start_time > 0
def test_detect_os_macos(self):
"""Тест определения macOS"""
with patch('platform.system', return_value='Darwin'):
collector = MetricsCollector()
assert collector.os_type == "macos"
def test_detect_os_linux(self):
"""Тест определения Linux"""
with patch('platform.system', return_value='Linux'):
collector = MetricsCollector()
assert collector.os_type == "ubuntu"
def test_detect_os_unknown(self):
"""Тест определения неизвестной ОС"""
with patch('platform.system', return_value='Windows'):
collector = MetricsCollector()
assert collector.os_type == "unknown"
def test_get_disk_path(self, metrics_collector):
"""Тест получения пути к диску"""
# Для всех ОС должен возвращаться "/"
assert metrics_collector._get_disk_path() == "/"
@patch('subprocess.run')
def test_get_macos_disk_usage_success(self, mock_subprocess, metrics_collector):
"""Тест получения информации о диске macOS через diskutil"""
# Настраиваем мок для macOS
metrics_collector.os_type = "macos"
# Мокаем успешный вывод diskutil
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = """
Container Total Space: 500.0 GB
Container Free Space: 400.0 GB
"""
mock_subprocess.return_value = mock_result
disk_info = metrics_collector._get_macos_disk_usage()
assert disk_info is not None
assert disk_info.total == 500.0 * (1024**3) # В байтах
assert disk_info.free == 400.0 * (1024**3)
assert disk_info.used == 100.0 * (1024**3)
@patch('subprocess.run')
def test_get_macos_disk_usage_fallback(self, mock_subprocess, metrics_collector):
"""Тест fallback к psutil при ошибке diskutil"""
metrics_collector.os_type = "macos"
# Мокаем неуспешный вывод diskutil
mock_result = Mock()
mock_result.returncode = 1
mock_subprocess.return_value = mock_result
with patch('metrics_collector.psutil.disk_usage') as mock_psutil_disk:
mock_disk = Mock()
mock_disk.used = 100 * (1024**3)
mock_disk.total = 500 * (1024**3)
mock_disk.free = 400 * (1024**3)
mock_psutil_disk.return_value = mock_disk
disk_info = metrics_collector._get_macos_disk_usage()
assert disk_info == mock_disk
def test_get_system_uptime(self, metrics_collector):
"""Тест получения uptime системы"""
with patch('metrics_collector.psutil.boot_time') as mock_boot_time:
mock_boot_time.return_value = time.time() - 3600 # 1 час назад
uptime = metrics_collector._get_system_uptime()
assert uptime > 0
assert uptime <= 3600.1 # Не больше часа (с небольшим допуском)
def test_get_monitor_uptime(self, metrics_collector):
"""Тест получения uptime мониторинга"""
# Ждем немного, чтобы uptime изменился
time.sleep(0.1)
uptime = metrics_collector.get_monitor_uptime()
assert isinstance(uptime, str)
assert 'м' in uptime or 'ч' in uptime or 'д' in uptime
def test_get_system_info_success(self, metrics_collector):
"""Тест получения системной информации"""
# Мокаем все необходимые функции psutil
with patch('metrics_collector.psutil.cpu_percent', return_value=25.5) as mock_cpu, \
patch('metrics_collector.psutil.getloadavg', return_value=(1.2, 1.1, 1.0)) as mock_load, \
patch('metrics_collector.psutil.cpu_count', return_value=8) as mock_cpu_count, \
patch('metrics_collector.psutil.cpu_times_percent') as mock_cpu_times, \
patch('metrics_collector.psutil.virtual_memory') as mock_virtual_memory, \
patch('metrics_collector.psutil.swap_memory') as mock_swap_memory, \
patch('metrics_collector.psutil.disk_usage') as mock_disk_usage, \
patch('metrics_collector.psutil.disk_io_counters') as mock_disk_io, \
patch('metrics_collector.psutil.boot_time', return_value=time.time() - 86400) as mock_boot_time, \
patch('os.uname') as mock_uname:
# Настраиваем моки для CPU
mock_cpu_times_obj = Mock()
mock_cpu_times_obj.iowait = 2.5
mock_cpu_times.return_value = mock_cpu_times_obj
# Настраиваем моки для памяти
mock_memory = Mock()
mock_memory.used = 8 * (1024**3)
mock_memory.total = 16 * (1024**3)
mock_virtual_memory.return_value = mock_memory
# Настраиваем моки для swap
mock_swap = Mock()
mock_swap.used = 1 * (1024**3)
mock_swap.total = 2 * (1024**3)
mock_swap.percent = 50.0
mock_swap_memory.return_value = mock_swap
# Настраиваем моки для диска
mock_disk = Mock()
mock_disk.used = 100 * (1024**3)
mock_disk.total = 500 * (1024**3)
mock_disk.free = 400 * (1024**3)
mock_disk_usage.return_value = mock_disk
# Настраиваем моки для disk I/O
mock_disk_io_obj = Mock()
mock_disk_io_obj.read_count = 1000
mock_disk_io_obj.write_count = 500
mock_disk_io_obj.read_bytes = 1024 * (1024**2)
mock_disk_io_obj.write_bytes = 512 * (1024**2)
mock_disk_io.return_value = mock_disk_io_obj
# Настраиваем мок для hostname
mock_uname.return_value.nodename = "test-host"
# Мокаем _get_disk_usage чтобы возвращал наш мок
with patch.object(metrics_collector, '_get_disk_usage', return_value=mock_disk):
system_info = metrics_collector.get_system_info()
assert isinstance(system_info, dict)
assert 'cpu_percent' in system_info
assert 'ram_percent' in system_info
assert 'disk_percent' in system_info
assert 'io_wait_percent' in system_info
assert 'server_hostname' in system_info
# Проверяем расчеты
assert system_info['cpu_percent'] == 25.5
assert system_info['ram_percent'] == 50.0 # 8/16 * 100
assert system_info['disk_percent'] == 20.0 # 100/500 * 100
assert system_info['io_wait_percent'] == 2.5
assert system_info['server_hostname'] == "test-host"
def test_get_system_info_error(self, metrics_collector):
"""Тест получения системной информации при ошибке"""
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
system_info = metrics_collector.get_system_info()
assert system_info == {}
def test_format_bytes(self, metrics_collector):
"""Тест форматирования байтов"""
assert metrics_collector._format_bytes(0) == "0 B"
assert metrics_collector._format_bytes(1024) == "1.0 KB"
assert metrics_collector._format_bytes(1024**2) == "1.0 MB"
assert metrics_collector._format_bytes(1024**3) == "1.0 GB"
assert metrics_collector._format_bytes(1024**4) == "1.0 TB"
def test_format_uptime(self, metrics_collector):
"""Тест форматирования uptime"""
assert metrics_collector._format_uptime(60) == ""
assert metrics_collector._format_uptime(3600) == "1ч 0м"
assert metrics_collector._format_uptime(86400) == "1д 0ч 0м"
assert metrics_collector._format_uptime(90000) == "1д 1ч 0м"
def test_check_process_status_pid_file(self, metrics_collector, tmp_path):
"""Тест проверки статуса процесса по PID файлу"""
# Создаем временный PID файл
pid_file = tmp_path / "test_bot.pid"
pid_file.write_text("12345")
# Временно заменяем путь к PID файлу
original_pid_files = metrics_collector.pid_files.copy()
metrics_collector.pid_files['test_bot'] = str(pid_file)
with patch('infra.monitoring.metrics_collector.psutil.pid_exists', return_value=True), \
patch('infra.monitoring.metrics_collector.psutil.Process') as mock_process:
mock_proc = Mock()
mock_proc.create_time.return_value = time.time() - 3600
mock_process.return_value = mock_proc
status, uptime = metrics_collector.check_process_status('test_bot')
assert status == ""
assert "Uptime" in uptime
# Восстанавливаем оригинальные PID файлы
metrics_collector.pid_files = original_pid_files
def test_check_process_status_not_running(self, metrics_collector):
"""Тест проверки статуса неработающего процесса"""
with patch('metrics_collector.psutil.process_iter', return_value=[]):
status, message = metrics_collector.check_process_status('nonexistent_bot')
assert status == ""
assert message == "Выключен"
def test_calculate_disk_speed(self, metrics_collector):
"""Тест расчета скорости диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Создаем текущую статистику диска
current_disk_io = Mock()
current_disk_io.read_bytes = 2048 * (1024**2) # 2 GB
current_disk_io.write_bytes = 1024 * (1024**2) # 1 GB
# Ждем немного для расчета скорости
time.sleep(0.1)
read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io)
assert isinstance(read_speed, str)
assert isinstance(write_speed, str)
assert "/s" in read_speed
assert "/s" in write_speed
def test_calculate_disk_io_percent(self, metrics_collector):
"""Тест расчета процента загрузки диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Создаем текущую статистику диска
current_disk_io = Mock()
current_disk_io.read_count = 2000
current_disk_io.write_count = 1000
current_disk_io.read_bytes = 2048 * (1024**2)
current_disk_io.write_bytes = 1024 * (1024**2)
# Ждем немного для расчета
time.sleep(0.1)
io_percent = metrics_collector._calculate_disk_io_percent()
assert isinstance(io_percent, int)
assert 0 <= io_percent <= 100
def test_get_metrics_data(self, metrics_collector):
"""Тест получения данных для метрик Prometheus"""
with patch.object(metrics_collector, 'get_system_info') as mock_get_system_info:
mock_get_system_info.return_value = {
'cpu_percent': 25.5,
'ram_percent': 60.2,
'disk_percent': 45.8,
'load_avg_1m': 1.2,
'load_avg_5m': 1.1,
'load_avg_15m': 1.0,
'swap_percent': 10.5
}
with patch.object(metrics_collector, '_get_system_uptime', return_value=86400.0):
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
assert 'cpu_usage_percent' in metrics_data
assert 'ram_usage_percent' in metrics_data
assert 'disk_usage_percent' in metrics_data
assert 'load_average_1m' in metrics_data
assert 'system_uptime_seconds' in metrics_data
assert 'monitor_uptime_seconds' in metrics_data
def test_check_alerts(self, metrics_collector):
"""Тест проверки алертов"""
# Сбрасываем состояния алертов для чистого теста
metrics_collector.alert_states = {'cpu': False, 'ram': False, 'disk': False}
metrics_collector.alert_start_times = {'cpu': None, 'ram': None, 'disk': None}
# Устанавливаем минимальные задержки для тестов
metrics_collector.alert_delays = {'cpu': 0, 'ram': 0, 'disk': 0}
# Тестируем превышение порога CPU
system_info = {
'cpu_percent': 85.0, # Выше порога 80.0
'ram_percent': 60.0, # Ниже порога
'disk_percent': 70.0, # Ниже порога
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 300.0
}
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert len(alerts) == 1
assert alerts[0][0] == 'cpu' # Тип алерта
assert alerts[0][1] == 85.0 # Значение
assert len(recoveries) == 0
# Проверяем, что состояние алерта изменилось
assert metrics_collector.alert_states['cpu'] is True
# Тестируем восстановление
system_info['cpu_percent'] = 70.0 # Ниже recovery threshold 75.0
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert len(alerts) == 0
assert len(recoveries) == 1
assert recoveries[0][0] == 'cpu'
assert metrics_collector.alert_states['cpu'] is False
def test_environment_variables(self):
"""Тест работы с переменными окружения"""
with patch.dict(os.environ, {'THRESHOLD': '90.0', 'RECOVERY_THRESHOLD': '85.0'}):
collector = MetricsCollector()
assert collector.threshold == 90.0
assert collector.recovery_threshold == 85.0
def test_metrics_collector_integration(self, metrics_collector):
"""Интеграционный тест MetricsCollector"""
# Проверяем, что можем получить системную информацию
system_info = metrics_collector.get_system_info()
# Даже если некоторые метрики недоступны, должны получить словарь
assert isinstance(system_info, dict)
# Проверяем, что можем получить метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# Проверяем, что можем проверить алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
class TestMetricsCollectorEdgeCases:
"""Тесты граничных случаев для MetricsCollector"""
def test_empty_system_info(self):
"""Тест работы с пустой системной информацией"""
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
collector = MetricsCollector()
system_info = collector.get_system_info()
assert system_info == {}
def test_missing_disk_info(self):
"""Тест работы при отсутствии информации о диске"""
collector = MetricsCollector()
with patch.object(collector, '_get_disk_usage', return_value=None), \
patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
system_info = collector.get_system_info()
assert system_info == {}
def test_disk_io_calculation_without_previous_data(self):
"""Тест расчета I/O диска без предыдущих данных"""
collector = MetricsCollector()
# Сбрасываем предыдущие данные
collector.last_disk_io = None
collector.last_disk_io_time = None
current_disk_io = Mock()
current_disk_io.read_bytes = 1024
current_disk_io.write_bytes = 512
read_speed, write_speed = collector._calculate_disk_speed(current_disk_io)
assert read_speed == "0 B/s"
assert write_speed == "0 B/s"
def test_uptime_calculation_edge_cases(self):
"""Тест расчета uptime для граничных случаев"""
collector = MetricsCollector()
# Тест для очень малого времени
assert collector._format_uptime(0) == ""
assert collector._format_uptime(30) == ""
# Тест для очень большого времени
large_uptime = 365 * 24 * 3600 # 1 год
uptime_str = collector._format_uptime(large_uptime)
assert "д" in uptime_str
if __name__ == "__main__":
pytest.main([__file__, "-v"])