Files
telegram-helper-bot/helper_bot/server_monitor.py
Andrey dc0e5d788c Implement OS detection and enhance disk monitoring in ServerMonitor
- Added OS detection functionality to the ServerMonitor class, allowing for tailored disk usage and uptime calculations based on the operating system (macOS or Ubuntu).
- Introduced methods for retrieving disk usage and I/O statistics specific to the detected OS.
- Updated the process status check to return uptime information for monitored processes.
- Enhanced the status message format to include disk space emojis and process uptime details.
- Updated tests to reflect changes in process status checks and output formatting.
2025-08-27 20:09:48 +03:00

624 lines
28 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import psutil
import time
import platform
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple
import logging
logger = logging.getLogger(__name__)
class ServerMonitor:
def __init__(self, bot, group_for_logs: str, important_logs: str):
self.bot = bot
self.group_for_logs = group_for_logs
self.important_logs = important_logs
# Определяем ОС
self.os_type = self._detect_os()
logger.info(f"Обнаружена ОС: {self.os_type}")
# Пороговые значения для алертов
self.threshold = 80.0
self.recovery_threshold = 75.0
# Состояние алертов для предотвращения спама
self.alert_states = {
'cpu': False,
'ram': False,
'disk': False
}
# PID файлы для отслеживания процессов
self.pid_files = {
'voice_bot': 'voice_bot.pid',
'helper_bot': 'helper_bot.pid'
}
# Время последней отправки статуса
self.last_status_time = None
# Для расчета скорости диска
self.last_disk_io = None
self.last_disk_io_time = None
# Время запуска бота для расчета uptime
self.bot_start_time = time.time()
def _detect_os(self) -> str:
"""Определение типа операционной системы"""
system = platform.system().lower()
if system == "darwin":
return "macos"
elif system == "linux":
return "ubuntu"
else:
return "unknown"
def _get_disk_path(self) -> str:
"""Получение пути к диску в зависимости от ОС"""
if self.os_type == "macos":
return "/"
elif self.os_type == "ubuntu":
return "/"
else:
return "/"
def _get_disk_usage(self) -> Optional[object]:
"""Получение информации о диске с учетом ОС"""
try:
if self.os_type == "macos":
# На macOS используем diskutil для получения реального использования диска
return self._get_macos_disk_usage()
else:
disk_path = self._get_disk_path()
return psutil.disk_usage(disk_path)
except Exception as e:
logger.error(f"Ошибка при получении информации о диске: {e}")
return None
def _get_macos_disk_usage(self) -> Optional[object]:
"""Получение информации о диске на macOS через diskutil"""
try:
import subprocess
import re
# Получаем информацию о диске через diskutil
result = subprocess.run(['diskutil', 'info', '/'], capture_output=True, text=True)
if result.returncode != 0:
# Fallback к psutil
return psutil.disk_usage('/')
output = result.stdout
# Извлекаем размеры из вывода diskutil
total_match = re.search(r'Container Total Space:\s+(\d+\.\d+)\s+GB', output)
free_match = re.search(r'Container Free Space:\s+(\d+\.\d+)\s+GB', output)
if total_match and free_match:
total_gb = float(total_match.group(1))
free_gb = float(free_match.group(1))
used_gb = total_gb - free_gb
# Создаем объект, похожий на результат psutil.disk_usage
class DiskUsage:
def __init__(self, total, used, free):
self.total = total * (1024**3) # Конвертируем в байты
self.used = used * (1024**3)
self.free = free * (1024**3)
return DiskUsage(total_gb, used_gb, free_gb)
else:
# Fallback к psutil
return psutil.disk_usage('/')
except Exception as e:
logger.error(f"Ошибка при получении информации о диске macOS: {e}")
# Fallback к psutil
return psutil.disk_usage('/')
def _get_disk_io_counters(self):
"""Получение статистики диска с учетом ОС"""
try:
if self.os_type == "macos":
# На macOS может быть несколько дисков, берем основной
return psutil.disk_io_counters(perdisk=False)
elif self.os_type == "ubuntu":
# На Ubuntu обычно один диск
return psutil.disk_io_counters(perdisk=False)
else:
return psutil.disk_io_counters()
except Exception as e:
logger.error(f"Ошибка при получении статистики диска: {e}")
return None
def _get_system_uptime(self) -> float:
"""Получение uptime системы с учетом ОС"""
try:
if self.os_type == "macos":
# На macOS используем boot_time
boot_time = psutil.boot_time()
return time.time() - boot_time
elif self.os_type == "ubuntu":
# На Ubuntu также используем boot_time
boot_time = psutil.boot_time()
return time.time() - boot_time
else:
boot_time = psutil.boot_time()
return time.time() - boot_time
except Exception as e:
logger.error(f"Ошибка при получении uptime системы: {e}")
return 0.0
def get_bot_uptime(self) -> str:
"""Получение uptime бота"""
uptime_seconds = time.time() - self.bot_start_time
return self._format_uptime(uptime_seconds)
def get_system_info(self) -> Dict:
"""Получение информации о системе"""
try:
# CPU
cpu_percent = psutil.cpu_percent(interval=1)
load_avg = psutil.getloadavg()
cpu_count = psutil.cpu_count()
# Память
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
# Используем единый расчет для всех ОС: used / total для получения процента занятой памяти
# Это обеспечивает консистентность между macOS и Ubuntu
ram_percent = (memory.used / memory.total) * 100
# Диск
disk = self._get_disk_usage()
disk_io = self._get_disk_io_counters()
if disk is None:
logger.error("Не удалось получить информацию о диске")
return {}
# Расчет скорости диска
disk_read_speed, disk_write_speed = self._calculate_disk_speed(disk_io)
# Система
system_uptime = self._get_system_uptime()
# Получаем имя хоста в зависимости от ОС
if self.os_type == "macos":
hostname = os.uname().nodename
elif self.os_type == "ubuntu":
hostname = os.uname().nodename
else:
hostname = "unknown"
return {
'cpu_percent': cpu_percent,
'load_avg_1m': round(load_avg[0], 2),
'load_avg_5m': round(load_avg[1], 2),
'load_avg_15m': round(load_avg[2], 2),
'cpu_count': cpu_count,
'ram_used': round(memory.used / (1024**3), 2),
'ram_total': round(memory.total / (1024**3), 2),
'ram_percent': round(ram_percent, 1), # Исправленный процент занятой памяти
'swap_used': round(swap.used / (1024**3), 2),
'swap_total': round(swap.total / (1024**3), 2),
'swap_percent': swap.percent,
'disk_used': round(disk.used / (1024**3), 2),
'disk_total': round(disk.total / (1024**3), 2),
'disk_percent': round((disk.used / disk.total) * 100, 1),
'disk_free': round(disk.free / (1024**3), 2),
'disk_read_speed': disk_read_speed,
'disk_write_speed': disk_write_speed,
'disk_io_percent': self._calculate_disk_io_percent(),
'system_uptime': self._format_uptime(system_uptime),
'bot_uptime': self.get_bot_uptime(),
'server_hostname': hostname,
'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
logger.error(f"Ошибка при получении информации о системе: {e}")
return {}
def _get_disk_space_emoji(self, disk_percent: float) -> str:
"""Получение эмодзи для дискового пространства"""
if disk_percent < 60:
return "🟢"
elif disk_percent < 90:
return "⚠️"
else:
return "🚨"
def _format_bytes(self, bytes_value: int) -> str:
"""Форматирование байтов в человекочитаемый вид"""
if bytes_value == 0:
return "0 B"
size_names = ["B", "KB", "MB", "GB", "TB"]
i = 0
while bytes_value >= 1024 and i < len(size_names) - 1:
bytes_value /= 1024.0
i += 1
return f"{bytes_value:.1f} {size_names[i]}"
def _format_uptime(self, seconds: float) -> str:
"""Форматирование времени работы системы"""
days = int(seconds // 86400)
hours = int((seconds % 86400) // 3600)
minutes = int((seconds % 3600) // 60)
if days > 0:
return f"{days}д {hours}ч {minutes}м"
elif hours > 0:
return f"{hours}ч {minutes}м"
else:
return f"{minutes}м"
def check_process_status(self, process_name: str) -> Tuple[str, str]:
"""Проверка статуса процесса и возврат статуса с uptime"""
try:
# Сначала проверяем по PID файлу
pid_file = self.pid_files.get(process_name)
if pid_file and os.path.exists(pid_file):
try:
with open(pid_file, 'r') as f:
content = f.read().strip()
if content and content != '# Этот файл будет автоматически обновляться при запуске бота':
pid = int(content)
if psutil.pid_exists(pid):
# Получаем uptime процесса
try:
proc = psutil.Process(pid)
proc_uptime = time.time() - proc.create_time()
uptime_str = self._format_uptime(proc_uptime)
return "", f"Uptime {uptime_str}"
except:
return "", "Uptime неизвестно"
except (ValueError, FileNotFoundError):
pass
# Проверяем по имени процесса более точно
for proc in psutil.process_iter(['pid', 'name', 'cmdline']):
try:
proc_name = proc.info['name'].lower()
cmdline = ' '.join(proc.info['cmdline']).lower() if proc.info['cmdline'] else ''
# Более точная проверка для каждого бота
if process_name == 'voice_bot':
# Проверяем voice_bot
if ('voice_bot' in proc_name or
'voice_bot' in cmdline or
'voice_bot_v2.py' in cmdline):
# Получаем uptime процесса
try:
proc_uptime = time.time() - proc.create_time()
uptime_str = self._format_uptime(proc_uptime)
return "", f"Uptime {uptime_str}"
except:
return "", "Uptime неизвестно"
elif process_name == 'helper_bot':
# Проверяем helper_bot
if ('helper_bot' in proc_name or
'helper_bot' in cmdline or
'run_helper.py' in cmdline or
'python' in proc_name and 'helper_bot' in cmdline):
# Получаем uptime процесса
try:
proc_uptime = time.time() - proc.create_time()
uptime_str = self._format_uptime(proc_uptime)
return "", f"Uptime {uptime_str}"
except:
return "", "Uptime неизвестно"
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
return "", "Выключен"
except Exception as e:
logger.error(f"Ошибка при проверке процесса {process_name}: {e}")
return "", "Выключен"
def should_send_status(self) -> bool:
"""Проверка, нужно ли отправить статус (каждые 30 минут в 00 и 30 минут часа)"""
now = datetime.now()
# Проверяем, что сейчас 00 или 30 минут часа
if now.minute in [0, 30]:
# Проверяем, не отправляли ли мы уже статус в эту минуту
if (self.last_status_time is None or
self.last_status_time.hour != now.hour or
self.last_status_time.minute != now.minute):
self.last_status_time = now
return True
return False
def _calculate_disk_speed(self, current_disk_io) -> Tuple[str, str]:
"""Расчет скорости чтения/записи диска"""
current_time = time.time()
if self.last_disk_io is None or self.last_disk_io_time is None:
self.last_disk_io = current_disk_io
self.last_disk_io_time = current_time
return "0 B/s", "0 B/s"
time_diff = current_time - self.last_disk_io_time
if time_diff < 1: # Минимальный интервал 1 секунда
return "0 B/s", "0 B/s"
read_diff = current_disk_io.read_bytes - self.last_disk_io.read_bytes
write_diff = current_disk_io.write_bytes - self.last_disk_io.write_bytes
read_speed = read_diff / time_diff
write_speed = write_diff / time_diff
# Обновляем предыдущие значения
self.last_disk_io = current_disk_io
self.last_disk_io_time = current_time
return self._format_bytes(read_speed) + "/s", self._format_bytes(write_speed) + "/s"
def _calculate_disk_io_percent(self) -> int:
"""Расчет процента загрузки диска на основе IOPS"""
try:
# Получаем статистику диска
disk_io = self._get_disk_io_counters()
if disk_io is None:
return 0
# Простая эвристика: считаем общее количество операций
total_ops = disk_io.read_count + disk_io.write_count
# Нормализуем к проценту (это приблизительная оценка)
# На macOS обычно нормальная нагрузка до 1000-5000 операций в секунду
if total_ops < 1000:
return 10
elif total_ops < 5000:
return 30
elif total_ops < 10000:
return 50
elif total_ops < 20000:
return 70
else:
return 90
except:
return 0
def should_send_startup_status(self) -> bool:
"""Проверка, нужно ли отправить статус при запуске"""
return self.last_status_time is None
async def send_startup_message(self):
"""Отправка сообщения о запуске бота"""
try:
message = f"""🚀 **Бот запущен!**
---------------------------------
**Время запуска:** <code>{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</code>
**Сервер:** `{psutil.os.uname().nodename}`
**Система:** {psutil.os.uname().sysname} {psutil.os.uname().release}
**ОС:** {self.os_type.upper()}
✅ Мониторинг сервера активирован
✅ Статус будет отправляться каждые 30 минут (в 00 и 30 минут часа)
✅ Алерты будут отправляться при превышении пороговых значений
---------------------------------"""
await self.bot.send_message(
chat_id=self.important_logs,
text=message,
parse_mode='HTML'
)
logger.info("Сообщение о запуске бота отправлено")
except Exception as e:
logger.error(f"Ошибка при отправке сообщения о запуске: {e}")
async def send_shutdown_message(self):
"""Отправка сообщения об отключении бота"""
try:
# Получаем финальную информацию о системе
system_info = self.get_system_info()
if not system_info:
system_info = {
'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'server_hostname': psutil.os.uname().nodename
}
message = f"""🛑 **Бот отключен!**
---------------------------------
**Время отключения:** <code>{system_info['current_time']}</code>
**Сервер:** `{system_info['server_hostname']}`
❌ Мониторинг сервера остановлен
❌ Статус больше не будет отправляться
❌ Алерты отключены
⚠️ **Внимание:** Проверьте состояние сервера!
---------------------------------"""
await self.bot.send_message(
chat_id=self.important_logs,
text=message,
parse_mode='HTML'
)
logger.info("Сообщение об отключении бота отправлено")
except Exception as e:
logger.error(f"Ошибка при отправке сообщения об отключении: {e}")
def check_alerts(self, system_info: Dict) -> Tuple[bool, Optional[str]]:
"""Проверка необходимости отправки алертов"""
alerts = []
# Проверка CPU
if system_info['cpu_percent'] > self.threshold and not self.alert_states['cpu']:
self.alert_states['cpu'] = True
alerts.append(('cpu', system_info['cpu_percent'], f"Нагрузка за 1 мин: {system_info['load_avg_1m']}"))
# Проверка RAM
if system_info['ram_percent'] > self.threshold and not self.alert_states['ram']:
self.alert_states['ram'] = True
alerts.append(('ram', system_info['ram_percent'], f"Используется: {system_info['ram_used']} GB из {system_info['ram_total']} GB"))
# Проверка диска
if system_info['disk_percent'] > self.threshold and not self.alert_states['disk']:
self.alert_states['disk'] = True
alerts.append(('disk', system_info['disk_percent'], f"Свободно: {system_info['disk_free']} GB на /"))
# Проверка восстановления
recoveries = []
if system_info['cpu_percent'] < self.recovery_threshold and self.alert_states['cpu']:
self.alert_states['cpu'] = False
recoveries.append(('cpu', system_info['cpu_percent']))
if system_info['ram_percent'] < self.recovery_threshold and self.alert_states['ram']:
self.alert_states['ram'] = False
recoveries.append(('ram', system_info['ram_percent']))
if system_info['disk_percent'] < self.recovery_threshold and self.alert_states['disk']:
self.alert_states['disk'] = False
recoveries.append(('disk', system_info['disk_percent']))
return alerts, recoveries
async def send_status_message(self, system_info: Dict):
"""Отправка сообщения со статусом сервера"""
try:
voice_bot_status, voice_bot_uptime = self.check_process_status('voice_bot')
helper_bot_status, helper_bot_uptime = self.check_process_status('helper_bot')
# Получаем эмодзи для дискового пространства
disk_emoji = self._get_disk_space_emoji(system_info['disk_percent'])
message = f"""🖥 **Статус Сервера** | <code>{system_info['current_time']}</code>
---------------------------------
**📊 Общая нагрузка:**
CPU: <b>{system_info['cpu_percent']}%</b> | LA: <b>{system_info['load_avg_1m']} / {system_info['cpu_count']}</b> | IO Wait: <b>{system_info['disk_percent']}%</b>
**💾 Память:**
RAM: <b>{system_info['ram_used']}/{system_info['ram_total']} GB</b> ({system_info['ram_percent']}%)
Swap: <b>{system_info['swap_used']}/{system_info['swap_total']} GB</b> ({system_info['swap_percent']}%)
**🗂️ Дисковое пространство:**
Диск (/): <b>{system_info['disk_used']}/{system_info['disk_total']} GB</b> ({system_info['disk_percent']}%) {disk_emoji}
**💿 Диск I/O:**
Read: <b>{system_info['disk_read_speed']}</b> | Write: <b>{system_info['disk_write_speed']}</b>
Диск загружен: <b>{system_info['disk_io_percent']}%</b>
**🤖 Процессы:**
{voice_bot_status} voice-bot - {voice_bot_uptime}
{helper_bot_status} helper-bot - {helper_bot_uptime}
---------------------------------
⏰ Uptime сервера: {system_info['system_uptime']}"""
await self.bot.send_message(
chat_id=self.group_for_logs,
text=message,
parse_mode='HTML'
)
logger.info("Статус сервера отправлен")
except Exception as e:
logger.error(f"Ошибка при отправке статуса сервера: {e}")
async def send_alert_message(self, metric_name: str, current_value: float, details: str):
"""Отправка сообщения об алерте"""
try:
message = f"""🚨 **ALERT: Высокая нагрузка на сервере!**
---------------------------------
**Показатель:** {metric_name}
**Текущее значение:** <b>{current_value}%</b> ⚠️
**Пороговое значение:** 80%
**Детали:**
{details}
**Сервер:** `{psutil.os.uname().nodename}`
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
---------------------------------"""
await self.bot.send_message(
chat_id=self.important_logs,
text=message,
parse_mode='HTML'
)
logger.warning(f"Алерт отправлен: {metric_name} - {current_value}%")
except Exception as e:
logger.error(f"Ошибка при отправке алерта: {e}")
async def send_recovery_message(self, metric_name: str, current_value: float, peak_value: float):
"""Отправка сообщения о восстановлении"""
try:
message = f"""✅ **RECOVERY: Нагрузка нормализовалась**
---------------------------------
**Показатель:** {metric_name}
**Текущее значение:** <b>{current_value}%</b> ✔️
**Было превышение:** До {peak_value}%
**Сервер:** `{psutil.os.uname().nodename}`
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
---------------------------------"""
await self.bot.send_message(
chat_id=self.important_logs,
text=message,
parse_mode='HTML'
)
logger.info(f"Сообщение о восстановлении отправлено: {metric_name}")
except Exception as e:
logger.error(f"Ошибка при отправке сообщения о восстановлении: {e}")
async def monitor_loop(self):
"""Основной цикл мониторинга"""
logger.info(f"Модуль мониторинга сервера запущен на {self.os_type.upper()}")
# Отправляем сообщение о запуске при первом запуске
if self.should_send_startup_status():
await self.send_startup_message()
while True:
try:
system_info = self.get_system_info()
if not system_info:
await asyncio.sleep(60)
continue
# Проверка алертов
alerts, recoveries = self.check_alerts(system_info)
# Отправка алертов
for metric_type, value, details in alerts:
metric_names = {
'cpu': 'Использование CPU',
'ram': 'Использование оперативной памяти',
'disk': 'Заполнение диска (/)'
}
await self.send_alert_message(metric_names[metric_type], value, details)
# Отправка сообщений о восстановлении
for metric_type, value in recoveries:
metric_names = {
'cpu': 'Использование CPU',
'ram': 'Использование оперативной памяти',
'disk': 'Заполнение диска (/)'
}
# Находим пиковое значение (используем 80% как пример)
await self.send_recovery_message(metric_names[metric_type], value, 80.0)
# Отправка статуса каждые 30 минут в 00 и 30 минут часа
if self.should_send_status():
await self.send_status_message(system_info)
# Пауза между проверками (1 минута)
await asyncio.sleep(60)
except Exception as e:
logger.error(f"Ошибка в цикле мониторинга: {e}")
await asyncio.sleep(60)