Files
prod/infra/monitoring/message_sender.py

253 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import aiohttp
import logging
from datetime import datetime
from typing import Dict, List, Tuple
try:
from .metrics_collector import MetricsCollector
except ImportError:
from metrics_collector import MetricsCollector
logger = logging.getLogger(__name__)
class MessageSender:
def __init__(self):
# Получаем переменные окружения
self.telegram_bot_token = os.getenv('TELEGRAM_MONITORING_BOT_TOKEN')
self.group_for_logs = os.getenv('GROUP_MONITORING_FOR_LOGS')
self.important_logs = os.getenv('IMPORTANT_MONITORING_LOGS')
# Создаем экземпляр сборщика метрик
self.metrics_collector = MetricsCollector()
# Время последней отправки статуса
self.last_status_time = None
if not self.telegram_bot_token:
logger.warning("TELEGRAM_MONITORING_BOT_TOKEN не установлен в переменных окружения")
if not self.group_for_logs:
logger.warning("GROUP_MONITORING_FOR_LOGS не установлен в переменных окружения")
if not self.important_logs:
logger.warning("IMPORTANT_MONITORING_LOGS не установлен в переменных окружения")
async def send_telegram_message(self, chat_id: str, message: str) -> bool:
"""Отправка сообщения в Telegram через прямое обращение к API"""
if not self.telegram_bot_token:
logger.error("TELEGRAM_MONITORING_BOT_TOKEN не установлен")
return False
try:
async with aiohttp.ClientSession() as session:
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": message,
"parse_mode": "HTML"
}
async with session.post(url, json=payload) as response:
if response.status == 200:
logger.info(f"Сообщение успешно отправлено в чат {chat_id}")
return True
else:
response_text = await response.text()
logger.error(f"Ошибка отправки в Telegram: {response.status} - {response_text}")
return False
except Exception as e:
logger.error(f"Ошибка при отправке сообщения в Telegram: {e}")
return False
def should_send_status(self) -> bool:
"""Проверка, нужно ли отправить статус (каждые 4 часа в 00 минут)"""
now = datetime.now()
# Проверяем, что сейчас 00 минут часа и час кратен 4 (0, 4, 8, 12, 16, 20)
if now.minute == 0 and now.hour % 4 == 0:
# Проверяем, не отправляли ли мы уже статус в этот час
if (self.last_status_time is None or
self.last_status_time.hour != now.hour or
self.last_status_time.day != now.day):
self.last_status_time = now
return True
return False
def should_send_startup_status(self) -> bool:
"""Проверка, нужно ли отправить статус при запуске"""
return self.last_status_time is None
def _get_disk_space_emoji(self, disk_percent: float) -> str:
"""Получение эмодзи для дискового пространства"""
if disk_percent < 60:
return "🟢"
elif disk_percent < 90:
return "⚠️"
else:
return "🚨"
def get_status_message(self, system_info: Dict) -> str:
"""Формирование сообщения со статусом сервера"""
try:
voice_bot_status, voice_bot_uptime = self.metrics_collector.check_process_status('voice_bot')
helper_bot_status, helper_bot_uptime = self.metrics_collector.check_process_status('helper_bot')
# Получаем эмодзи для дискового пространства
disk_emoji = self._get_disk_space_emoji(system_info['disk_percent'])
message = f"""🖥 **Статус Сервера** | <code>{system_info['current_time']}</code>
---------------------------------
**📊 Общая нагрузка:**
CPU: <b>{system_info['cpu_percent']}%</b> | LA: <b>{system_info['load_avg_1m']} / {system_info['cpu_count']}</b> | IO Wait: <b>{system_info['disk_percent']}%</b>
**💾 Память:**
RAM: <b>{system_info['ram_used']}/{system_info['ram_total']} GB</b> ({system_info['ram_percent']}%)
Swap: <b>{system_info['swap_used']}/{system_info['swap_total']} GB</b> ({system_info['swap_percent']}%)
**🗂️ Дисковое пространство:**
Диск (/): <b>{system_info['disk_used']}/{system_info['disk_total']} GB</b> ({system_info['disk_percent']}%) {disk_emoji}
**💿 Диск I/O:**
Read: <b>{system_info['disk_read_speed']}</b> | Write: <b>{system_info['disk_write_speed']}</b>
Диск загружен: <b>{system_info['disk_io_percent']}%</b>
**🤖 Процессы:**
{voice_bot_status} voice-bot - {voice_bot_uptime}
{helper_bot_status} helper-bot - {helper_bot_uptime}
---------------------------------
⏰ Uptime сервера: {system_info['system_uptime']}"""
return message
except Exception as e:
logger.error(f"Ошибка при формировании статуса сервера: {e}")
return f"Ошибка при получении статуса сервера: {e}"
def get_alert_message(self, metric_name: str, current_value: float, details: str) -> str:
"""Формирование сообщения об алерте"""
try:
message = f"""🚨 **ALERT: Высокая нагрузка на сервере!**
---------------------------------
**Показатель:** {metric_name}
**Текущее значение:** <b>{current_value}%</b> ⚠️
**Пороговое значение:** 80%
**Детали:**
{details}
**Сервер:** `{self.metrics_collector.os_type.upper()}`
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
---------------------------------"""
return message
except Exception as e:
logger.error(f"Ошибка при формировании алерта: {e}")
return f"Ошибка при формировании алерта: {e}"
def get_recovery_message(self, metric_name: str, current_value: float, peak_value: float) -> str:
"""Формирование сообщения о восстановлении"""
try:
message = f"""✅ **RECOVERY: Нагрузка нормализовалась**
---------------------------------
**Показатель:** {metric_name}
**Текущее значение:** <b>{current_value}%</b> ✔️
**Было превышение:** До {peak_value}%
**Сервер:** `{self.metrics_collector.os_type.upper()}`
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
---------------------------------"""
return message
except Exception as e:
logger.error(f"Ошибка при формировании сообщения о восстановлении: {e}")
return f"Ошибка при формировании сообщения о восстановлении: {e}"
async def send_status_message(self) -> bool:
"""Отправка статуса сервера в группу логов"""
if not self.group_for_logs:
logger.warning("GROUP_MONITORING_FOR_LOGS не установлен, пропускаем отправку статуса")
return False
try:
system_info = self.metrics_collector.get_system_info()
if not system_info:
logger.error("Не удалось получить информацию о системе")
return False
status_message = self.get_status_message(system_info)
return await self.send_telegram_message(self.group_for_logs, status_message)
except Exception as e:
logger.error(f"Ошибка при отправке статуса: {e}")
return False
async def send_alert_message(self, metric_type: str, current_value: float, details: str) -> bool:
"""Отправка сообщения об алерте в важные логи"""
if not self.important_logs:
logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку алерта")
return False
try:
metric_names = {
'cpu': 'Использование CPU',
'ram': 'Использование оперативной памяти',
'disk': 'Заполнение диска (/)'
}
metric_name = metric_names.get(metric_type, metric_type)
alert_message = self.get_alert_message(metric_name, current_value, details)
return await self.send_telegram_message(self.important_logs, alert_message)
except Exception as e:
logger.error(f"Ошибка при отправке алерта: {e}")
return False
async def send_recovery_message(self, metric_type: str, current_value: float, peak_value: float) -> bool:
"""Отправка сообщения о восстановлении в важные логи"""
if not self.important_logs:
logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку сообщения о восстановлении")
return False
try:
metric_names = {
'cpu': 'Использование CPU',
'ram': 'Использование оперативной памяти',
'disk': 'Заполнение диска (/)'
}
metric_name = metric_names.get(metric_type, metric_type)
recovery_message = self.get_recovery_message(metric_name, current_value, peak_value)
return await self.send_telegram_message(self.important_logs, recovery_message)
except Exception as e:
logger.error(f"Ошибка при отправке сообщения о восстановлении: {e}")
return False
async def process_alerts_and_recoveries(self) -> None:
"""Обработка алертов и восстановлений"""
try:
system_info = self.metrics_collector.get_system_info()
if not system_info:
return
# Проверка алертов
alerts, recoveries = self.metrics_collector.check_alerts(system_info)
# Отправка алертов
for metric_type, value, details in alerts:
await self.send_alert_message(metric_type, value, details)
logger.warning(f"ALERT отправлен: {metric_type} - {value}% - {details}")
# Отправка сообщений о восстановлении
for metric_type, value in recoveries:
# Находим пиковое значение для сообщения о восстановлении
peak_value = self.metrics_collector.threshold
await self.send_recovery_message(metric_type, value, peak_value)
logger.info(f"RECOVERY отправлен: {metric_type} - {value}%")
except Exception as e:
logger.error(f"Ошибка при обработке алертов и восстановлений: {e}")