Files
prod/infra/monitoring/metrics_collector.py

496 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import psutil
import time
import platform
from datetime import datetime
from typing import Dict, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class MetricsCollector:
def __init__(self):
# Определяем ОС
self.os_type = self._detect_os()
logger.info(f"Обнаружена ОС: {self.os_type}")
# Пороговые значения для алертов
self.threshold = float(os.getenv('THRESHOLD', '80.0'))
self.recovery_threshold = float(os.getenv('RECOVERY_THRESHOLD', '75.0'))
# Состояние алертов для предотвращения спама
self.alert_states = {
'cpu': False,
'ram': False,
'disk': False
}
# PID файлы для отслеживания процессов
self.pid_files = {
#'voice_bot': 'voice_bot.pid',
'helper_bot': 'helper_bot.pid'
}
# Для расчета скорости диска
self.last_disk_io = None
self.last_disk_io_time = None
# Для расчета процента загрузки диска (отдельные переменные)
self.last_disk_io_for_percent = None
self.last_disk_io_time_for_percent = None
# Инициализируем базовые значения для скорости диска при первом вызове
self._initialize_disk_io()
# Время запуска мониторинга для расчета uptime
self.monitor_start_time = time.time()
def _detect_os(self) -> str:
"""Определение типа операционной системы"""
system = platform.system().lower()
if system == "darwin":
return "macos"
elif system == "linux":
return "ubuntu"
else:
return "unknown"
def _initialize_disk_io(self):
"""Инициализация базовых значений для расчета скорости диска"""
try:
disk_io = self._get_disk_io_counters()
if disk_io:
self.last_disk_io = disk_io
self.last_disk_io_time = time.time()
logger.debug("Инициализированы базовые значения для расчета скорости диска")
except Exception as e:
logger.error(f"Ошибка при инициализации диска I/O: {e}")
def _get_disk_path(self) -> str:
"""Получение пути к диску в зависимости от ОС"""
if self.os_type == "macos":
return "/"
elif self.os_type == "ubuntu":
return "/"
else:
return "/"
def _get_disk_usage(self) -> Optional[object]:
"""Получение информации о диске с учетом ОС"""
try:
if self.os_type == "macos":
# На macOS используем diskutil для получения реального использования диска
return self._get_macos_disk_usage()
else:
disk_path = self._get_disk_path()
return psutil.disk_usage(disk_path)
except Exception as e:
logger.error(f"Ошибка при получении информации о диске: {e}")
return None
def _get_macos_disk_usage(self) -> Optional[object]:
"""Получение информации о диске на macOS через diskutil"""
try:
import subprocess
import re
# Получаем информацию о диске через diskutil
result = subprocess.run(['diskutil', 'info', '/'], capture_output=True, text=True)
if result.returncode != 0:
# Fallback к psutil
return psutil.disk_usage('/')
output = result.stdout
# Извлекаем размеры из вывода diskutil
total_match = re.search(r'Container Total Space:\s+(\d+\.\d+)\s+GB', output)
free_match = re.search(r'Container Free Space:\s+(\d+\.\d+)\s+GB', output)
if total_match and free_match:
total_gb = float(total_match.group(1))
free_gb = float(free_match.group(1))
used_gb = total_gb - free_gb
# Создаем объект, похожий на результат psutil.disk_usage
class DiskUsage:
def __init__(self, total, used, free):
self.total = total * (1024**3) # Конвертируем в байты
self.used = used * (1024**3)
self.free = free * (1024**3)
return DiskUsage(total_gb, used_gb, free_gb)
else:
# Fallback к psutil
return psutil.disk_usage('/')
except Exception as e:
logger.error(f"Ошибка при получении информации о диске macOS: {e}")
# Fallback к psutil
return psutil.disk_usage('/')
def _get_disk_io_counters(self):
"""Получение статистики диска с учетом ОС"""
try:
if self.os_type == "macos":
# На macOS может быть несколько дисков, берем основной
return psutil.disk_io_counters(perdisk=False)
elif self.os_type == "ubuntu":
# На Ubuntu обычно один диск
return psutil.disk_io_counters(perdisk=False)
else:
return psutil.disk_io_counters()
except Exception as e:
logger.error(f"Ошибка при получении статистики диска: {e}")
return None
def _get_system_uptime(self) -> float:
"""Получение uptime системы с учетом ОС"""
try:
if self.os_type == "macos":
# На macOS используем boot_time
boot_time = psutil.boot_time()
return time.time() - boot_time
elif self.os_type == "ubuntu":
# На Ubuntu также используем boot_time
boot_time = psutil.boot_time()
return time.time() - boot_time
else:
boot_time = psutil.boot_time()
return time.time() - boot_time
except Exception as e:
logger.error(f"Ошибка при получении uptime системы: {e}")
return 0.0
def get_monitor_uptime(self) -> str:
"""Получение uptime мониторинга"""
uptime_seconds = time.time() - self.monitor_start_time
return self._format_uptime(uptime_seconds)
def get_system_info(self) -> Dict:
"""Получение информации о системе"""
try:
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
load_avg = psutil.getloadavg()
cpu_count = psutil.cpu_count()
# Память
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
# Используем единый расчет для всех ОС: used / total для получения процента занятой памяти
# Это обеспечивает консистентность между macOS и Ubuntu
ram_percent = (memory.used / memory.total) * 100
# Диск
disk = self._get_disk_usage()
disk_io = self._get_disk_io_counters()
if disk is None:
logger.error("Не удалось получить информацию о диске")
return {}
# Сначала рассчитываем процент загрузки диска (до обновления last_disk_io_time)
disk_io_percent = self._calculate_disk_io_percent()
# Затем рассчитываем скорость диска (это обновит last_disk_io_time)
disk_read_speed, disk_write_speed = self._calculate_disk_speed(disk_io)
# Диагностика диска для отладки
if disk_io:
logger.debug(f"Диск I/O статистика: read_count={disk_io.read_count}, write_count={disk_io.write_count}, "
f"read_bytes={disk_io.read_bytes}, write_bytes={disk_io.write_bytes}")
# Система
system_uptime = self._get_system_uptime()
# Получаем имя хоста в зависимости от ОС
if self.os_type == "macos":
hostname = os.uname().nodename
elif self.os_type == "ubuntu":
hostname = os.uname().nodename
else:
hostname = "unknown"
return {
'cpu_percent': cpu_percent,
'load_avg_1m': round(load_avg[0], 2),
'load_avg_5m': round(load_avg[1], 2),
'load_avg_15m': round(load_avg[2], 2),
'cpu_count': cpu_count,
'ram_used': round(memory.used / (1024**3), 2),
'ram_total': round(memory.total / (1024**3), 2),
'ram_percent': round(ram_percent, 1), # Исправленный процент занятой памяти
'swap_used': round(swap.used / (1024**3), 2),
'swap_total': round(swap.total / (1024**3), 2),
'swap_percent': swap.percent,
'disk_used': round(disk.used / (1024**3), 2),
'disk_total': round(disk.total / (1024**3), 2),
'disk_percent': round((disk.used / disk.total) * 100, 1),
'disk_free': round(disk.free / (1024**3), 2),
'disk_read_speed': disk_read_speed,
'disk_write_speed': disk_write_speed,
'disk_io_percent': disk_io_percent,
'system_uptime': self._format_uptime(system_uptime),
'monitor_uptime': self.get_monitor_uptime(),
'server_hostname': hostname,
'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
logger.error(f"Ошибка при получении информации о системе: {e}")
return {}
def _format_bytes(self, bytes_value: int) -> str:
"""Форматирование байтов в человекочитаемый вид"""
if bytes_value == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while bytes_value >= 1024 and i < len(size_names) - 1:
bytes_value /= 1024.0
i += 1
return f"{bytes_value:.1f} {size_names[i]}"
def _format_uptime(self, seconds: float) -> str:
"""Форматирование времени работы системы"""
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
if days > 0:
return f"{days}д {hours}ч {minutes}м"
elif hours > 0:
return f"{hours}ч {minutes}м"
else:
return f"{minutes}м"
def check_process_status(self, process_name: str) -> Tuple[str, str]:
"""Проверка статуса процесса и возврат статуса с uptime"""
try:
# Сначала проверяем по PID файлу
pid_file = self.pid_files.get(process_name)
if pid_file and os.path.exists(pid_file):
try:
with open(pid_file, 'r') as f:
content = f.read().strip()
if content and content != '# Этот файл будет автоматически обновляться при запуске бота':
pid = int(content)
if psutil.pid_exists(pid):
# Получаем uptime процесса
try:
proc = psutil.Process(pid)
proc_uptime = time.time() - proc.create_time()
uptime_str = self._format_uptime(proc_uptime)
return "", f"Uptime {uptime_str}"
except:
return "", "Uptime неизвестно"
except (ValueError, FileNotFoundError):
pass
# Проверяем по имени процесса более точно
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
proc_name = proc.info['name'].lower()
cmdline = ' '.join(proc.info['cmdline']).lower() if proc.info['cmdline'] else ''
# Более точная проверка для каждого бота
if process_name == 'voice_bot':
# Проверяем voice_bot
if ('voice_bot' in proc_name or
'voice_bot' in cmdline or
'voice_bot_v2.py' in cmdline):
# Получаем uptime процесса
try:
proc_uptime = time.time() - proc.create_time()
uptime_str = self._format_uptime(proc_uptime)
return "", f"Uptime {uptime_str}"
except:
return "", "Uptime неизвестно"
elif process_name == 'helper_bot':
# Проверяем helper_bot
if ('helper_bot' in proc_name or
'helper_bot' in cmdline or
'run_helper.py' in cmdline or
'python' in proc_name and 'helper_bot' in cmdline):
# Получаем uptime процесса
try:
proc_uptime = time.time() - proc.create_time()
uptime_str = self._format_uptime(proc_uptime)
return "", f"Uptime {uptime_str}"
except:
return "", "Uptime неизвестно"
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return "", "Выключен"
except Exception as e:
logger.error(f"Ошибка при проверке процесса {process_name}: {e}")
return "", "Выключен"
def _calculate_disk_speed(self, current_disk_io) -> Tuple[str, str]:
"""Расчет скорости чтения/записи диска"""
current_time = time.time()
if self.last_disk_io is None or self.last_disk_io_time is None:
self.last_disk_io = current_disk_io
self.last_disk_io_time = current_time
return "0 B/s", "0 B/s"
time_diff = current_time - self.last_disk_io_time
if time_diff < 1: # Минимальный интервал 1 секунда
return "0 B/s", "0 B/s"
read_diff = current_disk_io.read_bytes - self.last_disk_io.read_bytes
write_diff = current_disk_io.write_bytes - self.last_disk_io.write_bytes
read_speed = read_diff / time_diff
write_speed = write_diff / time_diff
# Обновляем предыдущие значения
self.last_disk_io = current_disk_io
self.last_disk_io_time = current_time
return self._format_bytes(read_speed) + "/s", self._format_bytes(write_speed) + "/s"
def _calculate_disk_io_percent(self) -> int:
"""Расчет процента загрузки диска на основе реальной скорости I/O"""
try:
# Получаем текущую статистику диска
current_disk_io = self._get_disk_io_counters()
if current_disk_io is None:
return 0
current_time = time.time()
# Если это первое измерение, инициализируем
if self.last_disk_io_for_percent is None or self.last_disk_io_time_for_percent is None:
logger.debug("Первое измерение диска для процента, инициализируем базовые значения")
self.last_disk_io_for_percent = current_disk_io
self.last_disk_io_time_for_percent = current_time
return 0
# Рассчитываем время между измерениями
time_diff = current_time - self.last_disk_io_time_for_percent
if time_diff < 0.1: # Минимальный интервал 0.1 секунды для более точных измерений
logger.debug(f"Интервал между измерениями слишком мал: {time_diff:.3f}s, возвращаем 0%")
return 0
# Рассчитываем скорость операций в секунду
read_ops_diff = current_disk_io.read_count - self.last_disk_io_for_percent.read_count
write_ops_diff = current_disk_io.write_count - self.last_disk_io_for_percent.write_count
read_ops_per_sec = read_ops_diff / time_diff
write_ops_per_sec = write_ops_diff / time_diff
total_ops_per_sec = read_ops_per_sec + write_ops_per_sec
# Рассчитываем скорость передачи данных в байтах в секунду
read_bytes_diff = current_disk_io.read_bytes - self.last_disk_io_for_percent.read_bytes
write_bytes_diff = current_disk_io.write_bytes - self.last_disk_io_for_percent.write_bytes
read_bytes_per_sec = read_bytes_diff / time_diff
write_bytes_per_sec = write_bytes_diff / time_diff
total_bytes_per_sec = read_bytes_per_sec + write_bytes_per_sec
# Обновляем предыдущие значения для процента
self.last_disk_io_for_percent = current_disk_io
self.last_disk_io_time_for_percent = current_time
# Определяем максимальную производительность диска в зависимости от ОС
if self.os_type == "macos":
# macOS обычно имеет SSD с высокой производительностью
max_ops_per_sec = 50000 # Операций в секунду
max_bytes_per_sec = 3 * (1024**3) # 3 GB/s
elif self.os_type == "ubuntu":
# Ubuntu может быть на разных типах дисков
max_ops_per_sec = 30000 # Операций в секунду
max_bytes_per_sec = 2 * (1024**3) # 2 GB/s
else:
max_ops_per_sec = 40000
max_bytes_per_sec = 2.5 * (1024**3)
# Рассчитываем процент загрузки на основе операций и байтов
# Защита от деления на ноль
if max_ops_per_sec > 0:
ops_percent = min(100, (total_ops_per_sec / max_ops_per_sec) * 100)
else:
ops_percent = 0
if max_bytes_per_sec > 0:
bytes_percent = min(100, (total_bytes_per_sec / max_bytes_per_sec) * 100)
else:
bytes_percent = 0
# Взвешенный средний процент (операции важнее для большинства случаев)
final_percent = (ops_percent * 0.7) + (bytes_percent * 0.3)
# Логируем для отладки (только при высоких значениях)
if final_percent > 10:
logger.debug(f"Диск I/O: {total_ops_per_sec:.1f} ops/s, {total_bytes_per_sec/(1024**2):.1f} MB/s, "
f"Загрузка: {final_percent:.1f}% (ops: {ops_percent:.1f}%, bytes: {bytes_percent:.1f}%)")
# Округляем до целого числа
return round(final_percent)
except Exception as e:
logger.error(f"Ошибка при расчете процента загрузки диска: {e}")
return 0
def get_metrics_data(self) -> Dict:
"""Получение данных для метрик Prometheus"""
system_info = self.get_system_info()
if not system_info:
return {}
return {
'cpu_usage_percent': system_info.get('cpu_percent', 0),
'ram_usage_percent': system_info.get('ram_percent', 0),
'disk_usage_percent': system_info.get('disk_percent', 0),
'load_average_1m': system_info.get('load_avg_1m', 0),
'load_average_5m': system_info.get('load_avg_5m', 0),
'load_average_15m': system_info.get('load_avg_15m', 0),
'swap_usage_percent': system_info.get('swap_percent', 0),
'disk_io_percent': system_info.get('disk_io_percent', 0),
'system_uptime_seconds': self._get_system_uptime(),
'monitor_uptime_seconds': time.time() - self.monitor_start_time
}
def check_alerts(self, system_info: Dict) -> Tuple[bool, Optional[str]]:
"""Проверка необходимости отправки алертов"""
alerts = []
# Проверка CPU
if system_info['cpu_percent'] > self.threshold and not self.alert_states['cpu']:
self.alert_states['cpu'] = True
alerts.append(('cpu', system_info['cpu_percent'], f"Нагрузка за 1 мин: {system_info['load_avg_1m']}"))
# Проверка RAM
if system_info['ram_percent'] > self.threshold and not self.alert_states['ram']:
self.alert_states['ram'] = True
alerts.append(('ram', system_info['ram_percent'], f"Используется: {system_info['ram_used']} GB из {system_info['ram_total']} GB"))
# Проверка диска
if system_info['disk_percent'] > self.threshold and not self.alert_states['disk']:
self.alert_states['disk'] = True
alerts.append(('disk', system_info['disk_percent'], f"Свободно: {system_info['disk_free']} GB на /"))
# Проверка восстановления
recoveries = []
if system_info['cpu_percent'] < self.recovery_threshold and self.alert_states['cpu']:
self.alert_states['cpu'] = False
recoveries.append(('cpu', system_info['cpu_percent']))
if system_info['ram_percent'] < self.recovery_threshold and self.alert_states['ram']:
self.alert_states['ram'] = False
recoveries.append(('ram', system_info['ram_percent']))
if system_info['disk_percent'] < self.recovery_threshold and self.alert_states['disk']:
self.alert_states['disk'] = False
recoveries.append(('disk', system_info['disk_percent']))
return alerts, recoveries