Files
prod/infra/monitoring/message_sender.py
Andrey b34da5015d Implement AnonBot integration and monitoring enhancements
- Added AnonBot service to docker-compose with resource limits and environment variables.
- Updated Makefile to include commands for AnonBot logs, restart, and dependency checks.
- Enhanced Grafana dashboards with AnonBot health metrics and database connection statistics.
- Implemented AnonBot status retrieval in the message sender for improved monitoring.
- Updated Prometheus configuration to scrape metrics from AnonBot service.
2025-09-08 23:17:24 +03:00

379 lines
19 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import aiohttp
import logging
from datetime import datetime
from typing import Dict, List, Tuple
try:
from .metrics_collector import MetricsCollector
except ImportError:
from metrics_collector import MetricsCollector
logger = logging.getLogger(__name__)
class MessageSender:
def __init__(self):
# Получаем переменные окружения
self.telegram_bot_token = os.getenv('TELEGRAM_MONITORING_BOT_TOKEN')
self.group_for_logs = os.getenv('GROUP_MONITORING_FOR_LOGS')
self.important_logs = os.getenv('IMPORTANT_MONITORING_LOGS')
# Интервал отправки статуса в минутах (по умолчанию 2 минуты)
self.status_update_interval_minutes = int(os.getenv('STATUS_UPDATE_INTERVAL_MINUTES', 2))
# Создаем экземпляр сборщика метрик
self.metrics_collector = MetricsCollector()
# Время последней отправки статуса
self.last_status_time = None
if not self.telegram_bot_token:
logger.warning("TELEGRAM_MONITORING_BOT_TOKEN не установлен в переменных окружения")
if not self.group_for_logs:
logger.warning("GROUP_MONITORING_FOR_LOGS не установлен в переменных окружения")
if not self.important_logs:
logger.warning("IMPORTANT_MONITORING_LOGS не установлен в переменных окружения")
logger.info(f"Интервал отправки статуса установлен: {self.status_update_interval_minutes} минут")
async def send_telegram_message(self, chat_id: str, message: str) -> bool:
"""Отправка сообщения в Telegram через прямое обращение к API"""
if not self.telegram_bot_token:
logger.error("TELEGRAM_MONITORING_BOT_TOKEN не установлен")
return False
try:
async with aiohttp.ClientSession() as session:
url = f"https://api.telegram.org/bot{self.telegram_bot_token}/sendMessage"
payload = {
"chat_id": chat_id,
"text": message,
"parse_mode": "HTML"
}
async with session.post(url, json=payload) as response:
if response.status == 200:
logger.info(f"Сообщение успешно отправлено в чат {chat_id}")
return True
else:
response_text = await response.text()
logger.error(f"Ошибка отправки в Telegram: {response.status} - {response_text}")
return False
except Exception as e:
logger.error(f"Ошибка при отправке сообщения в Telegram: {e}")
return False
async def get_anonbot_status(self) -> Tuple[str, str]:
"""Получение статуса AnonBot через HTTP API"""
try:
async with aiohttp.ClientSession() as session:
# AnonBot доступен через Docker network
url = "http://bots_anon_bot:8081/status"
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as response:
if response.status == 200:
data = await response.json()
status = data.get('status', 'unknown')
uptime = data.get('uptime', 'unknown')
# Форматируем статус с эмодзи
if status == 'running':
status_emoji = ""
elif status == 'stopped':
status_emoji = ""
else:
status_emoji = "⚠️"
return f"{status_emoji}", uptime
else:
logger.warning(f"AnonBot API вернул статус {response.status}")
return "⚠️ AnonBot", "API недоступен"
except aiohttp.ClientError as e:
logger.warning(f"Ошибка подключения к AnonBot API: {e}")
return "", "Недоступен"
except Exception as e:
logger.error(f"Неожиданная ошибка при получении статуса AnonBot: {e}")
return "⚠️", "Ошибка"
def should_send_status(self) -> bool:
"""Проверка, нужно ли отправить статус (каждые N минут)"""
now = datetime.now()
# Логируем для диагностики
import logging
logger = logging.getLogger(__name__)
if self.last_status_time is None:
logger.info(f"should_send_status: last_status_time is None, отправляем статус")
return True
# Вычисляем разницу в минутах
time_diff_minutes = (now - self.last_status_time).total_seconds() / 60
logger.info(f"should_send_status: прошло {time_diff_minutes:.1f} минут с последней отправки, нужно {self.status_update_interval_minutes} минут")
# Проверяем, что прошло N минут с последней отправки
if time_diff_minutes >= self.status_update_interval_minutes:
logger.info(f"should_send_status: отправляем статус (прошло {time_diff_minutes:.1f} минут)")
return True
logger.info(f"should_send_status: статус не отправляем (прошло {time_diff_minutes:.1f} минут)")
return False
def should_send_startup_status(self) -> bool:
"""Проверка, нужно ли отправить статус при запуске"""
# Отправляем статус при запуске только если он еще не был отправлен
if self.last_status_time is None:
logger.info("should_send_startup_status: отправляем статус при запуске")
return True
logger.info("should_send_startup_status: статус уже был отправлен, пропускаем")
return False
def _get_disk_space_emoji(self, disk_percent: float) -> str:
"""Получение эмодзи для дискового пространства"""
if disk_percent < 60:
return "🟢"
elif disk_percent < 90:
return "⚠️"
else:
return "🚨"
def _get_cpu_emoji(self, cpu_percent: float) -> str:
"""Получение эмодзи для CPU"""
if cpu_percent < 50:
return "🟢"
elif cpu_percent < 80:
return "⚠️"
else:
return "🚨"
def _get_memory_emoji(self, memory_percent: float) -> str:
"""Получение эмодзи для памяти (RAM/Swap)"""
if memory_percent < 60:
return "🟢"
elif memory_percent < 85:
return "⚠️"
else:
return "🚨"
def _get_load_average_emoji(self, load_avg: float, cpu_count: int) -> str:
"""Получение эмодзи для Load Average"""
# Load Average считается нормальным если < 1.0 на ядро
# Критичным если > 2.0 на ядро
load_per_core = load_avg / cpu_count
if load_per_core < 1.0:
return "🟢"
elif load_per_core < 2.0:
return "⚠️"
else:
return "🚨"
def _get_io_wait_emoji(self, io_wait_percent: float) -> str:
"""Получение эмодзи для IO Wait"""
# IO Wait считается нормальным если < 5%
# Критичным если > 20%
if io_wait_percent < 5:
return "🟢"
elif io_wait_percent < 20:
return "⚠️"
else:
return "🚨"
async def get_status_message(self, system_info: Dict) -> str:
"""Формирование сообщения со статусом сервера"""
try:
helper_bot_status, helper_bot_uptime = self.metrics_collector.check_process_status('helper_bot')
# Получаем статус AnonBot
anonbot_status, anonbot_uptime = await self.get_anonbot_status()
# Получаем эмодзи для всех метрик
cpu_emoji = self._get_cpu_emoji(system_info['cpu_percent'])
ram_emoji = self._get_memory_emoji(system_info['ram_percent'])
swap_emoji = self._get_memory_emoji(system_info['swap_percent'])
la_emoji = self._get_load_average_emoji(system_info['load_avg_1m'], system_info['cpu_count'])
io_wait_emoji = self._get_io_wait_emoji(system_info['io_wait_percent'])
disk_emoji = self._get_disk_space_emoji(system_info['disk_percent'])
# Определяем уровень мониторинга
monitoring_level = system_info.get('monitoring_level', 'unknown')
level_emoji = "🖥️" if monitoring_level == 'host' else "📦"
level_text = "Хост" if monitoring_level == 'host' else "Контейнер"
message = f"""{level_emoji} **Статус {level_text}** | <code>{system_info['current_time']}</code>
---------------------------------
**📊 Общая нагрузка:**
CPU: <b>{system_info['cpu_percent']}%</b> {cpu_emoji} | LA: <b>{system_info['load_avg_1m']} / {system_info['cpu_count']}</b> {la_emoji} | IO Wait: <b>{system_info['io_wait_percent']}%</b> {io_wait_emoji}
**💾 Память:**
RAM: <b>{system_info['ram_used']}/{system_info['ram_total']} GB</b> ({system_info['ram_percent']}%) {ram_emoji}
Swap: <b>{system_info['swap_used']}/{system_info['swap_total']} GB</b> ({system_info['swap_percent']}%) {swap_emoji}
**🗂️ Дисковое пространство:**
Диск (/): <b>{system_info['disk_used']}/{system_info['disk_total']} GB</b> ({system_info['disk_percent']}%) {disk_emoji}
**💿 Диск I/O:**
Read: <b>{system_info['disk_read_speed']}</b> | Write: <b>{system_info['disk_write_speed']}</b>
Диск загружен: <b>{system_info['disk_io_percent']}%</b>
**🤖 Процессы:**
{helper_bot_status} helper-bot - {helper_bot_uptime}
{anonbot_status} AnonBot - {anonbot_uptime}
---------------------------------
⏰ Uptime сервера: {system_info['system_uptime']}
🔍 Уровень мониторинга: {level_text} ({monitoring_level})"""
return message
except Exception as e:
logger.error(f"Ошибка при формировании статуса сервера: {e}")
return f"Ошибка при получении статуса сервера: {e}"
def get_alert_message(self, metric_name: str, current_value: float, details: str) -> str:
"""Формирование сообщения об алерте"""
try:
# Получаем информацию о задержке для данного метрика
delay_info = ""
if hasattr(self.metrics_collector, 'alert_delays'):
metric_type = metric_name.lower().replace('использование ', '').replace('заполнение диска (/)', 'disk')
if 'cpu' in metric_type:
delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['cpu']} сек"
elif 'память' in metric_type or 'ram' in metric_type:
delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['ram']} сек"
elif 'диск' in metric_type or 'disk' in metric_type:
delay_info = f"⏱️ Задержка срабатывания: {self.metrics_collector.alert_delays['disk']} сек"
message = f"""🚨 **ALERT: Высокая нагрузка на сервере!**
---------------------------------
**Показатель:** {metric_name}
**Текущее значение:** <b>{current_value}%</b> ⚠️
**Пороговое значение:** 80%
**Детали:**
{details}
{delay_info}
**Сервер:** `{self.metrics_collector.os_type.upper()}`
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
---------------------------------"""
return message
except Exception as e:
logger.error(f"Ошибка при формировании алерта: {e}")
return f"Ошибка при формировании алерта: {e}"
def get_recovery_message(self, metric_name: str, current_value: float, peak_value: float) -> str:
"""Формирование сообщения о восстановлении"""
try:
message = f"""✅ **RECOVERY: Нагрузка нормализовалась**
---------------------------------
**Показатель:** {metric_name}
**Текущее значение:** <b>{current_value}%</b> ✔️
**Было превышение:** До {peak_value}%
**Сервер:** `{self.metrics_collector.os_type.upper()}`
**Время:** `{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}`
---------------------------------"""
return message
except Exception as e:
logger.error(f"Ошибка при формировании сообщения о восстановлении: {e}")
return f"Ошибка при формировании сообщения о восстановлении: {e}"
async def send_status_message(self) -> bool:
"""Отправка статуса сервера в группу логов"""
if not self.group_for_logs:
logger.warning("GROUP_MONITORING_FOR_LOGS не установлен, пропускаем отправку статуса")
return False
try:
system_info = self.metrics_collector.get_system_info()
if not system_info:
logger.error("Не удалось получить информацию о системе")
return False
status_message = await self.get_status_message(system_info)
success = await self.send_telegram_message(self.group_for_logs, status_message)
# Обновляем время последней отправки только при успешной отправке
if success:
self.last_status_time = datetime.now()
logger.info("send_status_message: время последней отправки обновлено")
return success
except Exception as e:
logger.error(f"Ошибка при отправке статуса: {e}")
return False
async def send_alert_message(self, metric_type: str, current_value: float, details: str) -> bool:
"""Отправка сообщения об алерте в важные логи"""
if not self.important_logs:
logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку алерта")
return False
try:
metric_names = {
'cpu': 'Использование CPU',
'ram': 'Использование оперативной памяти',
'disk': 'Заполнение диска (/)'
}
metric_name = metric_names.get(metric_type, metric_type)
alert_message = self.get_alert_message(metric_name, current_value, details)
return await self.send_telegram_message(self.important_logs, alert_message)
except Exception as e:
logger.error(f"Ошибка при отправке алерта: {e}")
return False
async def send_recovery_message(self, metric_type: str, current_value: float, peak_value: float) -> bool:
"""Отправка сообщения о восстановлении в важные логи"""
if not self.important_logs:
logger.warning("IMPORTANT_MONITORING_LOGS не установлен, пропускаем отправку сообщения о восстановлении")
return False
try:
metric_names = {
'cpu': 'Использование CPU',
'ram': 'Использование оперативной памяти',
'disk': 'Заполнение диска (/)'
}
metric_name = metric_names.get(metric_type, metric_type)
recovery_message = self.get_recovery_message(metric_name, current_value, peak_value)
return await self.send_telegram_message(self.important_logs, recovery_message)
except Exception as e:
logger.error(f"Ошибка при отправке сообщения о восстановлении: {e}")
return False
async def process_alerts_and_recoveries(self) -> None:
"""Обработка алертов и восстановлений"""
try:
system_info = self.metrics_collector.get_system_info()
if not system_info:
return
# Проверка алертов
alerts, recoveries = self.metrics_collector.check_alerts(system_info)
# Отправка алертов
for metric_type, value, details in alerts:
await self.send_alert_message(metric_type, value, details)
logger.warning(f"ALERT отправлен: {metric_type} - {value}% - {details}")
# Отправка сообщений о восстановлении
for metric_type, value in recoveries:
# Находим пиковое значение для сообщения о восстановлении
peak_value = self.metrics_collector.threshold
await self.send_recovery_message(metric_type, value, peak_value)
logger.info(f"RECOVERY отправлен: {metric_type} - {value}%")
except Exception as e:
logger.error(f"Ошибка при обработке алертов и восстановлений: {e}")