import os import aiohttp import logging from datetime import datetime from typing import Dict, List, Tuple try: from .metrics_collector import MetricsCollector except ImportError: from metrics_collector import MetricsCollector logger = logging.getLogger(__name__) class MessageSender: def __init__(self): # Получаем переменные окружения self.telegram_bot_token = os.getenv('TELEGRAM_MONITORING_BOT_TOKEN') self.group_for_logs = os.getenv('GROUP_MONITORING_FOR_LOGS') self.important_logs = os.getenv('IMPORTANT_MONITORING_LOGS') # Интервал отправки статуса в минутах (по умолчанию 2 минуты) self.status_update_interval_minutes = int(os.getenv('STATUS_UPDATE_INTERVAL_MINUTES', 2)) # Создаем экземпляр сборщика метрик self.metrics_collector = MetricsCollector() # Время последней отправки статуса self.last_status_time = None if not self.telegram_bot_token: logger.warning("TELEGRAM_MONITORING_BOT_TOKEN не установлен в переменных окружения") if not self.group_for_logs: logger.warning("GROUP_MONITORING_FOR_LOGS не установлен в переменных окружения") if not self.important_logs: logger.warning("IMPORTANT_MONITORING_LOGS не установлен в переменных окружения") logger.info(f"Интервал отправки статуса установлен: {self.status_update_interval_minutes} минут") async def send_telegram_message(self, chat_id: str, message: str) -> bool: """Отправка сообщения в Telegram через прямое обращение к API""" if not self.telegram_bot_token: logger.error("TELEGRAM_MONITORING_BOT_TOKEN не установлен") return False try: async with aiohttp.ClientSession() as session: url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage" payload = { "chat_id": chat_id, "text": message, "parse_mode": "HTML" } async with session.post(url, json=payload) as response: if response.status == 200: logger.info(f"Сообщение успешно отправлено в чат {chat_id}") return True else: response_text = await response.text() logger.error(f"Ошибка отправки в Telegram: {response.status} - {response_text}") return False except Exception as e: logger.error(f"Ошибка при отправке сообщения в Telegram: {e}") return False def should_send_status(self) -> bool: """Проверка, нужно ли отправить статус (каждые N минут)""" now = datetime.now() # Логируем для диагностики import logging logger = logging.getLogger(__name__) if self.last_status_time is None: logger.info(f"should_send_status: last_status_time is None, отправляем статус") self.last_status_time = now return True # Вычисляем разницу в минутах time_diff_minutes = (now - self.last_status_time).total_seconds() / 60 logger.info(f"should_send_status: прошло {time_diff_minutes:.1f} минут с последней отправки, нужно {self.status_update_interval_minutes} минут") # Проверяем, что прошло N минут с последней отправки if time_diff_minutes >= self.status_update_interval_minutes: logger.info(f"should_send_status: отправляем статус (прошло {time_diff_minutes:.1f} минут)") self.last_status_time = now return True logger.info(f"should_send_status: статус не отправляем (прошло {time_diff_minutes:.1f} минут)") return False def should_send_startup_status(self) -> bool: """Проверка, нужно ли отправить статус при запуске""" return self.last_status_time is None def _get_disk_space_emoji(self, disk_percent: float) -> str: """Получение эмодзи для дискового пространства""" if disk_percent < 60: return "🟢" elif disk_percent < 90: return "⚠️" else: return "🚨" def _get_cpu_emoji(self, cpu_percent: float) -> str: """Получение эмодзи для CPU""" if cpu_percent < 50: return "🟢" elif cpu_percent < 80: return "⚠️" else: return "🚨" def _get_memory_emoji(self, memory_percent: float) -> str: """Получение эмодзи для памяти (RAM/Swap)""" if memory_percent < 60: return "🟢" elif memory_percent < 85: return "⚠️" else: return "🚨" def _get_load_average_emoji(self, load_avg: float, cpu_count: int) -> str: """Получение эмодзи для Load Average""" # Load Average считается нормальным если < 1.0 на ядро # Критичным если > 2.0 на ядро load_per_core = load_avg / cpu_count if load_per_core < 1.0: return "🟢" elif load_per_core < 2.0: return "⚠️" else: return "🚨" def _get_io_wait_emoji(self, io_wait_percent: float) -> str: """Получение эмодзи для IO Wait""" # IO Wait считается нормальным если < 5% # Критичным если > 20% if io_wait_percent < 5: return "🟢" elif io_wait_percent < 20: return "⚠️" else: return "🚨" def get_status_message(self, system_info: Dict) -> str: """Формирование сообщения со статусом сервера""" try: helper_bot_status, helper_bot_uptime = self.metrics_collector.check_process_status('helper_bot') # Получаем эмодзи для всех метрик cpu_emoji = self._get_cpu_emoji(system_info['cpu_percent']) ram_emoji = self._get_memory_emoji(system_info['ram_percent']) swap_emoji = self._get_memory_emoji(system_info['swap_percent']) la_emoji = self._get_load_average_emoji(system_info['load_avg_1m'], system_info['cpu_count']) io_wait_emoji = self._get_io_wait_emoji(system_info['io_wait_percent']) disk_emoji = self._get_disk_space_emoji(system_info['disk_percent']) # Определяем уровень мониторинга monitoring_level = system_info.get('monitoring_level', 'unknown') level_emoji = "🖥️" if monitoring_level == 'host' else "📦" level_text = "Хост" if monitoring_level == 'host' else "Контейнер" message = f"""{level_emoji} **Статус {level_text}** | {system_info['current_time']} --------------------------------- **📊 Общая нагрузка:** CPU: {system_info['cpu_percent']}% {cpu_emoji} | LA: {system_info['load_avg_1m']} / {system_info['cpu_count']} {la_emoji} | IO Wait: {system_info['io_wait_percent']}% {io_wait_emoji} **💾 Память:** RAM: {system_info['ram_used']}/{system_info['ram_total']} GB ({system_info['ram_percent']}%) {ram_emoji} Swap: {system_info['swap_used']}/{system_info['swap_total']} GB ({system_info['swap_percent']}%) {swap_emoji} **🗂️ Дисковое пространство:** Диск (/): {system_info['disk_used']}/{system_info['disk_total']} GB ({system_info['disk_percent']}%) {disk_emoji} **💿 Диск I/O:** Read: {system_info['disk_read_speed']} | Write: {system_info['disk_write_speed']} Диск загружен: {system_info['disk_io_percent']}% **🤖 Процессы:** {helper_bot_status} helper-bot - {helper_bot_uptime} --------------------------------- ⏰ Uptime сервера: {system_info['system_uptime']} 🔍 Уровень мониторинга: {level_text} ({monitoring_level})""" return message except Exception as e: logger.error(f"Ошибка при формировании статуса сервера: {e}") return f"Ошибка при получении статуса сервера: {e}" def get_alert_message(self, metric_name: str, current_value: float, details: str) -> str: """Формирование сообщения об алерте""" try: # Получаем информацию о задержке для данного метрика delay_info = "" if hasattr(self.metrics_collector, 'alert_delays'): metric_type = metric_name.lower().replace('использование ', '').replace('заполнение диска (/)', 'disk') if 'cpu' in metric_type: delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['cpu']} сек" elif 'память' in metric_type or 'ram' in metric_type: delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['ram']} сек" elif 'диск' in metric_type or 'disk' in metric_type: delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['disk']} сек" message = f"""🚨 **ALERT: Высокая нагрузка на сервере!** --------------------------------- **Показатель:** {metric_name} **Текущее значение:** {current_value}% ⚠️ **Пороговое значение:** 80% **Детали:** {details} {delay_info} **Сервер:** `{self.metrics_collector.os_type.upper()}` **Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}` ---------------------------------""" return message except Exception as e: logger.error(f"Ошибка при формировании алерта: {e}") return f"Ошибка при формировании алерта: {e}" def get_recovery_message(self, metric_name: str, current_value: float, peak_value: float) -> str: """Формирование сообщения о восстановлении""" try: message = f"""✅ **RECOVERY: Нагрузка нормализовалась** --------------------------------- **Показатель:** {metric_name} **Текущее значение:** {current_value}% ✔️ **Было превышение:** До {peak_value}% **Сервер:** `{self.metrics_collector.os_type.upper()}` **Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}` ---------------------------------""" return message except Exception as e: logger.error(f"Ошибка при формировании сообщения о восстановлении: {e}") return f"Ошибка при формировании сообщения о восстановлении: {e}" async def send_status_message(self) -> bool: """Отправка статуса сервера в группу логов""" if not self.group_for_logs: logger.warning("GROUP_MONITORING_FOR_LOGS не установлен, пропускаем отправку статуса") return False try: system_info = self.metrics_collector.get_system_info() if not system_info: logger.error("Не удалось получить информацию о системе") return False status_message = self.get_status_message(system_info) return await self.send_telegram_message(self.group_for_logs, status_message) except Exception as e: logger.error(f"Ошибка при отправке статуса: {e}") return False async def send_alert_message(self, metric_type: str, current_value: float, details: str) -> bool: """Отправка сообщения об алерте в важные логи""" if not self.important_logs: logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку алерта") return False try: metric_names = { 'cpu': 'Использование CPU', 'ram': 'Использование оперативной памяти', 'disk': 'Заполнение диска (/)' } metric_name = metric_names.get(metric_type, metric_type) alert_message = self.get_alert_message(metric_name, current_value, details) return await self.send_telegram_message(self.important_logs, alert_message) except Exception as e: logger.error(f"Ошибка при отправке алерта: {e}") return False async def send_recovery_message(self, metric_type: str, current_value: float, peak_value: float) -> bool: """Отправка сообщения о восстановлении в важные логи""" if not self.important_logs: logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку сообщения о восстановлении") return False try: metric_names = { 'cpu': 'Использование CPU', 'ram': 'Использование оперативной памяти', 'disk': 'Заполнение диска (/)' } metric_name = metric_names.get(metric_type, metric_type) recovery_message = self.get_recovery_message(metric_name, current_value, peak_value) return await self.send_telegram_message(self.important_logs, recovery_message) except Exception as e: logger.error(f"Ошибка при отправке сообщения о восстановлении: {e}") return False async def process_alerts_and_recoveries(self) -> None: """Обработка алертов и восстановлений""" try: system_info = self.metrics_collector.get_system_info() if not system_info: return # Проверка алертов alerts, recoveries = self.metrics_collector.check_alerts(system_info) # Отправка алертов for metric_type, value, details in alerts: await self.send_alert_message(metric_type, value, details) logger.warning(f"ALERT отправлен: {metric_type} - {value}% - {details}") # Отправка сообщений о восстановлении for metric_type, value in recoveries: # Находим пиковое значение для сообщения о восстановлении peak_value = self.metrics_collector.threshold await self.send_recovery_message(metric_type, value, peak_value) logger.info(f"RECOVERY отправлен: {metric_type} - {value}%") except Exception as e: logger.error(f"Ошибка при обработке алертов и восстановлений: {e}")