Update docker-compose and README for Telegram bot integration; add environment file reference and clarify port usage in documentation.

This commit is contained in:
2025-08-31 23:32:56 +03:00
parent 7378179d98
commit 6733043a61
17 changed files with 2499 additions and 12 deletions

View File

@@ -0,0 +1,441 @@
#!/usr/bin/env python3
"""
Тесты для MetricsCollector
"""
import pytest
import sys
import os
import time
import platform
from unittest.mock import Mock, patch, MagicMock
from datetime import datetime
# Добавляем путь к модулям мониторинга
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../infra/monitoring'))
from metrics_collector import MetricsCollector
class TestMetricsCollector:
"""Тесты для класса MetricsCollector"""
@pytest.fixture
def metrics_collector(self):
"""Создает экземпляр MetricsCollector для тестов"""
return MetricsCollector()
@pytest.fixture
def mock_psutil(self):
"""Мок для psutil"""
mock_psutil = Mock()
# Мокаем CPU
mock_psutil.cpu_percent.return_value = 25.5
mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0)
mock_psutil.cpu_count.return_value = 8
# Мокаем память
mock_memory = Mock()
mock_memory.used = 8 * (1024**3) # 8 GB
mock_memory.total = 16 * (1024**3) # 16 GB
mock_psutil.virtual_memory.return_value = mock_memory
mock_swap = Mock()
mock_swap.used = 1 * (1024**3) # 1 GB
mock_swap.total = 2 * (1024**3) # 2 GB
mock_swap.percent = 50.0
mock_psutil.swap_memory.return_value = mock_swap
# Мокаем диск
mock_disk = Mock()
mock_disk.used = 100 * (1024**3) # 100 GB
mock_disk.total = 500 * (1024**3) # 500 GB
mock_disk.free = 400 * (1024**3) # 400 GB
mock_psutil.disk_usage.return_value = mock_disk
# Мокаем disk I/O
mock_disk_io = Mock()
mock_disk_io.read_count = 1000
mock_disk_io.write_count = 500
mock_disk_io.read_bytes = 1024 * (1024**2) # 1 GB
mock_disk_io.write_bytes = 512 * (1024**2) # 512 MB
mock_psutil.disk_io_counters.return_value = mock_disk_io
# Мокаем boot time
mock_psutil.boot_time.return_value = time.time() - 86400 # 1 день назад
return mock_psutil
def test_init(self, metrics_collector):
"""Тест инициализации MetricsCollector"""
assert metrics_collector.threshold == 80.0
assert metrics_collector.recovery_threshold == 75.0
assert isinstance(metrics_collector.alert_states, dict)
assert 'cpu' in metrics_collector.alert_states
assert 'ram' in metrics_collector.alert_states
assert 'disk' in metrics_collector.alert_states
assert metrics_collector.monitor_start_time > 0
def test_detect_os_macos(self):
"""Тест определения macOS"""
with patch('platform.system', return_value='Darwin'):
collector = MetricsCollector()
assert collector.os_type == "macos"
def test_detect_os_linux(self):
"""Тест определения Linux"""
with patch('platform.system', return_value='Linux'):
collector = MetricsCollector()
assert collector.os_type == "ubuntu"
def test_detect_os_unknown(self):
"""Тест определения неизвестной ОС"""
with patch('platform.system', return_value='Windows'):
collector = MetricsCollector()
assert collector.os_type == "unknown"
def test_get_disk_path(self, metrics_collector):
"""Тест получения пути к диску"""
# Для всех ОС должен возвращаться "/"
assert metrics_collector._get_disk_path() == "/"
@patch('subprocess.run')
def test_get_macos_disk_usage_success(self, mock_subprocess, metrics_collector):
"""Тест получения информации о диске macOS через diskutil"""
# Настраиваем мок для macOS
metrics_collector.os_type = "macos"
# Мокаем успешный вывод diskutil
mock_result = Mock()
mock_result.returncode = 0
mock_result.stdout = """
Container Total Space: 500.0 GB
Container Free Space: 400.0 GB
"""
mock_subprocess.return_value = mock_result
disk_info = metrics_collector._get_macos_disk_usage()
assert disk_info is not None
assert disk_info.total == 500.0 * (1024**3) # В байтах
assert disk_info.free == 400.0 * (1024**3)
assert disk_info.used == 100.0 * (1024**3)
@patch('subprocess.run')
def test_get_macos_disk_usage_fallback(self, mock_subprocess, metrics_collector):
"""Тест fallback к psutil при ошибке diskutil"""
metrics_collector.os_type = "macos"
# Мокаем неуспешный вывод diskutil
mock_result = Mock()
mock_result.returncode = 1
mock_subprocess.return_value = mock_result
with patch('metrics_collector.psutil.disk_usage') as mock_psutil_disk:
mock_disk = Mock()
mock_disk.used = 100 * (1024**3)
mock_disk.total = 500 * (1024**3)
mock_disk.free = 400 * (1024**3)
mock_psutil_disk.return_value = mock_disk
disk_info = metrics_collector._get_macos_disk_usage()
assert disk_info == mock_disk
def test_get_system_uptime(self, metrics_collector):
"""Тест получения uptime системы"""
with patch('metrics_collector.psutil.boot_time') as mock_boot_time:
mock_boot_time.return_value = time.time() - 3600 # 1 час назад
uptime = metrics_collector._get_system_uptime()
assert uptime > 0
assert uptime <= 3600.1 # Не больше часа (с небольшим допуском)
def test_get_monitor_uptime(self, metrics_collector):
"""Тест получения uptime мониторинга"""
# Ждем немного, чтобы uptime изменился
time.sleep(0.1)
uptime = metrics_collector.get_monitor_uptime()
assert isinstance(uptime, str)
assert 'м' in uptime or 'ч' in uptime or 'д' in uptime
@patch('metrics_collector.psutil')
def test_get_system_info_success(self, mock_psutil, metrics_collector):
"""Тест получения системной информации"""
# Настраиваем моки
mock_psutil.cpu_percent.return_value = 25.5
mock_psutil.getloadavg.return_value = (1.2, 1.1, 1.0)
mock_psutil.cpu_count.return_value = 8
mock_memory = Mock()
mock_memory.used = 8 * (1024**3)
mock_memory.total = 16 * (1024**3)
mock_psutil.virtual_memory.return_value = mock_memory
mock_swap = Mock()
mock_swap.used = 1 * (1024**3)
mock_swap.total = 2 * (1024**3)
mock_swap.percent = 50.0
mock_psutil.swap_memory.return_value = mock_swap
mock_disk = Mock()
mock_disk.used = 100 * (1024**3)
mock_disk.total = 500 * (1024**3)
mock_disk.free = 400 * (1024**3)
mock_psutil.disk_usage.return_value = mock_disk
# Мокаем _get_disk_usage чтобы возвращал наш мок
with patch.object(metrics_collector, '_get_disk_usage', return_value=mock_disk):
mock_disk_io = Mock()
mock_disk_io.read_count = 1000
mock_disk_io.write_count = 500
mock_disk_io.read_bytes = 1024 * (1024**2)
mock_disk_io.write_bytes = 512 * (1024**2)
mock_psutil.disk_io_counters.return_value = mock_disk_io
mock_psutil.boot_time.return_value = time.time() - 86400
with patch('os.uname') as mock_uname:
mock_uname.return_value.nodename = "test-host"
system_info = metrics_collector.get_system_info()
assert isinstance(system_info, dict)
assert 'cpu_percent' in system_info
assert 'ram_percent' in system_info
assert 'disk_percent' in system_info
assert 'server_hostname' in system_info
# Проверяем расчеты
assert system_info['cpu_percent'] == 25.5
assert system_info['ram_percent'] == 50.0 # 8/16 * 100
assert system_info['disk_percent'] == 20.0 # 100/500 * 100
assert system_info['server_hostname'] == "test-host"
def test_get_system_info_error(self, metrics_collector):
"""Тест получения системной информации при ошибке"""
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
system_info = metrics_collector.get_system_info()
assert system_info == {}
def test_format_bytes(self, metrics_collector):
"""Тест форматирования байтов"""
assert metrics_collector._format_bytes(0) == "0 B"
assert metrics_collector._format_bytes(1024) == "1.0 KB"
assert metrics_collector._format_bytes(1024**2) == "1.0 MB"
assert metrics_collector._format_bytes(1024**3) == "1.0 GB"
assert metrics_collector._format_bytes(1024**4) == "1.0 TB"
def test_format_uptime(self, metrics_collector):
"""Тест форматирования uptime"""
assert metrics_collector._format_uptime(60) == ""
assert metrics_collector._format_uptime(3600) == "1ч 0м"
assert metrics_collector._format_uptime(86400) == "1д 0ч 0м"
assert metrics_collector._format_uptime(90000) == "1д 1ч 0м"
def test_check_process_status_pid_file(self, metrics_collector, tmp_path):
"""Тест проверки статуса процесса по PID файлу"""
# Создаем временный PID файл
pid_file = tmp_path / "helper_bot.pid"
pid_file.write_text("12345")
# Временно заменяем путь к PID файлу
original_pid_files = metrics_collector.pid_files.copy()
metrics_collector.pid_files['helper_bot'] = str(pid_file)
with patch('metrics_collector.psutil.pid_exists', return_value=True), \
patch('metrics_collector.psutil.Process') as mock_process:
mock_proc = Mock()
mock_proc.create_time.return_value = time.time() - 3600
mock_process.return_value = mock_proc
status, uptime = metrics_collector.check_process_status('helper_bot')
assert status == ""
assert "Uptime" in uptime
# Восстанавливаем оригинальные PID файлы
metrics_collector.pid_files = original_pid_files
def test_check_process_status_not_running(self, metrics_collector):
"""Тест проверки статуса неработающего процесса"""
with patch('metrics_collector.psutil.process_iter', return_value=[]):
status, message = metrics_collector.check_process_status('nonexistent_bot')
assert status == ""
assert message == "Выключен"
def test_calculate_disk_speed(self, metrics_collector):
"""Тест расчета скорости диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Создаем текущую статистику диска
current_disk_io = Mock()
current_disk_io.read_bytes = 2048 * (1024**2) # 2 GB
current_disk_io.write_bytes = 1024 * (1024**2) # 1 GB
# Ждем немного для расчета скорости
time.sleep(0.1)
read_speed, write_speed = metrics_collector._calculate_disk_speed(current_disk_io)
assert isinstance(read_speed, str)
assert isinstance(write_speed, str)
assert "/s" in read_speed
assert "/s" in write_speed
def test_calculate_disk_io_percent(self, metrics_collector):
"""Тест расчета процента загрузки диска"""
# Инициализируем базовые значения
metrics_collector._initialize_disk_io()
# Создаем текущую статистику диска
current_disk_io = Mock()
current_disk_io.read_count = 2000
current_disk_io.write_count = 1000
current_disk_io.read_bytes = 2048 * (1024**2)
current_disk_io.write_bytes = 1024 * (1024**2)
# Ждем немного для расчета
time.sleep(0.1)
io_percent = metrics_collector._calculate_disk_io_percent()
assert isinstance(io_percent, int)
assert 0 <= io_percent <= 100
def test_get_metrics_data(self, metrics_collector):
"""Тест получения данных для метрик Prometheus"""
with patch.object(metrics_collector, 'get_system_info') as mock_get_system_info:
mock_get_system_info.return_value = {
'cpu_percent': 25.5,
'ram_percent': 60.2,
'disk_percent': 45.8,
'load_avg_1m': 1.2,
'load_avg_5m': 1.1,
'load_avg_15m': 1.0,
'swap_percent': 10.5
}
with patch.object(metrics_collector, '_get_system_uptime', return_value=86400.0):
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
assert 'cpu_usage_percent' in metrics_data
assert 'ram_usage_percent' in metrics_data
assert 'disk_usage_percent' in metrics_data
assert 'load_average_1m' in metrics_data
assert 'system_uptime_seconds' in metrics_data
assert 'monitor_uptime_seconds' in metrics_data
def test_check_alerts(self, metrics_collector):
"""Тест проверки алертов"""
# Тестируем превышение порога CPU
system_info = {
'cpu_percent': 85.0, # Выше порога 80.0
'ram_percent': 60.0, # Ниже порога
'disk_percent': 70.0, # Ниже порога
'load_avg_1m': 2.5,
'ram_used': 8.0,
'ram_total': 16.0,
'disk_free': 300.0
}
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert len(alerts) == 1
assert alerts[0][0] == 'cpu' # Тип алерта
assert alerts[0][1] == 85.0 # Значение
assert len(recoveries) == 0
# Проверяем, что состояние алерта изменилось
assert metrics_collector.alert_states['cpu'] is True
# Тестируем восстановление
system_info['cpu_percent'] = 70.0 # Ниже recovery threshold 75.0
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert len(alerts) == 0
assert len(recoveries) == 1
assert recoveries[0][0] == 'cpu'
assert metrics_collector.alert_states['cpu'] is False
def test_environment_variables(self):
"""Тест работы с переменными окружения"""
with patch.dict(os.environ, {'THRESHOLD': '90.0', 'RECOVERY_THRESHOLD': '85.0'}):
collector = MetricsCollector()
assert collector.threshold == 90.0
assert collector.recovery_threshold == 85.0
def test_metrics_collector_integration(self, metrics_collector):
"""Интеграционный тест MetricsCollector"""
# Проверяем, что можем получить системную информацию
system_info = metrics_collector.get_system_info()
# Даже если некоторые метрики недоступны, должны получить словарь
assert isinstance(system_info, dict)
# Проверяем, что можем получить метрики для Prometheus
metrics_data = metrics_collector.get_metrics_data()
assert isinstance(metrics_data, dict)
# Проверяем, что можем проверить алерты
alerts, recoveries = metrics_collector.check_alerts(system_info)
assert isinstance(alerts, list)
assert isinstance(recoveries, list)
class TestMetricsCollectorEdgeCases:
"""Тесты граничных случаев для MetricsCollector"""
def test_empty_system_info(self):
"""Тест работы с пустой системной информацией"""
with patch('metrics_collector.psutil.cpu_percent', side_effect=Exception("Test error")):
collector = MetricsCollector()
system_info = collector.get_system_info()
assert system_info == {}
def test_missing_disk_info(self):
"""Тест работы при отсутствии информации о диске"""
collector = MetricsCollector()
with patch.object(collector, '_get_disk_usage', return_value=None):
system_info = collector.get_system_info()
assert system_info == {}
def test_disk_io_calculation_without_previous_data(self):
"""Тест расчета I/O диска без предыдущих данных"""
collector = MetricsCollector()
# Сбрасываем предыдущие данные
collector.last_disk_io = None
collector.last_disk_io_time = None
current_disk_io = Mock()
current_disk_io.read_bytes = 1024
current_disk_io.write_bytes = 512
read_speed, write_speed = collector._calculate_disk_speed(current_disk_io)
assert read_speed == "0 B/s"
assert write_speed == "0 B/s"
def test_uptime_calculation_edge_cases(self):
"""Тест расчета uptime для граничных случаев"""
collector = MetricsCollector()
# Тест для очень малого времени
assert collector._format_uptime(0) == ""
assert collector._format_uptime(30) == ""
# Тест для очень большого времени
large_uptime = 365 * 24 * 3600 # 1 год
uptime_str = collector._format_uptime(large_uptime)
assert "д" in uptime_str
if __name__ == "__main__":
pytest.main([__file__, "-v"])