import os import psutil import time import platform from datetime import datetime from typing import Dict, Optional, Tuple import logging from pid_manager import create_pid_manager logger = logging.getLogger(__name__) class MetricsCollector: def __init__(self): # Определяем ОС self.os_type = self._detect_os() logger.info(f"Обнаружена ОС: {self.os_type}") # Проверяем, запущены ли мы в Docker с доступом к хосту self.is_docker_host_monitoring = self._check_docker_host_access() if self.is_docker_host_monitoring: logger.info("Обнаружен доступ к хосту через Docker volumes - мониторинг будет вестись на уровне хоста") else: logger.warning("Мониторинг будет вестись на уровне контейнера (не рекомендуется для продакшена)") # Пороговые значения для алертов self.threshold = float(os.getenv('THRESHOLD', '80.0')) self.recovery_threshold = float(os.getenv('RECOVERY_THRESHOLD', '75.0')) # Задержки для алертов (в секундах) - предотвращают ложные срабатывания self.alert_delays = { 'cpu': int(os.getenv('CPU_ALERT_DELAY', '30')), # 30 сек для CPU 'ram': int(os.getenv('RAM_ALERT_DELAY', '45')), # 45 сек для RAM 'disk': int(os.getenv('DISK_ALERT_DELAY', '60')) # 60 сек для диска } # Состояние алертов для предотвращения спама self.alert_states = { 'cpu': False, 'ram': False, 'disk': False } # Время первого превышения порога для каждого метрика self.alert_start_times = { 'cpu': None, 'ram': None, 'disk': None } # PID файлы для отслеживания процессов # Определяем корень проекта для поиска PID файлов current_file = os.path.abspath(__file__) self.project_root = os.path.dirname(os.path.dirname(current_file)) self.pid_files = { 'helper_bot': os.path.join(self.project_root, 'helper_bot.pid') } # Для расчета скорости диска self.last_disk_io = None self.last_disk_io_time = None # Для расчета процента загрузки диска (отдельные переменные) self.last_disk_io_for_percent = None self.last_disk_io_time_for_percent = None # Инициализируем базовые значения для скорости диска при первом вызове self._initialize_disk_io() # Время запуска мониторинга для расчета uptime self.monitor_start_time = time.time() logger.info(f"Инициализированы задержки алертов: CPU={self.alert_delays['cpu']}s, RAM={self.alert_delays['ram']}s, Disk={self.alert_delays['disk']}s") def add_bot_to_monitoring(self, bot_name: str): """ Добавление нового бота в мониторинг Args: bot_name: Имя бота (например, 'helper_bot', 'admin_bot', etc.) """ pid_file_path = os.path.join(self.project_root, f"{bot_name}.pid") self.pid_files[bot_name] = pid_file_path logger.info(f"Добавлен бот {bot_name} в мониторинг: {pid_file_path}") def _detect_os(self) -> str: """Определение типа операционной системы""" system = platform.system().lower() if system == "darwin": return "macos" elif system == "linux": return "ubuntu" else: return "unknown" def _check_docker_host_access(self) -> bool: """Проверка доступности хоста через Docker volumes""" try: # Проверяем, доступны ли файлы хоста через /host/proc # Это означает, что контейнер запущен с --privileged и volume mounts if os.path.exists('/host/proc/stat') and os.path.exists('/host/proc/meminfo'): return True # Альтернативная проверка - проверяем, запущены ли мы в Docker # и есть ли доступ к системным файлам хоста if os.path.exists('/.dockerenv'): # Проверяем, можем ли мы читать системные файлы хоста try: with open('/proc/stat', 'r') as f: f.read(100) # Читаем немного для проверки доступа return True except (OSError, PermissionError): pass return False except Exception as e: logger.debug(f"Ошибка при проверке доступа к хосту: {e}") return False def _initialize_disk_io(self): """Инициализация базовых значений для расчета скорости диска""" try: disk_io = self._get_disk_io_counters() if disk_io: self.last_disk_io = disk_io self.last_disk_io_time = time.time() logger.debug("Инициализированы базовые значения для расчета скорости диска") except Exception as e: logger.error(f"Ошибка при инициализации диска I/O: {e}") def _get_disk_path(self) -> str: """Получение пути к диску в зависимости от ОС""" if self.os_type == "macos": return "/" elif self.os_type == "ubuntu": return "/" else: return "/" def _get_disk_usage(self) -> Optional[object]: """Получение информации о диске с учетом ОС""" try: if self.os_type == "macos": # На macOS используем diskutil для получения реального использования диска return self._get_macos_disk_usage() else: disk_path = self._get_disk_path() return psutil.disk_usage(disk_path) except Exception as e: logger.error(f"Ошибка при получении информации о диске: {e}") return None def _get_macos_disk_usage(self) -> Optional[object]: """Получение информации о диске на macOS через diskutil""" try: import subprocess import re # Получаем информацию о диске через diskutil result = subprocess.run(['diskutil', 'info', '/'], capture_output=True, text=True) if result.returncode != 0: # Fallback к psutil return psutil.disk_usage('/') output = result.stdout # Извлекаем размеры из вывода diskutil total_match = re.search(r'Container Total Space:\s+(\d+\.\d+)\s+GB', output) free_match = re.search(r'Container Free Space:\s+(\d+\.\d+)\s+GB', output) if total_match and free_match: total_gb = float(total_match.group(1)) free_gb = float(free_match.group(1)) used_gb = total_gb - free_gb # Создаем объект, похожий на результат psutil.disk_usage class DiskUsage: def __init__(self, total, used, free): self.total = total * (1024**3) # Конвертируем в байты self.used = used * (1024**3) self.free = free * (1024**3) return DiskUsage(total_gb, used_gb, free_gb) else: # Fallback к psutil return psutil.disk_usage('/') except Exception as e: logger.error(f"Ошибка при получении информации о диске macOS: {e}") # Fallback к psutil return psutil.disk_usage('/') def _get_disk_io_counters(self): """Получение статистики диска с учетом ОС""" try: if self.os_type == "macos": # На macOS может быть несколько дисков, берем основной return psutil.disk_io_counters(perdisk=False) elif self.os_type == "ubuntu": # На Ubuntu обычно один диск return psutil.disk_io_counters(perdisk=False) else: return psutil.disk_io_counters() except Exception as e: logger.error(f"Ошибка при получении статистики диска: {e}") return None def _get_system_uptime(self) -> float: """Получение uptime системы с учетом ОС""" try: if self.os_type == "macos": # На macOS используем boot_time boot_time = psutil.boot_time() return time.time() - boot_time elif self.os_type == "ubuntu": # На Ubuntu также используем boot_time boot_time = psutil.boot_time() return time.time() - boot_time else: boot_time = psutil.boot_time() return time.time() - boot_time except Exception as e: logger.error(f"Ошибка при получении uptime системы: {e}") return 0.0 def get_monitor_uptime(self) -> str: """Получение uptime мониторинга""" uptime_seconds = time.time() - self.monitor_start_time return self._format_uptime(uptime_seconds) def get_system_info(self) -> Dict: """Получение информации о системе""" try: # Определяем, какой psutil использовать current_psutil = psutil if self.is_docker_host_monitoring: # Для хоста используем специальные методы host_cpu = self._get_host_cpu_info() host_memory = self._get_host_memory_info() host_disk = self._get_host_disk_info() if host_cpu and host_memory and host_disk: # Используем данные хоста cpu_count = host_cpu['cpu_count'] load_avg = host_cpu['load_avg'] # Для CPU процента используем упрощенный расчет на основе load average # Load average > 1.0 на ядро считается высокой нагрузкой load_per_core = load_avg[0] / cpu_count if cpu_count > 0 else 0 cpu_percent = min(100, load_per_core * 100) # Упрощенный расчет # Память хоста ram_total = host_memory['ram_total'] ram_used = host_memory['ram_used'] ram_percent = host_memory['ram_percent'] swap_total = host_memory['swap_total'] swap_used = host_memory['swap_used'] swap_percent = host_memory['swap_percent'] # Диск хоста disk_total = host_disk['total'] disk_used = host_disk['used'] disk_free = host_disk['free'] disk_percent = host_disk['percent'] # IO Wait и другие метрики недоступны через /proc, используем 0 io_wait_percent = 0.0 logger.debug("Используются метрики хоста через Docker volumes") else: # Fallback к стандартному psutil logger.warning("Не удалось получить метрики хоста, используем контейнер") current_psutil = psutil host_cpu = host_memory = host_disk = None else: # Стандартный psutil для контейнера host_cpu = host_memory = host_disk = None # Если не используем хост, получаем стандартные метрики if not host_cpu: cpu_percent = current_psutil.cpu_percent(interval=1) load_avg = current_psutil.getloadavg() cpu_count = current_psutil.cpu_count() # CPU times для получения IO Wait cpu_times = current_psutil.cpu_times_percent(interval=1) io_wait_percent = getattr(cpu_times, 'iowait', 0.0) # Память memory = current_psutil.virtual_memory() swap = current_psutil.swap_memory() # Используем единый расчет для всех ОС: used / total для получения процента занятой памяти ram_percent = (memory.used / memory.total) * 100 ram_total = memory.total ram_used = memory.used swap_total = swap.total swap_used = swap.used swap_percent = swap.percent # Диск disk = self._get_disk_usage() disk_total = disk.total if disk else 0 disk_used = disk.used if disk else 0 disk_free = disk.free if disk else 0 disk_percent = (disk_used / disk_total * 100) if disk_total > 0 else 0 # Диск I/O (может быть недоступен для хоста) disk_io = self._get_disk_io_counters() if disk_io: disk_io_percent = self._calculate_disk_io_percent() disk_read_speed, disk_write_speed = self._calculate_disk_speed(disk_io) else: disk_io_percent = 0 disk_read_speed = "0 B/s" disk_write_speed = "0 B/s" # Система system_uptime = self._get_system_uptime() # Получаем имя хоста if self.is_docker_host_monitoring: try: with open('/host/proc/sys/kernel/hostname', 'r') as f: hostname = f.read().strip() except: hostname = "host" else: hostname = os.uname().nodename return { 'cpu_percent': round(cpu_percent, 1), 'load_avg_1m': round(load_avg[0], 2), 'load_avg_5m': round(load_avg[1], 2), 'load_avg_15m': round(load_avg[2], 2), 'cpu_count': cpu_count, 'io_wait_percent': round(io_wait_percent, 1), 'ram_used': round(ram_used / (1024**3), 2), 'ram_total': round(ram_total / (1024**3), 2), 'ram_percent': round(ram_percent, 1), 'swap_used': round(swap_used / (1024**3), 2), 'swap_total': round(swap_total / (1024**3), 2), 'swap_percent': round(swap_percent, 1), 'disk_used': round(disk_used / (1024**3), 2), 'disk_total': round(disk_total / (1024**3), 2), 'disk_percent': round(disk_percent, 1), 'disk_free': round(disk_free / (1024**3), 2), 'disk_read_speed': disk_read_speed, 'disk_write_speed': disk_write_speed, 'disk_io_percent': disk_io_percent, 'system_uptime': self._format_uptime(system_uptime), 'monitor_uptime': self.get_monitor_uptime(), 'server_hostname': hostname, 'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'monitoring_level': 'host' if self.is_docker_host_monitoring else 'container' } except Exception as e: logger.error(f"Ошибка при получении информации о системе: {e}") return {} def _format_bytes(self, bytes_value: int) -> str: """Форматирование байтов в человекочитаемый вид""" if bytes_value == 0: return "0 B" size_names = ["B", "KB", "MB", "GB", "TB"] i = 0 while bytes_value >= 1024 and i < len(size_names) - 1: bytes_value /= 1024.0 i += 1 return f"{bytes_value:.1f} {size_names[i]}" def _format_uptime(self, seconds: float) -> str: """Форматирование времени работы системы""" days = int(seconds // 86400) hours = int((seconds % 86400) // 3600) minutes = int((seconds % 3600) // 60) if days > 0: return f"{days}д {hours}ч {minutes}м" elif hours > 0: return f"{hours}ч {minutes}м" else: return f"{minutes}м" def check_process_status(self, process_name: str) -> Tuple[str, str]: """Проверка статуса процесса и возврат статуса с uptime""" try: # Для helper_bot используем HTTP endpoint if process_name == 'helper_bot': return self._check_helper_bot_status() # Для других процессов используем стандартную проверку return self._check_local_process_status(process_name) except Exception as e: logger.error(f"Ошибка при проверке процесса {process_name}: {e}") return "❌", "Выключен" def _check_local_process_status(self, process_name: str) -> Tuple[str, str]: """Проверка локального процесса по PID файлу или имени""" try: # Проверяем по PID файлу pid_file = self.pid_files.get(process_name) if pid_file and os.path.exists(pid_file): try: with open(pid_file, 'r') as f: content = f.read().strip() if content and content != '# Этот файл будет автоматически обновляться при запуске бота': pid = int(content) if psutil.pid_exists(pid): proc = psutil.Process(pid) proc_uptime = time.time() - proc.create_time() uptime_str = self._format_uptime(proc_uptime) return "✅", f"Uptime {uptime_str}" except (ValueError, FileNotFoundError): pass # Проверяем по имени процесса for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: proc_name = proc.info['name'].lower() cmdline = ' '.join(proc.info['cmdline']).lower() if proc.info['cmdline'] else '' if (process_name in proc_name or process_name in cmdline or 'python' in proc_name and process_name in cmdline): proc_uptime = time.time() - proc.create_time() uptime_str = self._format_uptime(proc_uptime) return "✅", f"Uptime {uptime_str}" except (psutil.NoSuchProcess, psutil.AccessDenied): continue return "❌", "Выключен" except Exception as e: logger.error(f"Ошибка при проверке локального процесса {process_name}: {e}") return "❌", "Выключен" def _calculate_disk_speed(self, current_disk_io) -> Tuple[str, str]: """Расчет скорости чтения/записи диска""" current_time = time.time() if self.last_disk_io is None or self.last_disk_io_time is None: self.last_disk_io = current_disk_io self.last_disk_io_time = current_time return "0 B/s", "0 B/s" time_diff = current_time - self.last_disk_io_time if time_diff < 1: # Минимальный интервал 1 секунда return "0 B/s", "0 B/s" read_diff = current_disk_io.read_bytes - self.last_disk_io.read_bytes write_diff = current_disk_io.write_bytes - self.last_disk_io.write_bytes read_speed = read_diff / time_diff write_speed = write_diff / time_diff # Обновляем предыдущие значения self.last_disk_io = current_disk_io self.last_disk_io_time = current_time return self._format_bytes(read_speed) + "/s", self._format_bytes(write_speed) + "/s" def _calculate_disk_io_percent(self) -> int: """Расчет процента загрузки диска на основе реальной скорости I/O""" try: # Получаем текущую статистику диска current_disk_io = self._get_disk_io_counters() if current_disk_io is None: return 0 current_time = time.time() # Если это первое измерение, инициализируем if self.last_disk_io_for_percent is None or self.last_disk_io_time_for_percent is None: logger.debug("Первое измерение диска для процента, инициализируем базовые значения") self.last_disk_io_for_percent = current_disk_io self.last_disk_io_time_for_percent = current_time return 0 # Рассчитываем время между измерениями time_diff = current_time - self.last_disk_io_time_for_percent if time_diff < 0.1: # Минимальный интервал 0.1 секунды для более точных измерений logger.debug(f"Интервал между измерениями слишком мал: {time_diff:.3f}s, возвращаем 0%") return 0 # Рассчитываем скорость операций в секунду read_ops_diff = current_disk_io.read_count - self.last_disk_io_for_percent.read_count write_ops_diff = current_disk_io.write_count - self.last_disk_io_for_percent.write_count read_ops_per_sec = read_ops_diff / time_diff write_ops_per_sec = write_ops_diff / time_diff total_ops_per_sec = read_ops_per_sec + write_ops_per_sec # Рассчитываем скорость передачи данных в байтах в секунду read_bytes_diff = current_disk_io.read_bytes - self.last_disk_io_for_percent.read_bytes write_bytes_diff = current_disk_io.write_bytes - self.last_disk_io_for_percent.write_bytes read_bytes_per_sec = read_bytes_diff / time_diff write_bytes_per_sec = write_bytes_diff / time_diff total_bytes_per_sec = read_bytes_per_sec + write_bytes_per_sec # Обновляем предыдущие значения для процента self.last_disk_io_for_percent = current_disk_io self.last_disk_io_time_for_percent = current_time # Определяем максимальную производительность диска в зависимости от ОС if self.os_type == "macos": # macOS обычно имеет SSD с высокой производительностью max_ops_per_sec = 50000 # Операций в секунду max_bytes_per_sec = 3 * (1024**3) # 3 GB/s elif self.os_type == "ubuntu": # Ubuntu может быть на разных типах дисков max_ops_per_sec = 30000 # Операций в секунду max_bytes_per_sec = 2 * (1024**3) # 2 GB/s else: max_ops_per_sec = 40000 max_bytes_per_sec = 2.5 * (1024**3) # Рассчитываем процент загрузки на основе операций и байтов # Защита от деления на ноль if max_ops_per_sec > 0: ops_percent = min(100, (total_ops_per_sec / max_ops_per_sec) * 100) else: ops_percent = 0 if max_bytes_per_sec > 0: bytes_percent = min(100, (total_bytes_per_sec / max_bytes_per_sec) * 100) else: bytes_percent = 0 # Взвешенный средний процент (операции важнее для большинства случаев) final_percent = (ops_percent * 0.7) + (bytes_percent * 0.3) # Логируем для отладки (только при высоких значениях) if final_percent > 10: logger.debug(f"Диск I/O: {total_ops_per_sec:.1f} ops/s, {total_bytes_per_sec/(1024**2):.1f} MB/s, " f"Загрузка: {final_percent:.1f}% (ops: {ops_percent:.1f}%, bytes: {bytes_percent:.1f}%)") # Округляем до целого числа return round(final_percent) except Exception as e: logger.error(f"Ошибка при расчете процента загрузки диска: {e}") return 0 def get_metrics_data(self) -> Dict: """Получение данных для метрик Prometheus""" system_info = self.get_system_info() if not system_info: return {} return { 'cpu_usage_percent': system_info.get('cpu_percent', 0), 'ram_usage_percent': system_info.get('ram_percent', 0), 'disk_usage_percent': system_info.get('disk_percent', 0), 'load_average_1m': system_info.get('load_avg_1m', 0), 'load_average_5m': system_info.get('load_avg_5m', 0), 'load_average_15m': system_info.get('load_avg_15m', 0), 'swap_usage_percent': system_info.get('swap_percent', 0), 'disk_io_percent': system_info.get('disk_io_percent', 0), 'system_uptime_seconds': self._get_system_uptime(), 'monitor_uptime_seconds': time.time() - self.monitor_start_time } def check_alerts(self, system_info: Dict) -> Tuple[bool, Optional[str]]: """Проверка необходимости отправки алертов с учетом задержек""" current_time = time.time() alerts = [] recoveries = [] # Проверка CPU с задержкой if system_info['cpu_percent'] > self.threshold: if not self.alert_states['cpu']: # Первое превышение порога if self.alert_start_times['cpu'] is None: self.alert_start_times['cpu'] = current_time logger.debug(f"CPU превысил порог {self.threshold}%: {system_info['cpu_percent']:.1f}% - начинаем отсчет задержки {self.alert_delays['cpu']}s") # Проверяем, прошла ли задержка if self.alert_delays['cpu'] == 0 or current_time - self.alert_start_times['cpu'] >= self.alert_delays['cpu']: self.alert_states['cpu'] = True alerts.append(('cpu', system_info['cpu_percent'], f"Нагрузка за 1 мин: {system_info['load_avg_1m']}")) logger.warning(f"CPU ALERT: {system_info['cpu_percent']:.1f}% > {self.threshold}% (задержка {self.alert_delays['cpu']}s)") else: # CPU ниже порога - сбрасываем состояние if self.alert_states['cpu']: self.alert_states['cpu'] = False recoveries.append(('cpu', system_info['cpu_percent'])) logger.info(f"CPU RECOVERY: {system_info['cpu_percent']:.1f}% < {self.recovery_threshold}%") # Сбрасываем время начала превышения self.alert_start_times['cpu'] = None # Проверка RAM с задержкой if system_info['ram_percent'] > self.threshold: if not self.alert_states['ram']: # Первое превышение порога if self.alert_start_times['ram'] is None: self.alert_start_times['ram'] = current_time logger.debug(f"RAM превысил порог {self.threshold}%: {system_info['ram_percent']:.1f}% - начинаем отсчет задержки {self.alert_delays['ram']}s") # Проверяем, прошла ли задержка if self.alert_delays['ram'] == 0 or current_time - self.alert_start_times['ram'] >= self.alert_delays['ram']: self.alert_states['ram'] = True alerts.append(('ram', system_info['ram_percent'], f"Используется: {system_info['ram_used']} GB из {system_info['ram_total']} GB")) logger.warning(f"RAM ALERT: {system_info['ram_percent']:.1f}% > {self.threshold}% (задержка {self.alert_delays['ram']}s)") else: # RAM ниже порога - сбрасываем состояние if self.alert_states['ram']: self.alert_states['ram'] = False recoveries.append(('ram', system_info['ram_percent'])) logger.info(f"RAM RECOVERY: {system_info['ram_percent']:.1f}% < {self.recovery_threshold}%") # Сбрасываем время начала превышения self.alert_start_times['ram'] = None # Проверка диска с задержкой if system_info['disk_percent'] > self.threshold: if not self.alert_states['disk']: # Первое превышение порога if self.alert_start_times['disk'] is None: self.alert_start_times['disk'] = current_time logger.debug(f"Disk превысил порог {self.threshold}%: {system_info['disk_percent']:.1f}% - начинаем отсчет задержки {self.alert_delays['disk']}s") # Проверяем, прошла ли задержка if self.alert_delays['disk'] == 0 or current_time - self.alert_start_times['disk'] >= self.alert_delays['disk']: self.alert_states['disk'] = True alerts.append(('disk', system_info['disk_percent'], f"Свободно: {system_info['disk_free']} GB на /")) logger.warning(f"DISK ALERT: {system_info['disk_percent']:.1f}% > {self.threshold}% (задержка {self.alert_delays['disk']}s)") else: # Диск ниже порога - сбрасываем состояние if self.alert_states['disk']: self.alert_states['disk'] = False recoveries.append(('disk', system_info['disk_percent'])) logger.info(f"DISK RECOVERY: {system_info['disk_percent']:.1f}% < {self.recovery_threshold}%") # Сбрасываем время начала превышения self.alert_start_times['disk'] = None return alerts, recoveries def _get_host_psutil(self): """Получение psutil с доступом к хосту""" if self.is_docker_host_monitoring: # Переключаемся на директории хоста os.environ['PROC_ROOT'] = '/host/proc' os.environ['SYS_ROOT'] = '/host/sys' # Перезагружаем psutil для использования новых путей import importlib import psutil importlib.reload(psutil) return psutil return psutil def _get_host_cpu_info(self): """Получение информации о CPU хоста""" try: if self.is_docker_host_monitoring: # Читаем информацию о CPU напрямую из /proc with open('/host/proc/cpuinfo', 'r') as f: cpu_info = f.read() # Подсчитываем количество ядер cpu_count = cpu_info.count('processor') # Читаем load average with open('/host/proc/loadavg', 'r') as f: load_avg = f.read().strip().split()[:3] load_avg = [float(x) for x in load_avg] # Читаем статистику CPU with open('/host/proc/stat', 'r') as f: cpu_stat = f.readline().strip().split()[1:] cpu_stat = [int(x) for x in cpu_stat] # Рассчитываем процент CPU (упрощенный метод) # В реальности нужно сравнивать с предыдущими значениями cpu_percent = 0.0 # Будет рассчитано в get_system_info return { 'cpu_count': cpu_count, 'load_avg': load_avg, 'cpu_stat': cpu_stat } else: # Используем стандартный psutil return { 'cpu_count': psutil.cpu_count(), 'load_avg': psutil.getloadavg(), 'cpu_stat': None } except Exception as e: logger.error(f"Ошибка при получении информации о CPU хоста: {e}") return None def _get_host_memory_info(self): """Получение информации о памяти хоста""" try: if self.is_docker_host_monitoring: # Читаем информацию о памяти из /proc/meminfo with open('/host/proc/meminfo', 'r') as f: mem_info = f.read() # Парсим значения mem_lines = mem_info.split('\n') mem_data = {} for line in mem_lines: if ':' in line: key, value = line.split(':', 1) mem_data[key.strip()] = int(value.strip().split()[0]) * 1024 # Конвертируем в байты # Рассчитываем проценты total = mem_data.get('MemTotal', 0) available = mem_data.get('MemAvailable', 0) used = total - available ram_percent = (used / total * 100) if total > 0 else 0 # Swap swap_total = mem_data.get('SwapTotal', 0) swap_free = mem_data.get('SwapFree', 0) swap_used = swap_total - swap_free swap_percent = (swap_used / swap_total * 100) if swap_total > 0 else 0 return { 'ram_total': total, 'ram_used': used, 'ram_percent': ram_percent, 'swap_total': swap_total, 'swap_used': swap_used, 'swap_percent': swap_percent } else: # Используем стандартный psutil memory = psutil.virtual_memory() swap = psutil.swap_memory() return { 'ram_total': memory.total, 'ram_used': memory.used, 'ram_percent': memory.percent, 'swap_total': swap.total, 'swap_used': swap.used, 'swap_percent': swap.percent } except Exception as e: logger.error(f"Ошибка при получении информации о памяти хоста: {e}") return None def _get_host_disk_info(self): """Получение информации о диске хоста""" try: if self.is_docker_host_monitoring: # Используем df для получения информации о диске import subprocess result = subprocess.run(['df', '/'], capture_output=True, text=True) if result.returncode == 0: lines = result.stdout.strip().split('\n') if len(lines) >= 2: parts = lines[1].split() if len(parts) >= 4: total_kb = int(parts[1]) used_kb = int(parts[2]) available_kb = int(parts[3]) total = total_kb * 1024 used = used_kb * 1024 available = available_kb * 1024 percent = (used / total * 100) if total > 0 else 0 return { 'total': total, 'used': used, 'free': available, 'percent': percent } # Fallback к стандартному psutil return None else: # Используем стандартный psutil return None except Exception as e: logger.error(f"Ошибка при получении информации о диске хоста: {e}") return None def _check_helper_bot_status(self) -> Tuple[str, str]: """Проверка статуса helper_bot через HTTP endpoint""" try: import requests logger.info("Проверяем статус helper_bot через HTTP endpoint /status") # Обращаемся к endpoint /status в helper_bot url = 'http://bots_telegram_bot:8080/status' logger.info(f"Отправляем HTTP запрос к: {url}") response = requests.get(url, timeout=5) logger.info(f"Получен HTTP ответ: статус {response.status_code}") if response.status_code == 200: try: data = response.json() logger.info(f"Получены данные: {data}") status = data.get('status', 'unknown') uptime = data.get('uptime', 'unknown') if status == 'running': result = "✅", f"Uptime {uptime}" logger.info(f"Helper_bot работает: {result}") return result elif status == 'starting': result = "🔄", f"Запуск: {uptime}" logger.info(f"Helper_bot запускается: {result}") return result else: result = "⚠️", f"Статус: {status}" logger.warning(f"Helper_bot необычный статус: {result}") return result except (ValueError, KeyError) as e: # Если не удалось распарсить JSON, но статус 200 logger.warning(f"Не удалось распарсить JSON ответ: {e}, но статус 200") result = "✅", "HTTP: доступен" logger.info(f"Helper_bot доступен: {result}") return result else: logger.warning(f"HTTP статус не 200: {response.status_code}") return "⚠️", f"HTTP: {response.status_code}" except requests.exceptions.Timeout: logger.error("HTTP запрос к helper_bot завершился таймаутом") return "⚠️", "HTTP: таймаут" except requests.exceptions.ConnectionError as e: logger.error(f"HTTP ошибка соединения с helper_bot: {e}") return "❌", "HTTP: нет соединения" except ImportError: logger.debug("requests не доступен для HTTP проверки") return "❌", "HTTP: requests недоступен" except Exception as e: logger.error(f"Неожиданная ошибка при HTTP проверке helper_bot: {e}") return "❌", f"HTTP: ошибка"