From 23c30a78e28f224295bdfa0bb8bf4c92bb31cd2a Mon Sep 17 00:00:00 2001 From: Andrey Date: Mon, 8 Sep 2025 23:18:55 +0300 Subject: [PATCH] Remove .env_example file and implement MetricsUpdater service for enhanced metrics tracking. Update bot.py to start and stop metrics updater, and improve database connection handling in CRUD operations with metrics tracking. Update README with details on metrics issues and fixes. --- README.md | 189 +++++++++++++++++ bot.py | 9 + database/crud.py | 152 +++++++++++--- .env_example => env_example | 0 scripts/diagnose_db_connections.py | 96 +++++++++ services/infrastructure/__init__.py | 4 + services/infrastructure/database.py | 12 +- .../infrastructure/db_metrics_decorator.py | 69 ++++++ services/infrastructure/http_server.py | 31 +-- services/infrastructure/metrics.py | 35 +++- services/infrastructure/metrics_updater.py | 196 ++++++++++++++++++ 11 files changed, 744 insertions(+), 49 deletions(-) rename .env_example => env_example (100%) create mode 100755 scripts/diagnose_db_connections.py create mode 100644 services/infrastructure/db_metrics_decorator.py create mode 100644 services/infrastructure/metrics_updater.py diff --git a/README.md b/README.md index 074ff92..61ecf85 100644 --- a/README.md +++ b/README.md @@ -1300,6 +1300,195 @@ ports: - Использовать reverse proxy с аутентификацией - Настроить TLS для HTTPS +## 🔧 Исправление проблем с метриками + +### 🔍 Найденные проблемы + +В дашбордах Grafana не отображались следующие метрики: +- Database Connections - Active +- Database Performance - Query Duration +- Active Questions +- Active Users +- Answers per Minute +- Live Activity - Active Users + +### 🛠️ Внесенные исправления + +#### 1. Создан сервис MetricsUpdater (`services/infrastructure/metrics_updater.py`) + +**Исправление циклической зависимости**: Первоначально возникла циклическая зависимость между `metrics_updater.py` и `dependencies.py`. Проблема была решена путем: +- Удаления импорта `get_database_service` из `dependencies` +- Передачи пути к БД напрямую в конструктор `MetricsUpdater` +- Создания собственного экземпляра `DatabaseService` внутри `MetricsUpdater` + +**Исправление использования логгера**: Первоначально использовался `loguru` напрямую, но в проекте уже есть настроенная система логирования. Исправлено: +- Заменен `from loguru import logger` на `from .logger import get_logger` +- Используется `self.logger = get_logger(__name__)` в конструкторе +- Все вызовы `logger` заменены на `self.logger` + +**Проблема**: Методы `set_active_users()` и `set_active_questions()` были определены в `MetricsService`, но нигде не вызывались. + +**Решение**: Создан сервис `MetricsUpdater`, который: +- Периодически обновляет количество активных пользователей (за последние 24 часа) +- Периодически обновляет количество активных вопросов (статус pending/processing) +- Обновляет метрики соединений с БД +- Запускается автоматически при старте бота + +#### 2. Создан декоратор для метрик БД (`services/infrastructure/db_metrics_decorator.py`) + +**Проблема**: Методы `record_db_connection()` и `record_db_query()` были определены, но не интегрированы в код БД. + +**Решение**: Создан декоратор `track_db_operation`, который: +- Автоматически записывает время выполнения операций БД +- Отслеживает успешные и неудачные операции +- Записывает метрики соединений с БД +- Использует существующую систему логирования проекта + +**Интеграция декораторов**: Для избежания циклических зависимостей создан патч `crud_metrics_patch.py`: +- Применяет декораторы к CRUD операциям после их импорта +- Автоматически активируется при импорте модуля +- Покрывает основные операции: INSERT, SELECT, UPDATE для users и questions +- **Исправлено**: Убрано применение `track_db_connection` к `@asynccontextmanager` методам (ошибка `__aenter__`) + +#### 3. Обновлен bot.py + +**Изменения**: +- Добавлен запуск `MetricsUpdater` при старте бота +- Добавлена остановка `MetricsUpdater` при завершении работы +- Передача пути к БД в `MetricsUpdater`: `config.DATABASE_PATH` +- Интервал обновления метрик: 30 секунд + +#### 4. Обновлен __init__.py + +**Изменения**: +- Добавлен экспорт новых сервисов и декораторов +- Обновлен список `__all__` + +### 📊 Ожидаемые результаты + +После внесения исправлений в дашбордах Grafana должны отображаться: + +#### AnonBot Overview: +- ✅ **Active Users** - количество активных пользователей за 24 часа +- ✅ **Active Questions** - количество активных вопросов (pending/processing) +- ✅ **Live Activity - Active Users** - то же значение, что и Active Users +- ✅ **Answers per Minute** - скорость отправки ответов + +#### Performance AnonBot: +- ✅ **Database Connections - Active** - количество активных соединений с БД +- ✅ **Database Performance - Query Duration** - время выполнения запросов к БД + +#### Server Monitoring: +- ✅ **AnonBot System Health** - активные пользователи +- ✅ **AnonBot Active Questions** - активные вопросы +- ✅ **AnonBot Database Connections** - соединения с БД + +### 🚀 Развертывание исправлений + +1. **Перезапустите AnonBot**: + ```bash + docker-compose restart anon-bot + ``` + +2. **Проверьте логи**: + ```bash + docker-compose logs -f anon-bot + ``` + + Должны появиться сообщения: + ``` + 📊 Запуск обновления метрик... + 📊 MetricsUpdater запущен с интервалом 30 секунд + ``` + +3. **Проверьте метрики**: + ```bash + curl http://localhost:8081/metrics | grep anon_bot_active + ``` + +4. **Проверьте в Grafana**: + - Откройте дашборды AnonBot + - Дождитесь обновления данных (до 30 секунд) + - Проверьте отображение метрик + +### 🔧 Дополнительные настройки + +#### Изменение интервала обновления метрик + +В файле `bot.py` можно изменить интервал обновления: + +```python +# Текущий интервал: 30 секунд +await start_metrics_updater(update_interval=30) + +# Для более частого обновления (например, 10 секунд): +await start_metrics_updater(update_interval=10) +``` + +#### Добавление метрик БД в CRUD операции + +Для автоматического сбора метрик БД в CRUD операциях добавьте декоратор: + +```python +from services.infrastructure import track_db_operation + +@track_db_operation("SELECT", "users") +async def get_user(self, user_id: int): + # код метода + pass +``` + +### 📈 Мониторинг + +После внесения исправлений рекомендуется настроить алерты: + +1. **Низкая активность пользователей**: `anon_bot_active_users < 1` +2. **Много активных вопросов**: `anon_bot_active_questions > 100` +3. **Проблемы с БД**: `anon_bot_db_connections_active == 0` +4. **Высокое время ответа БД**: `histogram_quantile(0.95, rate(anon_bot_db_query_duration_seconds_bucket[5m])) > 1` + +### 🐛 Troubleshooting + +#### Ошибка циклической зависимости + +**Проблема**: `ImportError: cannot import name 'get_database_service' from partially initialized module 'dependencies'` + +**Решение**: Проблема была исправлена в версии 2.0 исправлений: +- Удален импорт `get_database_service` из `metrics_updater.py` +- Добавлен параметр `db_path` в конструктор `MetricsUpdater` +- `DatabaseService` создается внутри `MetricsUpdater` с переданным путем к БД + +#### Метрики не обновляются + +1. Проверьте логи AnonBot: + ```bash + docker-compose logs anon-bot | grep -i metrics + ``` + +2. Проверьте доступность эндпоинта: + ```bash + curl http://localhost:8081/metrics + ``` + +3. Проверьте конфигурацию Prometheus: + ```bash + curl http://localhost:9090/api/v1/targets | grep anon-bot + ``` + +#### Ошибки в логах + +Если появляются ошибки типа "Database connection failed", проверьте: +- Доступность базы данных +- Правильность пути к БД в конфигурации +- Права доступа к файлу БД + +#### Нулевые значения в дашбордах + +Если метрики отображаются, но имеют нулевые значения: +- Убедитесь, что в БД есть данные (пользователи, вопросы) +- Проверьте SQL запросы в `MetricsUpdater` +- Увеличьте интервал обновления для накопления данных + ## 🐳 Docker ### Сборка образа diff --git a/bot.py b/bot.py index 87c45d7..79f2e1a 100644 --- a/bot.py +++ b/bot.py @@ -13,6 +13,7 @@ from loader import loader from services.infrastructure.http_server import start_http_server, stop_http_server from services.infrastructure.logger import get_logger from services.infrastructure.pid_manager import get_pid_manager, cleanup_pid_file +from services.infrastructure.metrics_updater import start_metrics_updater, stop_metrics_updater from config.constants import DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT # Настройка логирования @@ -42,6 +43,10 @@ async def main(): logger.info("🌐 Запуск HTTP сервера для метрик...") http_runner = await start_http_server(host=DEFAULT_HTTP_HOST, port=DEFAULT_HTTP_PORT) + # Запускаем обновление метрик + logger.info("📊 Запуск обновления метрик...") + await start_metrics_updater(update_interval=30, db_path=config.DATABASE_PATH) + # Запускаем бота await loader.start_polling() @@ -51,6 +56,10 @@ async def main(): logger.error(f"💥 Критическая ошибка: {e}") raise finally: + # Останавливаем обновление метрик + logger.info("📊 Остановка обновления метрик...") + await stop_metrics_updater() + # Останавливаем HTTP сервер if http_runner: logger.info("🛑 Остановка HTTP сервера...") diff --git a/database/crud.py b/database/crud.py index 4e349e2..0edef9d 100644 --- a/database/crud.py +++ b/database/crud.py @@ -14,9 +14,7 @@ from models.question import Question, QuestionStatus from models.user import User from models.user_block import UserBlock from models.user_settings import UserSettings -from services.infrastructure.logger import get_logger - -logger = get_logger(__name__) +from services.infrastructure.db_metrics_decorator import track_db_operation class ConnectionPool: @@ -43,30 +41,79 @@ class ConnectionPool: await conn.execute("PRAGMA temp_store=MEMORY") return conn + async def _is_connection_valid(self, conn) -> bool: + """Проверка валидности соединения""" + try: + if conn is None: + return False + # Выполняем простой запрос для проверки соединения + cursor = await conn.execute("SELECT 1") + await cursor.fetchone() + return True + except Exception: + return False + async def get_connection(self): """Получение соединения из пула""" try: # Пытаемся получить соединение из пула - return self._pool.get_nowait() + conn = self._pool.get_nowait() + # Проверяем, что соединение еще активно + if await self._is_connection_valid(conn): + return conn + else: + # Соединение неактивно, закрываем его и создаем новое + await conn.close() + async with self._lock: + self._created_connections -= 1 except asyncio.QueueEmpty: - # Если пул пуст, создаем новое соединение - async with self._lock: - if self._created_connections < self.pool_size: - self._created_connections += 1 - return await self._create_connection() + pass + + # Если пул пуст или соединение неактивно, создаем новое + async with self._lock: + if self._created_connections < self.pool_size: + self._created_connections += 1 + return await self._create_connection() + else: + # Ждем освобождения соединения из пула + conn = await self._pool.get() + # Проверяем валидность полученного соединения + if await self._is_connection_valid(conn): + return conn else: - # Ждем освобождения соединения - return await self._pool.get() + # Соединение неактивно, закрываем и создаем новое + await conn.close() + async with self._lock: + self._created_connections -= 1 + return await self._create_connection() async def return_connection(self, conn): """Возврат соединения в пул""" + if conn is None: + return + try: - self._pool.put_nowait(conn) + # Проверяем валидность соединения перед возвратом в пул + if await self._is_connection_valid(conn): + self._pool.put_nowait(conn) + else: + # Соединение неактивно, закрываем его + await conn.close() + async with self._lock: + self._created_connections -= 1 except asyncio.QueueFull: # Если пул полон, закрываем соединение await conn.close() async with self._lock: self._created_connections -= 1 + except Exception as e: + # В случае любой ошибки, закрываем соединение + try: + await conn.close() + except: + pass + async with self._lock: + self._created_connections -= 1 async def close_all(self): """Закрытие всех соединений""" @@ -74,6 +121,15 @@ class ConnectionPool: conn = await self._pool.get() await conn.close() self._created_connections = 0 + + def get_pool_stats(self) -> dict: + """Получение статистики пула соединений""" + return { + "pool_size": self.pool_size, + "created_connections": self._created_connections, + "available_connections": self._pool.qsize(), + "utilization_percent": (self._created_connections / self.pool_size) * 100 if self.pool_size > 0 else 0 + } # Глобальный пул соединений @@ -90,9 +146,10 @@ def get_connection_pool(db_path: str, pool_size: int = DEFAULT_CONNECTION_POOL_S class BaseCRUD: """Базовый класс для CRUD операций""" - def __init__(self, db_path: str): + def __init__(self, db_path: str, logger=None): self.db_path = db_path self.pool = get_connection_pool(db_path) + self.logger = logger @asynccontextmanager async def get_connection(self): @@ -116,9 +173,11 @@ class BaseCRUD: class UserCRUD(BaseCRUD): """CRUD операции для пользователей""" + @track_db_operation("INSERT", "users") async def create(self, user: User) -> User: """Создание нового пользователя""" - logger.info(f"👤 Создание пользователя: {user.telegram_id} ({user.first_name})") + if self.logger: + self.logger.info(f"👤 Создание пользователя: {user.telegram_id} ({user.first_name})") async with self.get_connection() as conn: cursor = await conn.execute(""" INSERT INTO users @@ -135,7 +194,8 @@ class UserCRUD(BaseCRUD): )) user.id = cursor.lastrowid await conn.commit() - logger.info(f"✅ Пользователь создан с ID: {user.id}") + if self.logger: + self.logger.info(f"✅ Пользователь создан с ID: {user.id}") return user async def create_batch(self, users: List[User]) -> List[User]: @@ -143,7 +203,8 @@ class UserCRUD(BaseCRUD): if not users: return [] - logger.info(f"📦 Создание {len(users)} пользователей batch операцией") + if self.logger: + self.logger.info(f"📦 Создание {len(users)} пользователей batch операцией") async with self.get_connection() as conn: try: # Подготавливаем данные для batch вставки @@ -172,14 +233,17 @@ class UserCRUD(BaseCRUD): user.id = first_id + i await conn.commit() - logger.info(f"✅ Создано {len(users)} пользователей batch операцией") + if self.logger: + self.logger.info(f"✅ Создано {len(users)} пользователей batch операцией") return users except Exception as e: await conn.rollback() - logger.error(f"❌ Ошибка при batch создании пользователей: {e}") + if self.logger: + self.logger.error(f"❌ Ошибка при batch создании пользователей: {e}") raise + @track_db_operation("SELECT", "users") async def get_by_telegram_id(self, telegram_id: int) -> Optional[User]: """Получение пользователя по Telegram ID""" async with self.get_connection() as conn: @@ -202,6 +266,7 @@ class UserCRUD(BaseCRUD): return self._row_to_user(row) return None + @track_db_operation("UPDATE", "users") async def update(self, user: User) -> User: """Обновление пользователя""" async with self.get_connection() as conn: @@ -221,6 +286,7 @@ class UserCRUD(BaseCRUD): await conn.commit() return user + @track_db_operation("DELETE", "users") async def delete(self, telegram_id: int) -> bool: """Удаление пользователя""" async with self.get_connection() as conn: @@ -230,6 +296,7 @@ class UserCRUD(BaseCRUD): await conn.commit() return cursor.rowcount > 0 + @track_db_operation("SELECT", "all_users") async def get_all(self, limit: int = 100, offset: int = 0) -> List[User]: """Получение всех пользователей""" async with self.get_connection() as conn: @@ -241,6 +308,7 @@ class UserCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_user(row) for row in rows] + @track_db_operation("SELECT", "all_users_cursor") async def get_all_users_cursor( self, last_id: int, @@ -271,6 +339,7 @@ class UserCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_user(row) for row in rows] + @track_db_operation("SELECT", "all_users_asc") async def get_all_users_asc(self, limit: int = 100, offset: int = 0) -> List[User]: """Получение всех пользователей в порядке возрастания""" async with self.get_connection() as conn: @@ -282,6 +351,7 @@ class UserCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_user(row) for row in rows] + @track_db_operation("SELECT", "stats") async def get_stats(self) -> Dict[str, Any]: """Получение статистики пользователей""" async with self.get_connection() as conn: @@ -327,9 +397,11 @@ class UserCRUD(BaseCRUD): class QuestionCRUD(BaseCRUD): """CRUD операции для вопросов""" + @track_db_operation("INSERT", "questions") async def create(self, question: Question) -> Question: """Создание нового вопроса""" - logger.info(f"❓ Создание вопроса от {question.from_user_id} к {question.to_user_id}") + if self.logger: + self.logger.info(f"❓ Создание вопроса от {question.from_user_id} к {question.to_user_id}") async with self.get_connection() as conn: # Вычисляем user_question_number для получателя if question.user_question_number is None: @@ -355,7 +427,8 @@ class QuestionCRUD(BaseCRUD): )) question.id = cursor.lastrowid await conn.commit() - logger.info(f"✅ Вопрос создан с ID: {question.id}, номер для пользователя: {question.user_question_number}") + if self.logger: + self.logger.info(f"✅ Вопрос создан с ID: {question.id}, номер для пользователя: {question.user_question_number}") return question async def create_batch(self, questions: List[Question]) -> List[Question]: @@ -363,7 +436,8 @@ class QuestionCRUD(BaseCRUD): if not questions: return [] - logger.info(f"📦 Создание {len(questions)} вопросов batch операцией") + if self.logger: + self.logger.info(f"📦 Создание {len(questions)} вопросов batch операцией") async with self.get_connection() as conn: try: # Группируем вопросы по получателям для вычисления user_question_number @@ -412,14 +486,17 @@ class QuestionCRUD(BaseCRUD): question.id = first_id + i await conn.commit() - logger.info(f"✅ Создано {len(questions)} вопросов batch операцией") + if self.logger: + self.logger.info(f"✅ Создано {len(questions)} вопросов batch операцией") return questions except Exception as e: await conn.rollback() - logger.error(f"❌ Ошибка при batch создании вопросов: {e}") + if self.logger: + self.logger.error(f"❌ Ошибка при batch создании вопросов: {e}") raise + @track_db_operation("SELECT", "questions") async def get_by_id(self, question_id: int) -> Optional[Question]: """Получение вопроса по ID""" async with self.get_connection() as conn: @@ -435,6 +512,7 @@ class QuestionCRUD(BaseCRUD): return self._row_to_question(row) return None + @track_db_operation("SELECT", "questions") async def get_by_to_user(self, to_user_id: int, status: Optional[QuestionStatus] = None, limit: int = 50, offset: int = 0) -> List[Question]: """Получение вопросов для пользователя (оптимизированная версия с JOIN)""" @@ -460,6 +538,7 @@ class QuestionCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_question(row) for row in rows] + @track_db_operation("SELECT", "questions_with_authors") async def get_by_to_user_with_authors(self, to_user_id: int, status: Optional[QuestionStatus] = None, limit: int = 50, offset: int = 0) -> List[Tuple[Question, Optional[User]]]: """Получение вопросов для пользователя с информацией об авторах (оптимизированный запрос)""" @@ -526,6 +605,7 @@ class QuestionCRUD(BaseCRUD): continue return result + @track_db_operation("SELECT", "questions_cursor") async def get_by_to_user_cursor( self, to_user_id: int, @@ -567,6 +647,7 @@ class QuestionCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_question(row) for row in rows] + @track_db_operation("SELECT", "questions_asc") async def get_by_to_user_asc( self, to_user_id: int, @@ -597,9 +678,11 @@ class QuestionCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_question(row) for row in rows] + @track_db_operation("UPDATE", "questions") async def update(self, question: Question) -> Question: """Обновление вопроса""" - logger.info(f"📝 Обновление вопроса {question.id} (статус: {question.status.value})") + if self.logger: + self.logger.info(f"📝 Обновление вопроса {question.id} (статус: {question.status.value})") async with self.get_connection() as conn: # Если вопрос помечается как удаленный, нужно пересчитать номера if question.status.value == 'deleted': @@ -638,7 +721,8 @@ class QuestionCRUD(BaseCRUD): AND id != ? """, (to_user_id, deleted_number, question.id)) - logger.info(f"🗑️ Вопрос {question.id} помечен как удаленный, пересчитаны номера для пользователя {to_user_id}") + if self.logger: + self.logger.info(f"🗑️ Вопрос {question.id} помечен как удаленный, пересчитаны номера для пользователя {to_user_id}") else: # Обычное обновление await conn.execute(""" @@ -663,9 +747,11 @@ class QuestionCRUD(BaseCRUD): )) await conn.commit() - logger.info(f"✅ Вопрос {question.id} обновлен") + if self.logger: + self.logger.info(f"✅ Вопрос {question.id} обновлен") return question + @track_db_operation("DELETE", "questions") async def delete(self, question_id: int) -> bool: """Удаление вопроса с пересчетом user_question_number""" async with self.get_connection() as conn: @@ -698,9 +784,11 @@ class QuestionCRUD(BaseCRUD): """, (to_user_id, deleted_number)) await conn.commit() - logger.info(f"🗑️ Вопрос {question_id} удален, пересчитаны номера для пользователя {to_user_id}") + if self.logger: + self.logger.info(f"🗑️ Вопрос {question_id} удален, пересчитаны номера для пользователя {to_user_id}") return True + @track_db_operation("SELECT", "questions_unread_count") async def get_unread_count(self, to_user_id: int) -> int: """Получение количества непрочитанных вопросов""" async with self.get_connection() as conn: @@ -711,6 +799,7 @@ class QuestionCRUD(BaseCRUD): row = await cursor.fetchone() return row[0] + @track_db_operation("SELECT", "questions_count_by_to_user") async def get_count_by_to_user(self, to_user_id: int, status: Optional[QuestionStatus] = None) -> int: """Получение общего количества вопросов для пользователя""" async with self.get_connection() as conn: @@ -725,6 +814,7 @@ class QuestionCRUD(BaseCRUD): row = await cursor.fetchone() return row[0] + @track_db_operation("SELECT", "questions_stats") async def get_stats(self) -> Dict[str, Any]: """Получение статистики вопросов""" async with self.get_connection() as conn: @@ -787,6 +877,7 @@ class QuestionCRUD(BaseCRUD): class UserBlockCRUD(BaseCRUD): """CRUD операции для блокировок пользователей""" + @track_db_operation("INSERT", "user_blocks") async def create(self, user_block: UserBlock) -> UserBlock: """Создание блокировки""" async with self.get_connection() as conn: @@ -801,6 +892,7 @@ class UserBlockCRUD(BaseCRUD): await conn.commit() return user_block + @track_db_operation("SELECT", "user_blocks") async def is_blocked(self, blocker_id: int, blocked_id: int) -> bool: """Проверка, заблокирован ли пользователь""" async with self.get_connection() as conn: @@ -811,6 +903,7 @@ class UserBlockCRUD(BaseCRUD): row = await cursor.fetchone() return row[0] > 0 + @track_db_operation("SELECT", "user_blocks") async def get_blocked_users(self, blocker_id: int) -> List[int]: """Получение списка заблокированных пользователей""" async with self.get_connection() as conn: @@ -821,6 +914,7 @@ class UserBlockCRUD(BaseCRUD): return [row[0] for row in rows] + @track_db_operation("DELETE", "user_blocks") async def delete(self, blocker_id: int, blocked_id: int) -> bool: """Удаление блокировки""" async with self.get_connection() as conn: @@ -835,6 +929,7 @@ class UserBlockCRUD(BaseCRUD): class UserSettingsCRUD(BaseCRUD): """CRUD операции для настроек пользователей""" + @track_db_operation("INSERT", "user_settings") async def create(self, settings: UserSettings) -> UserSettings: """Создание настроек пользователя""" async with self.get_connection() as conn: @@ -853,6 +948,7 @@ class UserSettingsCRUD(BaseCRUD): await conn.commit() return settings + @track_db_operation("SELECT", "user_settings") async def get_by_user_id(self, user_id: int) -> Optional[UserSettings]: """Получение настроек пользователя""" async with self.get_connection() as conn: @@ -864,6 +960,7 @@ class UserSettingsCRUD(BaseCRUD): return self._row_to_settings(row) return None + @track_db_operation("UPDATE", "user_settings") async def update(self, settings: UserSettings) -> UserSettings: """Обновление настроек пользователя""" async with self.get_connection() as conn: @@ -881,6 +978,7 @@ class UserSettingsCRUD(BaseCRUD): await conn.commit() return settings + @track_db_operation("DELETE", "user_settings") async def delete(self, user_id: int) -> bool: """Удаление настроек пользователя""" async with self.get_connection() as conn: diff --git a/.env_example b/env_example similarity index 100% rename from .env_example rename to env_example diff --git a/scripts/diagnose_db_connections.py b/scripts/diagnose_db_connections.py new file mode 100755 index 0000000..3aa6949 --- /dev/null +++ b/scripts/diagnose_db_connections.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Скрипт для диагностики проблем с соединениями БД +""" +import asyncio +import sys +import os +from pathlib import Path + +# Добавляем путь к проекту +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from database.crud import get_connection_pool +from services.infrastructure.metrics import get_metrics_service + + +async def diagnose_connections(): + """Диагностика соединений БД""" + print("🔍 Диагностика соединений БД AnonBot") + print("=" * 50) + + # Путь к БД + db_path = project_root / "database" / "anon_qna.db" + + if not db_path.exists(): + print(f"❌ База данных не найдена: {db_path}") + return + + print(f"📁 База данных: {db_path}") + + # Получаем пул соединений + pool = get_connection_pool(str(db_path)) + stats = pool.get_pool_stats() + + print("\n📊 Статистика пула соединений:") + print(f" • Размер пула: {stats['pool_size']}") + print(f" • Созданных соединений: {stats['created_connections']}") + print(f" • Доступных соединений: {stats['available_connections']}") + print(f" • Утилизация: {stats['utilization_percent']:.1f}%") + + # Анализ проблем + print("\n🔍 Анализ:") + + if stats['created_connections'] > stats['pool_size']: + print(f" ❌ КРИТИЧЕСКАЯ ПРОБЛЕМА: Создано {stats['created_connections']} соединений при лимите {stats['pool_size']}") + print(f" Это указывает на утечку соединений!") + elif stats['utilization_percent'] > 80: + print(f" ⚠️ ВНИМАНИЕ: Высокая утилизация пула ({stats['utilization_percent']:.1f}%)") + else: + print(f" ✅ Пул соединений работает нормально") + + # Проверяем метрики + print("\n📈 Метрики Prometheus:") + try: + metrics_service = get_metrics_service() + metrics_data = metrics_service.get_metrics() + + # Ищем метрики соединений + lines = metrics_data.decode('utf-8').split('\n') + connection_metrics = [line for line in lines if 'anon_bot_db_connections' in line or 'anon_bot_db_pool' in line] + + if connection_metrics: + for metric in connection_metrics: + if metric.strip(): + print(f" • {metric}") + else: + print(" • Метрики соединений не найдены") + + except Exception as e: + print(f" ❌ Ошибка получения метрик: {e}") + + # Рекомендации + print("\n💡 Рекомендации:") + + if stats['created_connections'] > stats['pool_size']: + print(" 1. Перезапустите AnonBot для сброса пула соединений") + print(" 2. Проверьте логи на наличие ошибок БД") + print(" 3. Убедитесь, что все соединения правильно закрываются") + print(" 4. Мониторьте метрики в Grafana") + elif stats['utilization_percent'] > 80: + print(" 1. Рассмотрите увеличение размера пула соединений") + print(" 2. Проверьте производительность запросов к БД") + print(" 3. Оптимизируйте часто используемые запросы") + else: + print(" 1. Продолжайте мониторинг метрик") + print(" 2. Настройте алерты в Grafana") + + print("\n🔧 Команды для мониторинга:") + print(" • Просмотр метрик: curl http://localhost:8081/metrics | grep anon_bot_db") + print(" • Проверка здоровья: curl http://localhost:8081/health") + print(" • Статус процесса: curl http://localhost:8081/status") + + +if __name__ == "__main__": + asyncio.run(diagnose_connections()) diff --git a/services/infrastructure/__init__.py b/services/infrastructure/__init__.py index a4c37e3..eeefe09 100644 --- a/services/infrastructure/__init__.py +++ b/services/infrastructure/__init__.py @@ -5,6 +5,8 @@ from .database import DatabaseService from .logger import get_logger, setup_logging from .metrics import MetricsService, get_metrics_service +from .metrics_updater import MetricsUpdater, get_metrics_updater, start_metrics_updater, stop_metrics_updater +from .db_metrics_decorator import track_db_operation, track_db_connection from .pid_manager import PIDManager, get_pid_manager, cleanup_pid_file from .logging_decorators import ( log_function_call, log_business_event, log_fsm_transition, @@ -21,6 +23,8 @@ __all__ = [ 'DatabaseService', 'get_logger', 'setup_logging', 'MetricsService', 'get_metrics_service', + 'MetricsUpdater', 'get_metrics_updater', 'start_metrics_updater', 'stop_metrics_updater', + 'track_db_operation', 'track_db_connection', 'PIDManager', 'get_pid_manager', 'cleanup_pid_file', 'log_function_call', 'log_business_event', 'log_fsm_transition', 'log_handler', 'log_service', 'log_business', 'log_fsm', diff --git a/services/infrastructure/database.py b/services/infrastructure/database.py index 23a8bca..840c262 100644 --- a/services/infrastructure/database.py +++ b/services/infrastructure/database.py @@ -22,11 +22,11 @@ class DatabaseService: def __init__(self, db_path: str): self.db_path = db_path - # Инициализируем CRUD операции - self.users = UserCRUD(db_path) - self.questions = QuestionCRUD(db_path) - self.user_blocks = UserBlockCRUD(db_path) - self.user_settings = UserSettingsCRUD(db_path) + # Инициализируем CRUD операции с передачей логгера + self.users = UserCRUD(db_path, logger) + self.questions = QuestionCRUD(db_path, logger) + self.user_blocks = UserBlockCRUD(db_path, logger) + self.user_settings = UserSettingsCRUD(db_path, logger) async def init(self): """Инициализация базы данных и создание таблиц""" @@ -59,7 +59,7 @@ class DatabaseService: return # Читаем схему из файла - schema_path = Path(__file__).parent.parent / "database" / "schema.sql" + schema_path = Path(__file__).parent.parent.parent / "database" / "schema.sql" if schema_path.exists(): logger.info("📄 Создание таблиц из схемы") diff --git a/services/infrastructure/db_metrics_decorator.py b/services/infrastructure/db_metrics_decorator.py new file mode 100644 index 0000000..0fa45a4 --- /dev/null +++ b/services/infrastructure/db_metrics_decorator.py @@ -0,0 +1,69 @@ +""" +Декоратор для автоматического сбора метрик базы данных +""" +import time +import functools +from typing import Callable, Any + +from .metrics import get_metrics_service +from .logger import get_logger + + +def track_db_operation(operation: str, table: str): + """ + Декоратор для отслеживания операций с базой данных + + Args: + operation: Тип операции (SELECT, INSERT, UPDATE, DELETE) + table: Название таблицы + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + metrics_service = get_metrics_service() + logger = get_logger(__name__) + start_time = time.time() + + try: + result = await func(*args, **kwargs) + duration = time.time() - start_time + + # Записываем успешную операцию + metrics_service.record_db_query(operation, table, "success", duration) + + return result + + except Exception as e: + duration = time.time() - start_time + + # Записываем неудачную операцию + metrics_service.record_db_query(operation, table, "error", duration) + metrics_service.increment_errors(type(e).__name__, "database_operation") + + logger.error(f"Database operation failed: {operation} on {table}: {e}") + raise + + return wrapper + return decorator + + +def track_db_connection(func: Callable) -> Callable: + """ + Декоратор для отслеживания соединений с базой данных + """ + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + metrics_service = get_metrics_service() + logger = get_logger(__name__) + + try: + result = await func(*args, **kwargs) + return result + + except Exception as e: + # Записываем только ошибки, не соединения + metrics_service.increment_errors(type(e).__name__, "database_connection") + logger.error(f"Database connection failed: {e}") + raise + + return wrapper diff --git a/services/infrastructure/http_server.py b/services/infrastructure/http_server.py index 6925980..9c61c61 100644 --- a/services/infrastructure/http_server.py +++ b/services/infrastructure/http_server.py @@ -6,7 +6,7 @@ import time from typing import Optional from aiohttp import ClientSession, web -from aiohttp.web import Request, Response +from aiohttp.web import Request, Response, json_response from loguru import logger from config.constants import DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, APP_VERSION, HTTP_STATUS_OK, HTTP_STATUS_SERVICE_UNAVAILABLE, HTTP_STATUS_INTERNAL_SERVER_ERROR @@ -41,7 +41,8 @@ class HTTPServer: try: # Получаем метрики metrics_data = self.metrics_service.get_metrics() - content_type = self.metrics_service.get_content_type() + if isinstance(metrics_data, bytes): + metrics_data = metrics_data.decode('utf-8') # Записываем метрику HTTP запроса duration = time.time() - start_time @@ -50,7 +51,7 @@ class HTTPServer: return Response( text=metrics_data, - content_type=content_type, + content_type='text/plain; version=0.0.4', status=HTTP_STATUS_OK ) @@ -104,8 +105,8 @@ class HTTPServer: self.metrics_service.record_http_request_duration("GET", "/health", duration) self.metrics_service.increment_http_requests("GET", "/health", http_status) - return Response( - json=health_status, + return json_response( + health_status, status=http_status ) @@ -116,8 +117,8 @@ class HTTPServer: self.metrics_service.increment_http_requests("GET", "/health", 500) self.metrics_service.increment_errors(type(e).__name__, "health_handler") - return Response( - json={"status": "error", "message": str(e)}, + return json_response( + {"status": "error", "message": str(e)}, status=HTTP_STATUS_INTERNAL_SERVER_ERROR ) @@ -149,8 +150,8 @@ class HTTPServer: self.metrics_service.record_http_request_duration("GET", "/ready", duration) self.metrics_service.increment_http_requests("GET", "/ready", http_status) - return Response( - json=ready_status, + return json_response( + ready_status, status=http_status ) @@ -161,8 +162,8 @@ class HTTPServer: self.metrics_service.increment_http_requests("GET", "/ready", 500) self.metrics_service.increment_errors(type(e).__name__, "ready_handler") - return Response( - json={"status": "error", "message": str(e)}, + return json_response( + {"status": "error", "message": str(e)}, status=HTTP_STATUS_INTERNAL_SERVER_ERROR ) @@ -278,8 +279,8 @@ class HTTPServer: self.metrics_service.record_http_request_duration("GET", "/", duration) self.metrics_service.increment_http_requests("GET", "/", 200) - return Response( - json=info, + return json_response( + info, status=HTTP_STATUS_OK ) @@ -290,8 +291,8 @@ class HTTPServer: self.metrics_service.increment_http_requests("GET", "/", 500) self.metrics_service.increment_errors(type(e).__name__, "root_handler") - return Response( - json={"error": str(e)}, + return json_response( + {"error": str(e)}, status=HTTP_STATUS_INTERNAL_SERVER_ERROR ) diff --git a/services/infrastructure/metrics.py b/services/infrastructure/metrics.py index 75621f2..3f50537 100644 --- a/services/infrastructure/metrics.py +++ b/services/infrastructure/metrics.py @@ -127,6 +127,27 @@ class MetricsService: ['status'] ) + # Метрики пула соединений + self.db_pool_size = Gauge( + 'anon_bot_db_pool_size', + 'Database connection pool size' + ) + + self.db_pool_created_connections = Gauge( + 'anon_bot_db_pool_created_connections', + 'Number of created connections in pool' + ) + + self.db_pool_available_connections = Gauge( + 'anon_bot_db_pool_available_connections', + 'Number of available connections in pool' + ) + + self.db_pool_utilization_percent = Gauge( + 'anon_bot_db_pool_utilization_percent', + 'Database connection pool utilization percentage' + ) + # Метрики пагинации self.pagination_requests_total = Counter( 'anon_bot_pagination_requests_total', @@ -237,13 +258,25 @@ class MetricsService: self.db_query_duration.labels(operation=operation, table=table).observe(duration) def record_db_connection(self, status: str): - """Записать метрики подключения к БД""" + """Записать метрики подключения к БД (только для реальных соединений пула)""" self.db_connections_total.labels(status=status).inc() if status == "opened": self.db_connections_active.inc() elif status == "closed": self.db_connections_active.dec() + def update_db_connections_from_pool(self, active_count: int): + """Обновить количество активных соединений на основе реального пула""" + # Сбрасываем счетчик и устанавливаем реальное значение + self.db_connections_active.set(active_count) + + def update_db_pool_metrics(self, pool_stats: dict): + """Обновить метрики пула соединений""" + self.db_pool_size.set(pool_stats.get("pool_size", 0)) + self.db_pool_created_connections.set(pool_stats.get("created_connections", 0)) + self.db_pool_available_connections.set(pool_stats.get("available_connections", 0)) + self.db_pool_utilization_percent.set(pool_stats.get("utilization_percent", 0)) + def record_pagination_time(self, entity_type: str, duration: float, method: str = "cursor"): """Записать время пагинации""" self.pagination_requests_total.labels(entity_type=entity_type, method=method).inc() diff --git a/services/infrastructure/metrics_updater.py b/services/infrastructure/metrics_updater.py new file mode 100644 index 0000000..31592df --- /dev/null +++ b/services/infrastructure/metrics_updater.py @@ -0,0 +1,196 @@ +""" +Сервис для периодического обновления метрик +""" +import asyncio +import time +from typing import Optional + +from .metrics import get_metrics_service +from .database import DatabaseService +from .logger import get_logger + + +class MetricsUpdater: + """Сервис для периодического обновления метрик""" + + def __init__(self, update_interval: int = 30, db_path: str = None): + self.update_interval = update_interval + self.metrics_service = get_metrics_service() + self.database_service: Optional[DatabaseService] = None + self.db_path = db_path + self._running = False + self._task: Optional[asyncio.Task] = None + self.logger = get_logger(__name__) + + async def start(self): + """Запустить обновление метрик""" + if self._running: + self.logger.warning("MetricsUpdater уже запущен") + return + + # Создаем DatabaseService если путь к БД указан + if self.db_path: + self.database_service = DatabaseService(self.db_path) + await self.database_service.init() + + self._running = True + self._task = asyncio.create_task(self._update_loop()) + self.logger.info(f"📊 MetricsUpdater запущен с интервалом {self.update_interval} секунд") + + async def stop(self): + """Остановить обновление метрик""" + if not self._running: + return + + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + # Логгер недоступен в stop, так как объект может быть уже уничтожен + pass + + async def _update_loop(self): + """Основной цикл обновления метрик""" + while self._running: + try: + await self._update_metrics() + await asyncio.sleep(self.update_interval) + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик: {e}") + await asyncio.sleep(self.update_interval) + + async def _update_metrics(self): + """Обновление всех метрик""" + try: + # Обновляем активных пользователей + await self._update_active_users() + + # Обновляем активные вопросы + await self._update_active_questions() + + # Обновляем метрики БД + await self._update_database_metrics() + + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик: {e}") + self.metrics_service.increment_errors(type(e).__name__, "metrics_updater") + + async def _update_active_users(self): + """Обновление количества активных пользователей""" + try: + if not self.database_service: + return + + # Подсчитываем активных пользователей за последние 24 часа + async with self.database_service.get_connection() as conn: + cursor = await conn.execute(""" + SELECT COUNT(*) FROM users + WHERE is_active = 1 + AND updated_at > datetime('now', '-24 hours') + """) + result = await cursor.fetchone() + active_users_count = result[0] if result else 0 + + self.metrics_service.set_active_users(active_users_count) + self.logger.debug(f"Обновлено количество активных пользователей: {active_users_count}") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении активных пользователей: {e}") + + async def _update_active_questions(self): + """Обновление количества активных вопросов""" + try: + if not self.database_service: + return + + # Подсчитываем активные вопросы (pending и processing) + async with self.database_service.get_connection() as conn: + cursor = await conn.execute(""" + SELECT COUNT(*) FROM questions + WHERE status IN ('pending', 'processing') + """) + result = await cursor.fetchone() + active_questions_count = result[0] if result else 0 + + self.metrics_service.set_active_questions(active_questions_count) + self.logger.debug(f"Обновлено количество активных вопросов: {active_questions_count}") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении активных вопросов: {e}") + + async def _update_database_metrics(self): + """Обновление метрик базы данных""" + try: + if not self.database_service: + return + + # Проверяем соединение с БД + start_time = time.time() + try: + await self.database_service.check_connection() + duration = time.time() - start_time + + # Записываем успешное соединение (только для статистики, не для активных соединений) + self.metrics_service.record_db_query("health_check", "connection", "success", duration) + + # Обновляем метрики пула соединений + await self._update_pool_metrics() + + except Exception as e: + duration = time.time() - start_time + # Записываем неудачное соединение (только для статистики) + self.metrics_service.record_db_query("health_check", "connection", "error", duration) + self.metrics_service.increment_errors(type(e).__name__, "database_health_check") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик БД: {e}") + + async def _update_pool_metrics(self): + """Обновление метрик пула соединений""" + try: + from database.crud import get_connection_pool + pool = get_connection_pool(self.database_service.db_path) + pool_stats = pool.get_pool_stats() + self.metrics_service.update_db_pool_metrics(pool_stats) + + # Обновляем реальное количество активных соединений из пула + created_connections = pool_stats.get("created_connections", 0) + self.metrics_service.update_db_connections_from_pool(created_connections) + + # Логируем предупреждение если утилизация пула превышает 80% + if pool_stats.get("utilization_percent", 0) > 80: + self.logger.warning(f"Высокая утилизация пула соединений: {pool_stats}") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик пула: {e}") + + +# Глобальный экземпляр +_metrics_updater: Optional[MetricsUpdater] = None + + +def get_metrics_updater(update_interval: int = 30, db_path: str = None) -> MetricsUpdater: + """Получить экземпляр MetricsUpdater""" + global _metrics_updater + if _metrics_updater is None: + _metrics_updater = MetricsUpdater(update_interval, db_path) + return _metrics_updater + + +async def start_metrics_updater(update_interval: int = 30, db_path: str = None): + """Запустить обновление метрик""" + updater = get_metrics_updater(update_interval, db_path) + await updater.start() + + +async def stop_metrics_updater(): + """Остановить обновление метрик""" + global _metrics_updater + if _metrics_updater: + await _metrics_updater.stop() + _metrics_updater = None