import asyncio import os import psutil import time import platform from datetime import datetime, timedelta from typing import Dict, Optional, Tuple import logging logger = logging.getLogger(__name__) class ServerMonitor: def __init__(self, bot, group_for_logs: str, important_logs: str): self.bot = bot self.group_for_logs = group_for_logs self.important_logs = important_logs # Определяем ОС self.os_type = self._detect_os() logger.info(f"Обнаружена ОС: {self.os_type}") # Пороговые значения для алертов self.threshold = 80.0 self.recovery_threshold = 75.0 # Состояние алертов для предотвращения спама self.alert_states = { 'cpu': False, 'ram': False, 'disk': False } # PID файлы для отслеживания процессов self.pid_files = { 'voice_bot': 'voice_bot.pid', 'helper_bot': 'helper_bot.pid' } # Время последней отправки статуса self.last_status_time = None # Для расчета скорости диска self.last_disk_io = None self.last_disk_io_time = None # Время запуска бота для расчета uptime self.bot_start_time = time.time() def _detect_os(self) -> str: """Определение типа операционной системы""" system = platform.system().lower() if system == "darwin": return "macos" elif system == "linux": return "ubuntu" else: return "unknown" def _get_disk_path(self) -> str: """Получение пути к диску в зависимости от ОС""" if self.os_type == "macos": return "/" elif self.os_type == "ubuntu": return "/" else: return "/" def _get_disk_usage(self) -> Optional[object]: """Получение информации о диске с учетом ОС""" try: if self.os_type == "macos": # На macOS используем diskutil для получения реального использования диска return self._get_macos_disk_usage() else: disk_path = self._get_disk_path() return psutil.disk_usage(disk_path) except Exception as e: logger.error(f"Ошибка при получении информации о диске: {e}") return None def _get_macos_disk_usage(self) -> Optional[object]: """Получение информации о диске на macOS через diskutil""" try: import subprocess import re # Получаем информацию о диске через diskutil result = subprocess.run(['diskutil', 'info', '/'], capture_output=True, text=True) if result.returncode != 0: # Fallback к psutil return psutil.disk_usage('/') output = result.stdout # Извлекаем размеры из вывода diskutil total_match = re.search(r'Container Total Space:\s+(\d+\.\d+)\s+GB', output) free_match = re.search(r'Container Free Space:\s+(\d+\.\d+)\s+GB', output) if total_match and free_match: total_gb = float(total_match.group(1)) free_gb = float(free_match.group(1)) used_gb = total_gb - free_gb # Создаем объект, похожий на результат psutil.disk_usage class DiskUsage: def __init__(self, total, used, free): self.total = total * (1024**3) # Конвертируем в байты self.used = used * (1024**3) self.free = free * (1024**3) return DiskUsage(total_gb, used_gb, free_gb) else: # Fallback к psutil return psutil.disk_usage('/') except Exception as e: logger.error(f"Ошибка при получении информации о диске macOS: {e}") # Fallback к psutil return psutil.disk_usage('/') def _get_disk_io_counters(self): """Получение статистики диска с учетом ОС""" try: if self.os_type == "macos": # На macOS может быть несколько дисков, берем основной return psutil.disk_io_counters(perdisk=False) elif self.os_type == "ubuntu": # На Ubuntu обычно один диск return psutil.disk_io_counters(perdisk=False) else: return psutil.disk_io_counters() except Exception as e: logger.error(f"Ошибка при получении статистики диска: {e}") return None def _get_system_uptime(self) -> float: """Получение uptime системы с учетом ОС""" try: if self.os_type == "macos": # На macOS используем boot_time boot_time = psutil.boot_time() return time.time() - boot_time elif self.os_type == "ubuntu": # На Ubuntu также используем boot_time boot_time = psutil.boot_time() return time.time() - boot_time else: boot_time = psutil.boot_time() return time.time() - boot_time except Exception as e: logger.error(f"Ошибка при получении uptime системы: {e}") return 0.0 def get_bot_uptime(self) -> str: """Получение uptime бота""" uptime_seconds = time.time() - self.bot_start_time return self._format_uptime(uptime_seconds) def get_system_info(self) -> Dict: """Получение информации о системе""" try: # CPU cpu_percent = psutil.cpu_percent(interval=1) load_avg = psutil.getloadavg() cpu_count = psutil.cpu_count() # Память memory = psutil.virtual_memory() swap = psutil.swap_memory() # Используем единый расчет для всех ОС: used / total для получения процента занятой памяти # Это обеспечивает консистентность между macOS и Ubuntu ram_percent = (memory.used / memory.total) * 100 # Диск disk = self._get_disk_usage() disk_io = self._get_disk_io_counters() if disk is None: logger.error("Не удалось получить информацию о диске") return {} # Расчет скорости диска disk_read_speed, disk_write_speed = self._calculate_disk_speed(disk_io) # Система system_uptime = self._get_system_uptime() # Получаем имя хоста в зависимости от ОС if self.os_type == "macos": hostname = os.uname().nodename elif self.os_type == "ubuntu": hostname = os.uname().nodename else: hostname = "unknown" return { 'cpu_percent': cpu_percent, 'load_avg_1m': round(load_avg[0], 2), 'load_avg_5m': round(load_avg[1], 2), 'load_avg_15m': round(load_avg[2], 2), 'cpu_count': cpu_count, 'ram_used': round(memory.used / (1024**3), 2), 'ram_total': round(memory.total / (1024**3), 2), 'ram_percent': round(ram_percent, 1), # Исправленный процент занятой памяти 'swap_used': round(swap.used / (1024**3), 2), 'swap_total': round(swap.total / (1024**3), 2), 'swap_percent': swap.percent, 'disk_used': round(disk.used / (1024**3), 2), 'disk_total': round(disk.total / (1024**3), 2), 'disk_percent': round((disk.used / disk.total) * 100, 1), 'disk_free': round(disk.free / (1024**3), 2), 'disk_read_speed': disk_read_speed, 'disk_write_speed': disk_write_speed, 'disk_io_percent': self._calculate_disk_io_percent(), 'system_uptime': self._format_uptime(system_uptime), 'bot_uptime': self.get_bot_uptime(), 'server_hostname': hostname, 'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } except Exception as e: logger.error(f"Ошибка при получении информации о системе: {e}") return {} def _get_disk_space_emoji(self, disk_percent: float) -> str: """Получение эмодзи для дискового пространства""" if disk_percent < 60: return "🟢" elif disk_percent < 90: return "⚠️" else: return "🚨" def _format_bytes(self, bytes_value: int) -> str: """Форматирование байтов в человекочитаемый вид""" if bytes_value == 0: return "0 B" size_names = ["B", "KB", "MB", "GB", "TB"] i = 0 while bytes_value >= 1024 and i < len(size_names) - 1: bytes_value /= 1024.0 i += 1 return f"{bytes_value:.1f} {size_names[i]}" def _format_uptime(self, seconds: float) -> str: """Форматирование времени работы системы""" days = int(seconds // 86400) hours = int((seconds % 86400) // 3600) minutes = int((seconds % 3600) // 60) if days > 0: return f"{days}д {hours}ч {minutes}м" elif hours > 0: return f"{hours}ч {minutes}м" else: return f"{minutes}м" def check_process_status(self, process_name: str) -> Tuple[str, str]: """Проверка статуса процесса и возврат статуса с uptime""" try: # Сначала проверяем по PID файлу pid_file = self.pid_files.get(process_name) if pid_file and os.path.exists(pid_file): try: with open(pid_file, 'r') as f: content = f.read().strip() if content and content != '# Этот файл будет автоматически обновляться при запуске бота': pid = int(content) if psutil.pid_exists(pid): # Получаем uptime процесса try: proc = psutil.Process(pid) proc_uptime = time.time() - proc.create_time() uptime_str = self._format_uptime(proc_uptime) return "✅", f"Uptime {uptime_str}" except: return "✅", "Uptime неизвестно" except (ValueError, FileNotFoundError): pass # Проверяем по имени процесса более точно for proc in psutil.process_iter(['pid', 'name', 'cmdline']): try: proc_name = proc.info['name'].lower() cmdline = ' '.join(proc.info['cmdline']).lower() if proc.info['cmdline'] else '' # Более точная проверка для каждого бота if process_name == 'voice_bot': # Проверяем voice_bot if ('voice_bot' in proc_name or 'voice_bot' in cmdline or 'voice_bot_v2.py' in cmdline): # Получаем uptime процесса try: proc_uptime = time.time() - proc.create_time() uptime_str = self._format_uptime(proc_uptime) return "✅", f"Uptime {uptime_str}" except: return "✅", "Uptime неизвестно" elif process_name == 'helper_bot': # Проверяем helper_bot if ('helper_bot' in proc_name or 'helper_bot' in cmdline or 'run_helper.py' in cmdline or 'python' in proc_name and 'helper_bot' in cmdline): # Получаем uptime процесса try: proc_uptime = time.time() - proc.create_time() uptime_str = self._format_uptime(proc_uptime) return "✅", f"Uptime {uptime_str}" except: return "✅", "Uptime неизвестно" except (psutil.NoSuchProcess, psutil.AccessDenied): continue return "❌", "Выключен" except Exception as e: logger.error(f"Ошибка при проверке процесса {process_name}: {e}") return "❌", "Выключен" def should_send_status(self) -> bool: """Проверка, нужно ли отправить статус (каждые 30 минут в 00 и 30 минут часа)""" now = datetime.now() # Проверяем, что сейчас 00 или 30 минут часа if now.minute in [0, 30]: # Проверяем, не отправляли ли мы уже статус в эту минуту if (self.last_status_time is None or self.last_status_time.hour != now.hour or self.last_status_time.minute != now.minute): self.last_status_time = now return True return False def _calculate_disk_speed(self, current_disk_io) -> Tuple[str, str]: """Расчет скорости чтения/записи диска""" current_time = time.time() if self.last_disk_io is None or self.last_disk_io_time is None: self.last_disk_io = current_disk_io self.last_disk_io_time = current_time return "0 B/s", "0 B/s" time_diff = current_time - self.last_disk_io_time if time_diff < 1: # Минимальный интервал 1 секунда return "0 B/s", "0 B/s" read_diff = current_disk_io.read_bytes - self.last_disk_io.read_bytes write_diff = current_disk_io.write_bytes - self.last_disk_io.write_bytes read_speed = read_diff / time_diff write_speed = write_diff / time_diff # Обновляем предыдущие значения self.last_disk_io = current_disk_io self.last_disk_io_time = current_time return self._format_bytes(read_speed) + "/s", self._format_bytes(write_speed) + "/s" def _calculate_disk_io_percent(self) -> int: """Расчет процента загрузки диска на основе IOPS""" try: # Получаем статистику диска disk_io = self._get_disk_io_counters() if disk_io is None: return 0 # Простая эвристика: считаем общее количество операций total_ops = disk_io.read_count + disk_io.write_count # Нормализуем к проценту (это приблизительная оценка) # На macOS обычно нормальная нагрузка до 1000-5000 операций в секунду if total_ops < 1000: return 10 elif total_ops < 5000: return 30 elif total_ops < 10000: return 50 elif total_ops < 20000: return 70 else: return 90 except: return 0 def should_send_startup_status(self) -> bool: """Проверка, нужно ли отправить статус при запуске""" return self.last_status_time is None async def send_startup_message(self): """Отправка сообщения о запуске бота""" try: message = f"""🚀 **Бот запущен!** --------------------------------- **Время запуска:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} **Сервер:** `{psutil.os.uname().nodename}` **Система:** {psutil.os.uname().sysname} {psutil.os.uname().release} **ОС:** {self.os_type.upper()} ✅ Мониторинг сервера активирован ✅ Статус будет отправляться каждые 30 минут (в 00 и 30 минут часа) ✅ Алерты будут отправляться при превышении пороговых значений ---------------------------------""" await self.bot.send_message( chat_id=self.important_logs, text=message, parse_mode='HTML' ) logger.info("Сообщение о запуске бота отправлено") except Exception as e: logger.error(f"Ошибка при отправке сообщения о запуске: {e}") async def send_shutdown_message(self): """Отправка сообщения об отключении бота""" try: # Получаем финальную информацию о системе system_info = self.get_system_info() if not system_info: system_info = { 'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), 'server_hostname': psutil.os.uname().nodename } message = f"""🛑 **Бот отключен!** --------------------------------- **Время отключения:** {system_info['current_time']} **Сервер:** `{system_info['server_hostname']}` ❌ Мониторинг сервера остановлен ❌ Статус больше не будет отправляться ❌ Алерты отключены ⚠️ **Внимание:** Проверьте состояние сервера! ---------------------------------""" await self.bot.send_message( chat_id=self.important_logs, text=message, parse_mode='HTML' ) logger.info("Сообщение об отключении бота отправлено") except Exception as e: logger.error(f"Ошибка при отправке сообщения об отключении: {e}") def check_alerts(self, system_info: Dict) -> Tuple[bool, Optional[str]]: """Проверка необходимости отправки алертов""" alerts = [] # Проверка CPU if system_info['cpu_percent'] > self.threshold and not self.alert_states['cpu']: self.alert_states['cpu'] = True alerts.append(('cpu', system_info['cpu_percent'], f"Нагрузка за 1 мин: {system_info['load_avg_1m']}")) # Проверка RAM if system_info['ram_percent'] > self.threshold and not self.alert_states['ram']: self.alert_states['ram'] = True alerts.append(('ram', system_info['ram_percent'], f"Используется: {system_info['ram_used']} GB из {system_info['ram_total']} GB")) # Проверка диска if system_info['disk_percent'] > self.threshold and not self.alert_states['disk']: self.alert_states['disk'] = True alerts.append(('disk', system_info['disk_percent'], f"Свободно: {system_info['disk_free']} GB на /")) # Проверка восстановления recoveries = [] if system_info['cpu_percent'] < self.recovery_threshold and self.alert_states['cpu']: self.alert_states['cpu'] = False recoveries.append(('cpu', system_info['cpu_percent'])) if system_info['ram_percent'] < self.recovery_threshold and self.alert_states['ram']: self.alert_states['ram'] = False recoveries.append(('ram', system_info['ram_percent'])) if system_info['disk_percent'] < self.recovery_threshold and self.alert_states['disk']: self.alert_states['disk'] = False recoveries.append(('disk', system_info['disk_percent'])) return alerts, recoveries async def send_status_message(self, system_info: Dict): """Отправка сообщения со статусом сервера""" try: voice_bot_status, voice_bot_uptime = self.check_process_status('voice_bot') helper_bot_status, helper_bot_uptime = self.check_process_status('helper_bot') # Получаем эмодзи для дискового пространства disk_emoji = self._get_disk_space_emoji(system_info['disk_percent']) message = f"""🖥 **Статус Сервера** | {system_info['current_time']} --------------------------------- **📊 Общая нагрузка:** CPU: {system_info['cpu_percent']}% | LA: {system_info['load_avg_1m']} / {system_info['cpu_count']} | IO Wait: {system_info['disk_percent']}% **💾 Память:** RAM: {system_info['ram_used']}/{system_info['ram_total']} GB ({system_info['ram_percent']}%) Swap: {system_info['swap_used']}/{system_info['swap_total']} GB ({system_info['swap_percent']}%) **🗂️ Дисковое пространство:** Диск (/): {system_info['disk_used']}/{system_info['disk_total']} GB ({system_info['disk_percent']}%) {disk_emoji} **💿 Диск I/O:** Read: {system_info['disk_read_speed']} | Write: {system_info['disk_write_speed']} Диск загружен: {system_info['disk_io_percent']}% **🤖 Процессы:** {voice_bot_status} voice-bot - {voice_bot_uptime} {helper_bot_status} helper-bot - {helper_bot_uptime} --------------------------------- ⏰ Uptime сервера: {system_info['system_uptime']}""" await self.bot.send_message( chat_id=self.group_for_logs, text=message, parse_mode='HTML' ) logger.info("Статус сервера отправлен") except Exception as e: logger.error(f"Ошибка при отправке статуса сервера: {e}") async def send_alert_message(self, metric_name: str, current_value: float, details: str): """Отправка сообщения об алерте""" try: message = f"""🚨 **ALERT: Высокая нагрузка на сервере!** --------------------------------- **Показатель:** {metric_name} **Текущее значение:** {current_value}% ⚠️ **Пороговое значение:** 80% **Детали:** {details} **Сервер:** `{psutil.os.uname().nodename}` **Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}` ---------------------------------""" await self.bot.send_message( chat_id=self.important_logs, text=message, parse_mode='HTML' ) logger.warning(f"Алерт отправлен: {metric_name} - {current_value}%") except Exception as e: logger.error(f"Ошибка при отправке алерта: {e}") async def send_recovery_message(self, metric_name: str, current_value: float, peak_value: float): """Отправка сообщения о восстановлении""" try: message = f"""✅ **RECOVERY: Нагрузка нормализовалась** --------------------------------- **Показатель:** {metric_name} **Текущее значение:** {current_value}% ✔️ **Было превышение:** До {peak_value}% **Сервер:** `{psutil.os.uname().nodename}` **Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}` ---------------------------------""" await self.bot.send_message( chat_id=self.important_logs, text=message, parse_mode='HTML' ) logger.info(f"Сообщение о восстановлении отправлено: {metric_name}") except Exception as e: logger.error(f"Ошибка при отправке сообщения о восстановлении: {e}") async def monitor_loop(self): """Основной цикл мониторинга""" logger.info(f"Модуль мониторинга сервера запущен на {self.os_type.upper()}") # Отправляем сообщение о запуске при первом запуске if self.should_send_startup_status(): await self.send_startup_message() while True: try: system_info = self.get_system_info() if not system_info: await asyncio.sleep(60) continue # Проверка алертов alerts, recoveries = self.check_alerts(system_info) # Отправка алертов for metric_type, value, details in alerts: metric_names = { 'cpu': 'Использование CPU', 'ram': 'Использование оперативной памяти', 'disk': 'Заполнение диска (/)' } await self.send_alert_message(metric_names[metric_type], value, details) # Отправка сообщений о восстановлении for metric_type, value in recoveries: metric_names = { 'cpu': 'Использование CPU', 'ram': 'Использование оперативной памяти', 'disk': 'Заполнение диска (/)' } # Находим пиковое значение (используем 80% как пример) await self.send_recovery_message(metric_names[metric_type], value, 80.0) # Отправка статуса каждые 30 минут в 00 и 30 минут часа if self.should_send_status(): await self.send_status_message(system_info) # Пауза между проверками (1 минута) await asyncio.sleep(60) except Exception as e: logger.error(f"Ошибка в цикле мониторинга: {e}") await asyncio.sleep(60)