- Modified Grafana dashboards - Updated message sender and metrics collector - Added new rate limiting dashboard - Removed count_tests.py
342 lines
17 KiB
Python
342 lines
17 KiB
Python
import os
|
||
import aiohttp
|
||
import logging
|
||
from datetime import datetime
|
||
from typing import Dict, List, Tuple
|
||
try:
|
||
from .metrics_collector import MetricsCollector
|
||
except ImportError:
|
||
from metrics_collector import MetricsCollector
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class MessageSender:
|
||
def __init__(self):
|
||
# Получаем переменные окружения
|
||
self.telegram_bot_token = os.getenv('TELEGRAM_MONITORING_BOT_TOKEN')
|
||
self.group_for_logs = os.getenv('GROUP_MONITORING_FOR_LOGS')
|
||
self.important_logs = os.getenv('IMPORTANT_MONITORING_LOGS')
|
||
|
||
# Интервал отправки статуса в минутах (по умолчанию 2 минуты)
|
||
self.status_update_interval_minutes = int(os.getenv('STATUS_UPDATE_INTERVAL_MINUTES', 2))
|
||
|
||
# Создаем экземпляр сборщика метрик
|
||
self.metrics_collector = MetricsCollector()
|
||
|
||
# Время последней отправки статуса
|
||
self.last_status_time = None
|
||
|
||
if not self.telegram_bot_token:
|
||
logger.warning("TELEGRAM_MONITORING_BOT_TOKEN не установлен в переменных окружения")
|
||
if not self.group_for_logs:
|
||
logger.warning("GROUP_MONITORING_FOR_LOGS не установлен в переменных окружения")
|
||
if not self.important_logs:
|
||
logger.warning("IMPORTANT_MONITORING_LOGS не установлен в переменных окружения")
|
||
|
||
logger.info(f"Интервал отправки статуса установлен: {self.status_update_interval_minutes} минут")
|
||
|
||
async def send_telegram_message(self, chat_id: str, message: str) -> bool:
|
||
"""Отправка сообщения в Telegram через прямое обращение к API"""
|
||
if not self.telegram_bot_token:
|
||
logger.error("TELEGRAM_MONITORING_BOT_TOKEN не установлен")
|
||
return False
|
||
|
||
try:
|
||
async with aiohttp.ClientSession() as session:
|
||
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
|
||
payload = {
|
||
"chat_id": chat_id,
|
||
"text": message,
|
||
"parse_mode": "HTML"
|
||
}
|
||
|
||
async with session.post(url, json=payload) as response:
|
||
if response.status == 200:
|
||
logger.info(f"Сообщение успешно отправлено в чат {chat_id}")
|
||
return True
|
||
else:
|
||
response_text = await response.text()
|
||
logger.error(f"Ошибка отправки в Telegram: {response.status} - {response_text}")
|
||
return False
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при отправке сообщения в Telegram: {e}")
|
||
return False
|
||
|
||
def should_send_status(self) -> bool:
|
||
"""Проверка, нужно ли отправить статус (каждые N минут)"""
|
||
now = datetime.now()
|
||
|
||
# Логируем для диагностики
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
if self.last_status_time is None:
|
||
logger.info(f"should_send_status: last_status_time is None, отправляем статус")
|
||
return True
|
||
|
||
# Вычисляем разницу в минутах
|
||
time_diff_minutes = (now - self.last_status_time).total_seconds() / 60
|
||
logger.info(f"should_send_status: прошло {time_diff_minutes:.1f} минут с последней отправки, нужно {self.status_update_interval_minutes} минут")
|
||
|
||
# Проверяем, что прошло N минут с последней отправки
|
||
if time_diff_minutes >= self.status_update_interval_minutes:
|
||
logger.info(f"should_send_status: отправляем статус (прошло {time_diff_minutes:.1f} минут)")
|
||
return True
|
||
|
||
logger.info(f"should_send_status: статус не отправляем (прошло {time_diff_minutes:.1f} минут)")
|
||
return False
|
||
|
||
def should_send_startup_status(self) -> bool:
|
||
"""Проверка, нужно ли отправить статус при запуске"""
|
||
# Отправляем статус при запуске только если он еще не был отправлен
|
||
if self.last_status_time is None:
|
||
logger.info("should_send_startup_status: отправляем статус при запуске")
|
||
return True
|
||
logger.info("should_send_startup_status: статус уже был отправлен, пропускаем")
|
||
return False
|
||
|
||
def _get_disk_space_emoji(self, disk_percent: float) -> str:
|
||
"""Получение эмодзи для дискового пространства"""
|
||
if disk_percent < 60:
|
||
return "🟢"
|
||
elif disk_percent < 90:
|
||
return "⚠️"
|
||
else:
|
||
return "🚨"
|
||
|
||
def _get_cpu_emoji(self, cpu_percent: float) -> str:
|
||
"""Получение эмодзи для CPU"""
|
||
if cpu_percent < 50:
|
||
return "🟢"
|
||
elif cpu_percent < 80:
|
||
return "⚠️"
|
||
else:
|
||
return "🚨"
|
||
|
||
def _get_memory_emoji(self, memory_percent: float) -> str:
|
||
"""Получение эмодзи для памяти (RAM/Swap)"""
|
||
if memory_percent < 60:
|
||
return "🟢"
|
||
elif memory_percent < 85:
|
||
return "⚠️"
|
||
else:
|
||
return "🚨"
|
||
|
||
def _get_load_average_emoji(self, load_avg: float, cpu_count: int) -> str:
|
||
"""Получение эмодзи для Load Average"""
|
||
# Load Average считается нормальным если < 1.0 на ядро
|
||
# Критичным если > 2.0 на ядро
|
||
load_per_core = load_avg / cpu_count
|
||
if load_per_core < 1.0:
|
||
return "🟢"
|
||
elif load_per_core < 2.0:
|
||
return "⚠️"
|
||
else:
|
||
return "🚨"
|
||
|
||
def _get_io_wait_emoji(self, io_wait_percent: float) -> str:
|
||
"""Получение эмодзи для IO Wait"""
|
||
# IO Wait считается нормальным если < 5%
|
||
# Критичным если > 20%
|
||
if io_wait_percent < 5:
|
||
return "🟢"
|
||
elif io_wait_percent < 20:
|
||
return "⚠️"
|
||
else:
|
||
return "🚨"
|
||
|
||
def get_status_message(self, system_info: Dict) -> str:
|
||
"""Формирование сообщения со статусом сервера"""
|
||
try:
|
||
helper_bot_status, helper_bot_uptime = self.metrics_collector.check_process_status('helper_bot')
|
||
|
||
# Получаем эмодзи для всех метрик
|
||
cpu_emoji = self._get_cpu_emoji(system_info['cpu_percent'])
|
||
ram_emoji = self._get_memory_emoji(system_info['ram_percent'])
|
||
swap_emoji = self._get_memory_emoji(system_info['swap_percent'])
|
||
la_emoji = self._get_load_average_emoji(system_info['load_avg_1m'], system_info['cpu_count'])
|
||
io_wait_emoji = self._get_io_wait_emoji(system_info['io_wait_percent'])
|
||
disk_emoji = self._get_disk_space_emoji(system_info['disk_percent'])
|
||
|
||
# Определяем уровень мониторинга
|
||
monitoring_level = system_info.get('monitoring_level', 'unknown')
|
||
level_emoji = "🖥️" if monitoring_level == 'host' else "📦"
|
||
level_text = "Хост" if monitoring_level == 'host' else "Контейнер"
|
||
|
||
message = f"""{level_emoji} **Статус {level_text}** | <code>{system_info['current_time']}</code>
|
||
---------------------------------
|
||
**📊 Общая нагрузка:**
|
||
CPU: <b>{system_info['cpu_percent']}%</b> {cpu_emoji} | LA: <b>{system_info['load_avg_1m']} / {system_info['cpu_count']}</b> {la_emoji} | IO Wait: <b>{system_info['io_wait_percent']}%</b> {io_wait_emoji}
|
||
|
||
**💾 Память:**
|
||
RAM: <b>{system_info['ram_used']}/{system_info['ram_total']} GB</b> ({system_info['ram_percent']}%) {ram_emoji}
|
||
Swap: <b>{system_info['swap_used']}/{system_info['swap_total']} GB</b> ({system_info['swap_percent']}%) {swap_emoji}
|
||
|
||
**🗂️ Дисковое пространство:**
|
||
Диск (/): <b>{system_info['disk_used']}/{system_info['disk_total']} GB</b> ({system_info['disk_percent']}%) {disk_emoji}
|
||
|
||
**💿 Диск I/O:**
|
||
Read: <b>{system_info['disk_read_speed']}</b> | Write: <b>{system_info['disk_write_speed']}</b>
|
||
Диск загружен: <b>{system_info['disk_io_percent']}%</b>
|
||
|
||
**🤖 Процессы:**
|
||
{helper_bot_status} helper-bot - {helper_bot_uptime}
|
||
---------------------------------
|
||
⏰ Uptime сервера: {system_info['system_uptime']}
|
||
🔍 Уровень мониторинга: {level_text} ({monitoring_level})"""
|
||
|
||
return message
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при формировании статуса сервера: {e}")
|
||
return f"Ошибка при получении статуса сервера: {e}"
|
||
|
||
def get_alert_message(self, metric_name: str, current_value: float, details: str) -> str:
|
||
"""Формирование сообщения об алерте"""
|
||
try:
|
||
# Получаем информацию о задержке для данного метрика
|
||
delay_info = ""
|
||
if hasattr(self.metrics_collector, 'alert_delays'):
|
||
metric_type = metric_name.lower().replace('использование ', '').replace('заполнение диска (/)', 'disk')
|
||
if 'cpu' in metric_type:
|
||
delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['cpu']} сек"
|
||
elif 'память' in metric_type or 'ram' in metric_type:
|
||
delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['ram']} сек"
|
||
elif 'диск' in metric_type or 'disk' in metric_type:
|
||
delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['disk']} сек"
|
||
|
||
message = f"""🚨 **ALERT: Высокая нагрузка на сервере!**
|
||
---------------------------------
|
||
**Показатель:** {metric_name}
|
||
**Текущее значение:** <b>{current_value}%</b> ⚠️
|
||
**Пороговое значение:** 80%
|
||
|
||
**Детали:**
|
||
{details}
|
||
|
||
{delay_info}
|
||
|
||
**Сервер:** `{self.metrics_collector.os_type.upper()}`
|
||
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
|
||
---------------------------------"""
|
||
|
||
return message
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при формировании алерта: {e}")
|
||
return f"Ошибка при формировании алерта: {e}"
|
||
|
||
def get_recovery_message(self, metric_name: str, current_value: float, peak_value: float) -> str:
|
||
"""Формирование сообщения о восстановлении"""
|
||
try:
|
||
message = f"""✅ **RECOVERY: Нагрузка нормализовалась**
|
||
---------------------------------
|
||
**Показатель:** {metric_name}
|
||
**Текущее значение:** <b>{current_value}%</b> ✔️
|
||
**Было превышение:** До {peak_value}%
|
||
|
||
**Сервер:** `{self.metrics_collector.os_type.upper()}`
|
||
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
|
||
---------------------------------"""
|
||
|
||
return message
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при формировании сообщения о восстановлении: {e}")
|
||
return f"Ошибка при формировании сообщения о восстановлении: {e}"
|
||
|
||
async def send_status_message(self) -> bool:
|
||
"""Отправка статуса сервера в группу логов"""
|
||
if not self.group_for_logs:
|
||
logger.warning("GROUP_MONITORING_FOR_LOGS не установлен, пропускаем отправку статуса")
|
||
return False
|
||
|
||
try:
|
||
system_info = self.metrics_collector.get_system_info()
|
||
if not system_info:
|
||
logger.error("Не удалось получить информацию о системе")
|
||
return False
|
||
|
||
status_message = self.get_status_message(system_info)
|
||
success = await self.send_telegram_message(self.group_for_logs, status_message)
|
||
|
||
# Обновляем время последней отправки только при успешной отправке
|
||
if success:
|
||
self.last_status_time = datetime.now()
|
||
logger.info("send_status_message: время последней отправки обновлено")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при отправке статуса: {e}")
|
||
return False
|
||
|
||
async def send_alert_message(self, metric_type: str, current_value: float, details: str) -> bool:
|
||
"""Отправка сообщения об алерте в важные логи"""
|
||
if not self.important_logs:
|
||
logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку алерта")
|
||
return False
|
||
|
||
try:
|
||
metric_names = {
|
||
'cpu': 'Использование CPU',
|
||
'ram': 'Использование оперативной памяти',
|
||
'disk': 'Заполнение диска (/)'
|
||
}
|
||
|
||
metric_name = metric_names.get(metric_type, metric_type)
|
||
alert_message = self.get_alert_message(metric_name, current_value, details)
|
||
return await self.send_telegram_message(self.important_logs, alert_message)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при отправке алерта: {e}")
|
||
return False
|
||
|
||
async def send_recovery_message(self, metric_type: str, current_value: float, peak_value: float) -> bool:
|
||
"""Отправка сообщения о восстановлении в важные логи"""
|
||
if not self.important_logs:
|
||
logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку сообщения о восстановлении")
|
||
return False
|
||
|
||
try:
|
||
metric_names = {
|
||
'cpu': 'Использование CPU',
|
||
'ram': 'Использование оперативной памяти',
|
||
'disk': 'Заполнение диска (/)'
|
||
}
|
||
|
||
metric_name = metric_names.get(metric_type, metric_type)
|
||
recovery_message = self.get_recovery_message(metric_name, current_value, peak_value)
|
||
return await self.send_telegram_message(self.important_logs, recovery_message)
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при отправке сообщения о восстановлении: {e}")
|
||
return False
|
||
|
||
async def process_alerts_and_recoveries(self) -> None:
|
||
"""Обработка алертов и восстановлений"""
|
||
try:
|
||
system_info = self.metrics_collector.get_system_info()
|
||
if not system_info:
|
||
return
|
||
|
||
# Проверка алертов
|
||
alerts, recoveries = self.metrics_collector.check_alerts(system_info)
|
||
|
||
# Отправка алертов
|
||
for metric_type, value, details in alerts:
|
||
await self.send_alert_message(metric_type, value, details)
|
||
logger.warning(f"ALERT отправлен: {metric_type} - {value}% - {details}")
|
||
|
||
# Отправка сообщений о восстановлении
|
||
for metric_type, value in recoveries:
|
||
# Находим пиковое значение для сообщения о восстановлении
|
||
peak_value = self.metrics_collector.threshold
|
||
await self.send_recovery_message(metric_type, value, peak_value)
|
||
logger.info(f"RECOVERY отправлен: {metric_type} - {value}%")
|
||
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при обработке алертов и восстановлений: {e}")
|