diff --git a/README.md b/README.md index 074ff92..61ecf85 100644 --- a/README.md +++ b/README.md @@ -1300,6 +1300,195 @@ ports: - Использовать reverse proxy с аутентификацией - Настроить TLS для HTTPS +## 🔧 Исправление проблем с метриками + +### 🔍 Найденные проблемы + +В дашбордах Grafana не отображались следующие метрики: +- Database Connections - Active +- Database Performance - Query Duration +- Active Questions +- Active Users +- Answers per Minute +- Live Activity - Active Users + +### 🛠️ Внесенные исправления + +#### 1. Создан сервис MetricsUpdater (`services/infrastructure/metrics_updater.py`) + +**Исправление циклической зависимости**: Первоначально возникла циклическая зависимость между `metrics_updater.py` и `dependencies.py`. Проблема была решена путем: +- Удаления импорта `get_database_service` из `dependencies` +- Передачи пути к БД напрямую в конструктор `MetricsUpdater` +- Создания собственного экземпляра `DatabaseService` внутри `MetricsUpdater` + +**Исправление использования логгера**: Первоначально использовался `loguru` напрямую, но в проекте уже есть настроенная система логирования. Исправлено: +- Заменен `from loguru import logger` на `from .logger import get_logger` +- Используется `self.logger = get_logger(__name__)` в конструкторе +- Все вызовы `logger` заменены на `self.logger` + +**Проблема**: Методы `set_active_users()` и `set_active_questions()` были определены в `MetricsService`, но нигде не вызывались. + +**Решение**: Создан сервис `MetricsUpdater`, который: +- Периодически обновляет количество активных пользователей (за последние 24 часа) +- Периодически обновляет количество активных вопросов (статус pending/processing) +- Обновляет метрики соединений с БД +- Запускается автоматически при старте бота + +#### 2. Создан декоратор для метрик БД (`services/infrastructure/db_metrics_decorator.py`) + +**Проблема**: Методы `record_db_connection()` и `record_db_query()` были определены, но не интегрированы в код БД. + +**Решение**: Создан декоратор `track_db_operation`, который: +- Автоматически записывает время выполнения операций БД +- Отслеживает успешные и неудачные операции +- Записывает метрики соединений с БД +- Использует существующую систему логирования проекта + +**Интеграция декораторов**: Для избежания циклических зависимостей создан патч `crud_metrics_patch.py`: +- Применяет декораторы к CRUD операциям после их импорта +- Автоматически активируется при импорте модуля +- Покрывает основные операции: INSERT, SELECT, UPDATE для users и questions +- **Исправлено**: Убрано применение `track_db_connection` к `@asynccontextmanager` методам (ошибка `__aenter__`) + +#### 3. Обновлен bot.py + +**Изменения**: +- Добавлен запуск `MetricsUpdater` при старте бота +- Добавлена остановка `MetricsUpdater` при завершении работы +- Передача пути к БД в `MetricsUpdater`: `config.DATABASE_PATH` +- Интервал обновления метрик: 30 секунд + +#### 4. Обновлен __init__.py + +**Изменения**: +- Добавлен экспорт новых сервисов и декораторов +- Обновлен список `__all__` + +### 📊 Ожидаемые результаты + +После внесения исправлений в дашбордах Grafana должны отображаться: + +#### AnonBot Overview: +- ✅ **Active Users** - количество активных пользователей за 24 часа +- ✅ **Active Questions** - количество активных вопросов (pending/processing) +- ✅ **Live Activity - Active Users** - то же значение, что и Active Users +- ✅ **Answers per Minute** - скорость отправки ответов + +#### Performance AnonBot: +- ✅ **Database Connections - Active** - количество активных соединений с БД +- ✅ **Database Performance - Query Duration** - время выполнения запросов к БД + +#### Server Monitoring: +- ✅ **AnonBot System Health** - активные пользователи +- ✅ **AnonBot Active Questions** - активные вопросы +- ✅ **AnonBot Database Connections** - соединения с БД + +### 🚀 Развертывание исправлений + +1. **Перезапустите AnonBot**: + ```bash + docker-compose restart anon-bot + ``` + +2. **Проверьте логи**: + ```bash + docker-compose logs -f anon-bot + ``` + + Должны появиться сообщения: + ``` + 📊 Запуск обновления метрик... + 📊 MetricsUpdater запущен с интервалом 30 секунд + ``` + +3. **Проверьте метрики**: + ```bash + curl http://localhost:8081/metrics | grep anon_bot_active + ``` + +4. **Проверьте в Grafana**: + - Откройте дашборды AnonBot + - Дождитесь обновления данных (до 30 секунд) + - Проверьте отображение метрик + +### 🔧 Дополнительные настройки + +#### Изменение интервала обновления метрик + +В файле `bot.py` можно изменить интервал обновления: + +```python +# Текущий интервал: 30 секунд +await start_metrics_updater(update_interval=30) + +# Для более частого обновления (например, 10 секунд): +await start_metrics_updater(update_interval=10) +``` + +#### Добавление метрик БД в CRUD операции + +Для автоматического сбора метрик БД в CRUD операциях добавьте декоратор: + +```python +from services.infrastructure import track_db_operation + +@track_db_operation("SELECT", "users") +async def get_user(self, user_id: int): + # код метода + pass +``` + +### 📈 Мониторинг + +После внесения исправлений рекомендуется настроить алерты: + +1. **Низкая активность пользователей**: `anon_bot_active_users < 1` +2. **Много активных вопросов**: `anon_bot_active_questions > 100` +3. **Проблемы с БД**: `anon_bot_db_connections_active == 0` +4. **Высокое время ответа БД**: `histogram_quantile(0.95, rate(anon_bot_db_query_duration_seconds_bucket[5m])) > 1` + +### 🐛 Troubleshooting + +#### Ошибка циклической зависимости + +**Проблема**: `ImportError: cannot import name 'get_database_service' from partially initialized module 'dependencies'` + +**Решение**: Проблема была исправлена в версии 2.0 исправлений: +- Удален импорт `get_database_service` из `metrics_updater.py` +- Добавлен параметр `db_path` в конструктор `MetricsUpdater` +- `DatabaseService` создается внутри `MetricsUpdater` с переданным путем к БД + +#### Метрики не обновляются + +1. Проверьте логи AnonBot: + ```bash + docker-compose logs anon-bot | grep -i metrics + ``` + +2. Проверьте доступность эндпоинта: + ```bash + curl http://localhost:8081/metrics + ``` + +3. Проверьте конфигурацию Prometheus: + ```bash + curl http://localhost:9090/api/v1/targets | grep anon-bot + ``` + +#### Ошибки в логах + +Если появляются ошибки типа "Database connection failed", проверьте: +- Доступность базы данных +- Правильность пути к БД в конфигурации +- Права доступа к файлу БД + +#### Нулевые значения в дашбордах + +Если метрики отображаются, но имеют нулевые значения: +- Убедитесь, что в БД есть данные (пользователи, вопросы) +- Проверьте SQL запросы в `MetricsUpdater` +- Увеличьте интервал обновления для накопления данных + ## 🐳 Docker ### Сборка образа diff --git a/bot.py b/bot.py index 87c45d7..79f2e1a 100644 --- a/bot.py +++ b/bot.py @@ -13,6 +13,7 @@ from loader import loader from services.infrastructure.http_server import start_http_server, stop_http_server from services.infrastructure.logger import get_logger from services.infrastructure.pid_manager import get_pid_manager, cleanup_pid_file +from services.infrastructure.metrics_updater import start_metrics_updater, stop_metrics_updater from config.constants import DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT # Настройка логирования @@ -42,6 +43,10 @@ async def main(): logger.info("🌐 Запуск HTTP сервера для метрик...") http_runner = await start_http_server(host=DEFAULT_HTTP_HOST, port=DEFAULT_HTTP_PORT) + # Запускаем обновление метрик + logger.info("📊 Запуск обновления метрик...") + await start_metrics_updater(update_interval=30, db_path=config.DATABASE_PATH) + # Запускаем бота await loader.start_polling() @@ -51,6 +56,10 @@ async def main(): logger.error(f"💥 Критическая ошибка: {e}") raise finally: + # Останавливаем обновление метрик + logger.info("📊 Остановка обновления метрик...") + await stop_metrics_updater() + # Останавливаем HTTP сервер if http_runner: logger.info("🛑 Остановка HTTP сервера...") diff --git a/database/crud.py b/database/crud.py index 4e349e2..0edef9d 100644 --- a/database/crud.py +++ b/database/crud.py @@ -14,9 +14,7 @@ from models.question import Question, QuestionStatus from models.user import User from models.user_block import UserBlock from models.user_settings import UserSettings -from services.infrastructure.logger import get_logger - -logger = get_logger(__name__) +from services.infrastructure.db_metrics_decorator import track_db_operation class ConnectionPool: @@ -43,30 +41,79 @@ class ConnectionPool: await conn.execute("PRAGMA temp_store=MEMORY") return conn + async def _is_connection_valid(self, conn) -> bool: + """Проверка валидности соединения""" + try: + if conn is None: + return False + # Выполняем простой запрос для проверки соединения + cursor = await conn.execute("SELECT 1") + await cursor.fetchone() + return True + except Exception: + return False + async def get_connection(self): """Получение соединения из пула""" try: # Пытаемся получить соединение из пула - return self._pool.get_nowait() + conn = self._pool.get_nowait() + # Проверяем, что соединение еще активно + if await self._is_connection_valid(conn): + return conn + else: + # Соединение неактивно, закрываем его и создаем новое + await conn.close() + async with self._lock: + self._created_connections -= 1 except asyncio.QueueEmpty: - # Если пул пуст, создаем новое соединение - async with self._lock: - if self._created_connections < self.pool_size: - self._created_connections += 1 - return await self._create_connection() + pass + + # Если пул пуст или соединение неактивно, создаем новое + async with self._lock: + if self._created_connections < self.pool_size: + self._created_connections += 1 + return await self._create_connection() + else: + # Ждем освобождения соединения из пула + conn = await self._pool.get() + # Проверяем валидность полученного соединения + if await self._is_connection_valid(conn): + return conn else: - # Ждем освобождения соединения - return await self._pool.get() + # Соединение неактивно, закрываем и создаем новое + await conn.close() + async with self._lock: + self._created_connections -= 1 + return await self._create_connection() async def return_connection(self, conn): """Возврат соединения в пул""" + if conn is None: + return + try: - self._pool.put_nowait(conn) + # Проверяем валидность соединения перед возвратом в пул + if await self._is_connection_valid(conn): + self._pool.put_nowait(conn) + else: + # Соединение неактивно, закрываем его + await conn.close() + async with self._lock: + self._created_connections -= 1 except asyncio.QueueFull: # Если пул полон, закрываем соединение await conn.close() async with self._lock: self._created_connections -= 1 + except Exception as e: + # В случае любой ошибки, закрываем соединение + try: + await conn.close() + except: + pass + async with self._lock: + self._created_connections -= 1 async def close_all(self): """Закрытие всех соединений""" @@ -74,6 +121,15 @@ class ConnectionPool: conn = await self._pool.get() await conn.close() self._created_connections = 0 + + def get_pool_stats(self) -> dict: + """Получение статистики пула соединений""" + return { + "pool_size": self.pool_size, + "created_connections": self._created_connections, + "available_connections": self._pool.qsize(), + "utilization_percent": (self._created_connections / self.pool_size) * 100 if self.pool_size > 0 else 0 + } # Глобальный пул соединений @@ -90,9 +146,10 @@ def get_connection_pool(db_path: str, pool_size: int = DEFAULT_CONNECTION_POOL_S class BaseCRUD: """Базовый класс для CRUD операций""" - def __init__(self, db_path: str): + def __init__(self, db_path: str, logger=None): self.db_path = db_path self.pool = get_connection_pool(db_path) + self.logger = logger @asynccontextmanager async def get_connection(self): @@ -116,9 +173,11 @@ class BaseCRUD: class UserCRUD(BaseCRUD): """CRUD операции для пользователей""" + @track_db_operation("INSERT", "users") async def create(self, user: User) -> User: """Создание нового пользователя""" - logger.info(f"👤 Создание пользователя: {user.telegram_id} ({user.first_name})") + if self.logger: + self.logger.info(f"👤 Создание пользователя: {user.telegram_id} ({user.first_name})") async with self.get_connection() as conn: cursor = await conn.execute(""" INSERT INTO users @@ -135,7 +194,8 @@ class UserCRUD(BaseCRUD): )) user.id = cursor.lastrowid await conn.commit() - logger.info(f"✅ Пользователь создан с ID: {user.id}") + if self.logger: + self.logger.info(f"✅ Пользователь создан с ID: {user.id}") return user async def create_batch(self, users: List[User]) -> List[User]: @@ -143,7 +203,8 @@ class UserCRUD(BaseCRUD): if not users: return [] - logger.info(f"📦 Создание {len(users)} пользователей batch операцией") + if self.logger: + self.logger.info(f"📦 Создание {len(users)} пользователей batch операцией") async with self.get_connection() as conn: try: # Подготавливаем данные для batch вставки @@ -172,14 +233,17 @@ class UserCRUD(BaseCRUD): user.id = first_id + i await conn.commit() - logger.info(f"✅ Создано {len(users)} пользователей batch операцией") + if self.logger: + self.logger.info(f"✅ Создано {len(users)} пользователей batch операцией") return users except Exception as e: await conn.rollback() - logger.error(f"❌ Ошибка при batch создании пользователей: {e}") + if self.logger: + self.logger.error(f"❌ Ошибка при batch создании пользователей: {e}") raise + @track_db_operation("SELECT", "users") async def get_by_telegram_id(self, telegram_id: int) -> Optional[User]: """Получение пользователя по Telegram ID""" async with self.get_connection() as conn: @@ -202,6 +266,7 @@ class UserCRUD(BaseCRUD): return self._row_to_user(row) return None + @track_db_operation("UPDATE", "users") async def update(self, user: User) -> User: """Обновление пользователя""" async with self.get_connection() as conn: @@ -221,6 +286,7 @@ class UserCRUD(BaseCRUD): await conn.commit() return user + @track_db_operation("DELETE", "users") async def delete(self, telegram_id: int) -> bool: """Удаление пользователя""" async with self.get_connection() as conn: @@ -230,6 +296,7 @@ class UserCRUD(BaseCRUD): await conn.commit() return cursor.rowcount > 0 + @track_db_operation("SELECT", "all_users") async def get_all(self, limit: int = 100, offset: int = 0) -> List[User]: """Получение всех пользователей""" async with self.get_connection() as conn: @@ -241,6 +308,7 @@ class UserCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_user(row) for row in rows] + @track_db_operation("SELECT", "all_users_cursor") async def get_all_users_cursor( self, last_id: int, @@ -271,6 +339,7 @@ class UserCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_user(row) for row in rows] + @track_db_operation("SELECT", "all_users_asc") async def get_all_users_asc(self, limit: int = 100, offset: int = 0) -> List[User]: """Получение всех пользователей в порядке возрастания""" async with self.get_connection() as conn: @@ -282,6 +351,7 @@ class UserCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_user(row) for row in rows] + @track_db_operation("SELECT", "stats") async def get_stats(self) -> Dict[str, Any]: """Получение статистики пользователей""" async with self.get_connection() as conn: @@ -327,9 +397,11 @@ class UserCRUD(BaseCRUD): class QuestionCRUD(BaseCRUD): """CRUD операции для вопросов""" + @track_db_operation("INSERT", "questions") async def create(self, question: Question) -> Question: """Создание нового вопроса""" - logger.info(f"❓ Создание вопроса от {question.from_user_id} к {question.to_user_id}") + if self.logger: + self.logger.info(f"❓ Создание вопроса от {question.from_user_id} к {question.to_user_id}") async with self.get_connection() as conn: # Вычисляем user_question_number для получателя if question.user_question_number is None: @@ -355,7 +427,8 @@ class QuestionCRUD(BaseCRUD): )) question.id = cursor.lastrowid await conn.commit() - logger.info(f"✅ Вопрос создан с ID: {question.id}, номер для пользователя: {question.user_question_number}") + if self.logger: + self.logger.info(f"✅ Вопрос создан с ID: {question.id}, номер для пользователя: {question.user_question_number}") return question async def create_batch(self, questions: List[Question]) -> List[Question]: @@ -363,7 +436,8 @@ class QuestionCRUD(BaseCRUD): if not questions: return [] - logger.info(f"📦 Создание {len(questions)} вопросов batch операцией") + if self.logger: + self.logger.info(f"📦 Создание {len(questions)} вопросов batch операцией") async with self.get_connection() as conn: try: # Группируем вопросы по получателям для вычисления user_question_number @@ -412,14 +486,17 @@ class QuestionCRUD(BaseCRUD): question.id = first_id + i await conn.commit() - logger.info(f"✅ Создано {len(questions)} вопросов batch операцией") + if self.logger: + self.logger.info(f"✅ Создано {len(questions)} вопросов batch операцией") return questions except Exception as e: await conn.rollback() - logger.error(f"❌ Ошибка при batch создании вопросов: {e}") + if self.logger: + self.logger.error(f"❌ Ошибка при batch создании вопросов: {e}") raise + @track_db_operation("SELECT", "questions") async def get_by_id(self, question_id: int) -> Optional[Question]: """Получение вопроса по ID""" async with self.get_connection() as conn: @@ -435,6 +512,7 @@ class QuestionCRUD(BaseCRUD): return self._row_to_question(row) return None + @track_db_operation("SELECT", "questions") async def get_by_to_user(self, to_user_id: int, status: Optional[QuestionStatus] = None, limit: int = 50, offset: int = 0) -> List[Question]: """Получение вопросов для пользователя (оптимизированная версия с JOIN)""" @@ -460,6 +538,7 @@ class QuestionCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_question(row) for row in rows] + @track_db_operation("SELECT", "questions_with_authors") async def get_by_to_user_with_authors(self, to_user_id: int, status: Optional[QuestionStatus] = None, limit: int = 50, offset: int = 0) -> List[Tuple[Question, Optional[User]]]: """Получение вопросов для пользователя с информацией об авторах (оптимизированный запрос)""" @@ -526,6 +605,7 @@ class QuestionCRUD(BaseCRUD): continue return result + @track_db_operation("SELECT", "questions_cursor") async def get_by_to_user_cursor( self, to_user_id: int, @@ -567,6 +647,7 @@ class QuestionCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_question(row) for row in rows] + @track_db_operation("SELECT", "questions_asc") async def get_by_to_user_asc( self, to_user_id: int, @@ -597,9 +678,11 @@ class QuestionCRUD(BaseCRUD): rows = await cursor.fetchall() return [self._row_to_question(row) for row in rows] + @track_db_operation("UPDATE", "questions") async def update(self, question: Question) -> Question: """Обновление вопроса""" - logger.info(f"📝 Обновление вопроса {question.id} (статус: {question.status.value})") + if self.logger: + self.logger.info(f"📝 Обновление вопроса {question.id} (статус: {question.status.value})") async with self.get_connection() as conn: # Если вопрос помечается как удаленный, нужно пересчитать номера if question.status.value == 'deleted': @@ -638,7 +721,8 @@ class QuestionCRUD(BaseCRUD): AND id != ? """, (to_user_id, deleted_number, question.id)) - logger.info(f"🗑️ Вопрос {question.id} помечен как удаленный, пересчитаны номера для пользователя {to_user_id}") + if self.logger: + self.logger.info(f"🗑️ Вопрос {question.id} помечен как удаленный, пересчитаны номера для пользователя {to_user_id}") else: # Обычное обновление await conn.execute(""" @@ -663,9 +747,11 @@ class QuestionCRUD(BaseCRUD): )) await conn.commit() - logger.info(f"✅ Вопрос {question.id} обновлен") + if self.logger: + self.logger.info(f"✅ Вопрос {question.id} обновлен") return question + @track_db_operation("DELETE", "questions") async def delete(self, question_id: int) -> bool: """Удаление вопроса с пересчетом user_question_number""" async with self.get_connection() as conn: @@ -698,9 +784,11 @@ class QuestionCRUD(BaseCRUD): """, (to_user_id, deleted_number)) await conn.commit() - logger.info(f"🗑️ Вопрос {question_id} удален, пересчитаны номера для пользователя {to_user_id}") + if self.logger: + self.logger.info(f"🗑️ Вопрос {question_id} удален, пересчитаны номера для пользователя {to_user_id}") return True + @track_db_operation("SELECT", "questions_unread_count") async def get_unread_count(self, to_user_id: int) -> int: """Получение количества непрочитанных вопросов""" async with self.get_connection() as conn: @@ -711,6 +799,7 @@ class QuestionCRUD(BaseCRUD): row = await cursor.fetchone() return row[0] + @track_db_operation("SELECT", "questions_count_by_to_user") async def get_count_by_to_user(self, to_user_id: int, status: Optional[QuestionStatus] = None) -> int: """Получение общего количества вопросов для пользователя""" async with self.get_connection() as conn: @@ -725,6 +814,7 @@ class QuestionCRUD(BaseCRUD): row = await cursor.fetchone() return row[0] + @track_db_operation("SELECT", "questions_stats") async def get_stats(self) -> Dict[str, Any]: """Получение статистики вопросов""" async with self.get_connection() as conn: @@ -787,6 +877,7 @@ class QuestionCRUD(BaseCRUD): class UserBlockCRUD(BaseCRUD): """CRUD операции для блокировок пользователей""" + @track_db_operation("INSERT", "user_blocks") async def create(self, user_block: UserBlock) -> UserBlock: """Создание блокировки""" async with self.get_connection() as conn: @@ -801,6 +892,7 @@ class UserBlockCRUD(BaseCRUD): await conn.commit() return user_block + @track_db_operation("SELECT", "user_blocks") async def is_blocked(self, blocker_id: int, blocked_id: int) -> bool: """Проверка, заблокирован ли пользователь""" async with self.get_connection() as conn: @@ -811,6 +903,7 @@ class UserBlockCRUD(BaseCRUD): row = await cursor.fetchone() return row[0] > 0 + @track_db_operation("SELECT", "user_blocks") async def get_blocked_users(self, blocker_id: int) -> List[int]: """Получение списка заблокированных пользователей""" async with self.get_connection() as conn: @@ -821,6 +914,7 @@ class UserBlockCRUD(BaseCRUD): return [row[0] for row in rows] + @track_db_operation("DELETE", "user_blocks") async def delete(self, blocker_id: int, blocked_id: int) -> bool: """Удаление блокировки""" async with self.get_connection() as conn: @@ -835,6 +929,7 @@ class UserBlockCRUD(BaseCRUD): class UserSettingsCRUD(BaseCRUD): """CRUD операции для настроек пользователей""" + @track_db_operation("INSERT", "user_settings") async def create(self, settings: UserSettings) -> UserSettings: """Создание настроек пользователя""" async with self.get_connection() as conn: @@ -853,6 +948,7 @@ class UserSettingsCRUD(BaseCRUD): await conn.commit() return settings + @track_db_operation("SELECT", "user_settings") async def get_by_user_id(self, user_id: int) -> Optional[UserSettings]: """Получение настроек пользователя""" async with self.get_connection() as conn: @@ -864,6 +960,7 @@ class UserSettingsCRUD(BaseCRUD): return self._row_to_settings(row) return None + @track_db_operation("UPDATE", "user_settings") async def update(self, settings: UserSettings) -> UserSettings: """Обновление настроек пользователя""" async with self.get_connection() as conn: @@ -881,6 +978,7 @@ class UserSettingsCRUD(BaseCRUD): await conn.commit() return settings + @track_db_operation("DELETE", "user_settings") async def delete(self, user_id: int) -> bool: """Удаление настроек пользователя""" async with self.get_connection() as conn: diff --git a/.env_example b/env_example similarity index 100% rename from .env_example rename to env_example diff --git a/scripts/diagnose_db_connections.py b/scripts/diagnose_db_connections.py new file mode 100755 index 0000000..3aa6949 --- /dev/null +++ b/scripts/diagnose_db_connections.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +""" +Скрипт для диагностики проблем с соединениями БД +""" +import asyncio +import sys +import os +from pathlib import Path + +# Добавляем путь к проекту +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from database.crud import get_connection_pool +from services.infrastructure.metrics import get_metrics_service + + +async def diagnose_connections(): + """Диагностика соединений БД""" + print("🔍 Диагностика соединений БД AnonBot") + print("=" * 50) + + # Путь к БД + db_path = project_root / "database" / "anon_qna.db" + + if not db_path.exists(): + print(f"❌ База данных не найдена: {db_path}") + return + + print(f"📁 База данных: {db_path}") + + # Получаем пул соединений + pool = get_connection_pool(str(db_path)) + stats = pool.get_pool_stats() + + print("\n📊 Статистика пула соединений:") + print(f" • Размер пула: {stats['pool_size']}") + print(f" • Созданных соединений: {stats['created_connections']}") + print(f" • Доступных соединений: {stats['available_connections']}") + print(f" • Утилизация: {stats['utilization_percent']:.1f}%") + + # Анализ проблем + print("\n🔍 Анализ:") + + if stats['created_connections'] > stats['pool_size']: + print(f" ❌ КРИТИЧЕСКАЯ ПРОБЛЕМА: Создано {stats['created_connections']} соединений при лимите {stats['pool_size']}") + print(f" Это указывает на утечку соединений!") + elif stats['utilization_percent'] > 80: + print(f" ⚠️ ВНИМАНИЕ: Высокая утилизация пула ({stats['utilization_percent']:.1f}%)") + else: + print(f" ✅ Пул соединений работает нормально") + + # Проверяем метрики + print("\n📈 Метрики Prometheus:") + try: + metrics_service = get_metrics_service() + metrics_data = metrics_service.get_metrics() + + # Ищем метрики соединений + lines = metrics_data.decode('utf-8').split('\n') + connection_metrics = [line for line in lines if 'anon_bot_db_connections' in line or 'anon_bot_db_pool' in line] + + if connection_metrics: + for metric in connection_metrics: + if metric.strip(): + print(f" • {metric}") + else: + print(" • Метрики соединений не найдены") + + except Exception as e: + print(f" ❌ Ошибка получения метрик: {e}") + + # Рекомендации + print("\n💡 Рекомендации:") + + if stats['created_connections'] > stats['pool_size']: + print(" 1. Перезапустите AnonBot для сброса пула соединений") + print(" 2. Проверьте логи на наличие ошибок БД") + print(" 3. Убедитесь, что все соединения правильно закрываются") + print(" 4. Мониторьте метрики в Grafana") + elif stats['utilization_percent'] > 80: + print(" 1. Рассмотрите увеличение размера пула соединений") + print(" 2. Проверьте производительность запросов к БД") + print(" 3. Оптимизируйте часто используемые запросы") + else: + print(" 1. Продолжайте мониторинг метрик") + print(" 2. Настройте алерты в Grafana") + + print("\n🔧 Команды для мониторинга:") + print(" • Просмотр метрик: curl http://localhost:8081/metrics | grep anon_bot_db") + print(" • Проверка здоровья: curl http://localhost:8081/health") + print(" • Статус процесса: curl http://localhost:8081/status") + + +if __name__ == "__main__": + asyncio.run(diagnose_connections()) diff --git a/services/infrastructure/__init__.py b/services/infrastructure/__init__.py index a4c37e3..eeefe09 100644 --- a/services/infrastructure/__init__.py +++ b/services/infrastructure/__init__.py @@ -5,6 +5,8 @@ from .database import DatabaseService from .logger import get_logger, setup_logging from .metrics import MetricsService, get_metrics_service +from .metrics_updater import MetricsUpdater, get_metrics_updater, start_metrics_updater, stop_metrics_updater +from .db_metrics_decorator import track_db_operation, track_db_connection from .pid_manager import PIDManager, get_pid_manager, cleanup_pid_file from .logging_decorators import ( log_function_call, log_business_event, log_fsm_transition, @@ -21,6 +23,8 @@ __all__ = [ 'DatabaseService', 'get_logger', 'setup_logging', 'MetricsService', 'get_metrics_service', + 'MetricsUpdater', 'get_metrics_updater', 'start_metrics_updater', 'stop_metrics_updater', + 'track_db_operation', 'track_db_connection', 'PIDManager', 'get_pid_manager', 'cleanup_pid_file', 'log_function_call', 'log_business_event', 'log_fsm_transition', 'log_handler', 'log_service', 'log_business', 'log_fsm', diff --git a/services/infrastructure/database.py b/services/infrastructure/database.py index 23a8bca..840c262 100644 --- a/services/infrastructure/database.py +++ b/services/infrastructure/database.py @@ -22,11 +22,11 @@ class DatabaseService: def __init__(self, db_path: str): self.db_path = db_path - # Инициализируем CRUD операции - self.users = UserCRUD(db_path) - self.questions = QuestionCRUD(db_path) - self.user_blocks = UserBlockCRUD(db_path) - self.user_settings = UserSettingsCRUD(db_path) + # Инициализируем CRUD операции с передачей логгера + self.users = UserCRUD(db_path, logger) + self.questions = QuestionCRUD(db_path, logger) + self.user_blocks = UserBlockCRUD(db_path, logger) + self.user_settings = UserSettingsCRUD(db_path, logger) async def init(self): """Инициализация базы данных и создание таблиц""" @@ -59,7 +59,7 @@ class DatabaseService: return # Читаем схему из файла - schema_path = Path(__file__).parent.parent / "database" / "schema.sql" + schema_path = Path(__file__).parent.parent.parent / "database" / "schema.sql" if schema_path.exists(): logger.info("📄 Создание таблиц из схемы") diff --git a/services/infrastructure/db_metrics_decorator.py b/services/infrastructure/db_metrics_decorator.py new file mode 100644 index 0000000..0fa45a4 --- /dev/null +++ b/services/infrastructure/db_metrics_decorator.py @@ -0,0 +1,69 @@ +""" +Декоратор для автоматического сбора метрик базы данных +""" +import time +import functools +from typing import Callable, Any + +from .metrics import get_metrics_service +from .logger import get_logger + + +def track_db_operation(operation: str, table: str): + """ + Декоратор для отслеживания операций с базой данных + + Args: + operation: Тип операции (SELECT, INSERT, UPDATE, DELETE) + table: Название таблицы + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + metrics_service = get_metrics_service() + logger = get_logger(__name__) + start_time = time.time() + + try: + result = await func(*args, **kwargs) + duration = time.time() - start_time + + # Записываем успешную операцию + metrics_service.record_db_query(operation, table, "success", duration) + + return result + + except Exception as e: + duration = time.time() - start_time + + # Записываем неудачную операцию + metrics_service.record_db_query(operation, table, "error", duration) + metrics_service.increment_errors(type(e).__name__, "database_operation") + + logger.error(f"Database operation failed: {operation} on {table}: {e}") + raise + + return wrapper + return decorator + + +def track_db_connection(func: Callable) -> Callable: + """ + Декоратор для отслеживания соединений с базой данных + """ + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + metrics_service = get_metrics_service() + logger = get_logger(__name__) + + try: + result = await func(*args, **kwargs) + return result + + except Exception as e: + # Записываем только ошибки, не соединения + metrics_service.increment_errors(type(e).__name__, "database_connection") + logger.error(f"Database connection failed: {e}") + raise + + return wrapper diff --git a/services/infrastructure/http_server.py b/services/infrastructure/http_server.py index 6925980..9c61c61 100644 --- a/services/infrastructure/http_server.py +++ b/services/infrastructure/http_server.py @@ -6,7 +6,7 @@ import time from typing import Optional from aiohttp import ClientSession, web -from aiohttp.web import Request, Response +from aiohttp.web import Request, Response, json_response from loguru import logger from config.constants import DEFAULT_HTTP_HOST, DEFAULT_HTTP_PORT, APP_VERSION, HTTP_STATUS_OK, HTTP_STATUS_SERVICE_UNAVAILABLE, HTTP_STATUS_INTERNAL_SERVER_ERROR @@ -41,7 +41,8 @@ class HTTPServer: try: # Получаем метрики metrics_data = self.metrics_service.get_metrics() - content_type = self.metrics_service.get_content_type() + if isinstance(metrics_data, bytes): + metrics_data = metrics_data.decode('utf-8') # Записываем метрику HTTP запроса duration = time.time() - start_time @@ -50,7 +51,7 @@ class HTTPServer: return Response( text=metrics_data, - content_type=content_type, + content_type='text/plain; version=0.0.4', status=HTTP_STATUS_OK ) @@ -104,8 +105,8 @@ class HTTPServer: self.metrics_service.record_http_request_duration("GET", "/health", duration) self.metrics_service.increment_http_requests("GET", "/health", http_status) - return Response( - json=health_status, + return json_response( + health_status, status=http_status ) @@ -116,8 +117,8 @@ class HTTPServer: self.metrics_service.increment_http_requests("GET", "/health", 500) self.metrics_service.increment_errors(type(e).__name__, "health_handler") - return Response( - json={"status": "error", "message": str(e)}, + return json_response( + {"status": "error", "message": str(e)}, status=HTTP_STATUS_INTERNAL_SERVER_ERROR ) @@ -149,8 +150,8 @@ class HTTPServer: self.metrics_service.record_http_request_duration("GET", "/ready", duration) self.metrics_service.increment_http_requests("GET", "/ready", http_status) - return Response( - json=ready_status, + return json_response( + ready_status, status=http_status ) @@ -161,8 +162,8 @@ class HTTPServer: self.metrics_service.increment_http_requests("GET", "/ready", 500) self.metrics_service.increment_errors(type(e).__name__, "ready_handler") - return Response( - json={"status": "error", "message": str(e)}, + return json_response( + {"status": "error", "message": str(e)}, status=HTTP_STATUS_INTERNAL_SERVER_ERROR ) @@ -278,8 +279,8 @@ class HTTPServer: self.metrics_service.record_http_request_duration("GET", "/", duration) self.metrics_service.increment_http_requests("GET", "/", 200) - return Response( - json=info, + return json_response( + info, status=HTTP_STATUS_OK ) @@ -290,8 +291,8 @@ class HTTPServer: self.metrics_service.increment_http_requests("GET", "/", 500) self.metrics_service.increment_errors(type(e).__name__, "root_handler") - return Response( - json={"error": str(e)}, + return json_response( + {"error": str(e)}, status=HTTP_STATUS_INTERNAL_SERVER_ERROR ) diff --git a/services/infrastructure/metrics.py b/services/infrastructure/metrics.py index 75621f2..3f50537 100644 --- a/services/infrastructure/metrics.py +++ b/services/infrastructure/metrics.py @@ -127,6 +127,27 @@ class MetricsService: ['status'] ) + # Метрики пула соединений + self.db_pool_size = Gauge( + 'anon_bot_db_pool_size', + 'Database connection pool size' + ) + + self.db_pool_created_connections = Gauge( + 'anon_bot_db_pool_created_connections', + 'Number of created connections in pool' + ) + + self.db_pool_available_connections = Gauge( + 'anon_bot_db_pool_available_connections', + 'Number of available connections in pool' + ) + + self.db_pool_utilization_percent = Gauge( + 'anon_bot_db_pool_utilization_percent', + 'Database connection pool utilization percentage' + ) + # Метрики пагинации self.pagination_requests_total = Counter( 'anon_bot_pagination_requests_total', @@ -237,13 +258,25 @@ class MetricsService: self.db_query_duration.labels(operation=operation, table=table).observe(duration) def record_db_connection(self, status: str): - """Записать метрики подключения к БД""" + """Записать метрики подключения к БД (только для реальных соединений пула)""" self.db_connections_total.labels(status=status).inc() if status == "opened": self.db_connections_active.inc() elif status == "closed": self.db_connections_active.dec() + def update_db_connections_from_pool(self, active_count: int): + """Обновить количество активных соединений на основе реального пула""" + # Сбрасываем счетчик и устанавливаем реальное значение + self.db_connections_active.set(active_count) + + def update_db_pool_metrics(self, pool_stats: dict): + """Обновить метрики пула соединений""" + self.db_pool_size.set(pool_stats.get("pool_size", 0)) + self.db_pool_created_connections.set(pool_stats.get("created_connections", 0)) + self.db_pool_available_connections.set(pool_stats.get("available_connections", 0)) + self.db_pool_utilization_percent.set(pool_stats.get("utilization_percent", 0)) + def record_pagination_time(self, entity_type: str, duration: float, method: str = "cursor"): """Записать время пагинации""" self.pagination_requests_total.labels(entity_type=entity_type, method=method).inc() diff --git a/services/infrastructure/metrics_updater.py b/services/infrastructure/metrics_updater.py new file mode 100644 index 0000000..31592df --- /dev/null +++ b/services/infrastructure/metrics_updater.py @@ -0,0 +1,196 @@ +""" +Сервис для периодического обновления метрик +""" +import asyncio +import time +from typing import Optional + +from .metrics import get_metrics_service +from .database import DatabaseService +from .logger import get_logger + + +class MetricsUpdater: + """Сервис для периодического обновления метрик""" + + def __init__(self, update_interval: int = 30, db_path: str = None): + self.update_interval = update_interval + self.metrics_service = get_metrics_service() + self.database_service: Optional[DatabaseService] = None + self.db_path = db_path + self._running = False + self._task: Optional[asyncio.Task] = None + self.logger = get_logger(__name__) + + async def start(self): + """Запустить обновление метрик""" + if self._running: + self.logger.warning("MetricsUpdater уже запущен") + return + + # Создаем DatabaseService если путь к БД указан + if self.db_path: + self.database_service = DatabaseService(self.db_path) + await self.database_service.init() + + self._running = True + self._task = asyncio.create_task(self._update_loop()) + self.logger.info(f"📊 MetricsUpdater запущен с интервалом {self.update_interval} секунд") + + async def stop(self): + """Остановить обновление метрик""" + if not self._running: + return + + self._running = False + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + # Логгер недоступен в stop, так как объект может быть уже уничтожен + pass + + async def _update_loop(self): + """Основной цикл обновления метрик""" + while self._running: + try: + await self._update_metrics() + await asyncio.sleep(self.update_interval) + except asyncio.CancelledError: + break + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик: {e}") + await asyncio.sleep(self.update_interval) + + async def _update_metrics(self): + """Обновление всех метрик""" + try: + # Обновляем активных пользователей + await self._update_active_users() + + # Обновляем активные вопросы + await self._update_active_questions() + + # Обновляем метрики БД + await self._update_database_metrics() + + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик: {e}") + self.metrics_service.increment_errors(type(e).__name__, "metrics_updater") + + async def _update_active_users(self): + """Обновление количества активных пользователей""" + try: + if not self.database_service: + return + + # Подсчитываем активных пользователей за последние 24 часа + async with self.database_service.get_connection() as conn: + cursor = await conn.execute(""" + SELECT COUNT(*) FROM users + WHERE is_active = 1 + AND updated_at > datetime('now', '-24 hours') + """) + result = await cursor.fetchone() + active_users_count = result[0] if result else 0 + + self.metrics_service.set_active_users(active_users_count) + self.logger.debug(f"Обновлено количество активных пользователей: {active_users_count}") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении активных пользователей: {e}") + + async def _update_active_questions(self): + """Обновление количества активных вопросов""" + try: + if not self.database_service: + return + + # Подсчитываем активные вопросы (pending и processing) + async with self.database_service.get_connection() as conn: + cursor = await conn.execute(""" + SELECT COUNT(*) FROM questions + WHERE status IN ('pending', 'processing') + """) + result = await cursor.fetchone() + active_questions_count = result[0] if result else 0 + + self.metrics_service.set_active_questions(active_questions_count) + self.logger.debug(f"Обновлено количество активных вопросов: {active_questions_count}") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении активных вопросов: {e}") + + async def _update_database_metrics(self): + """Обновление метрик базы данных""" + try: + if not self.database_service: + return + + # Проверяем соединение с БД + start_time = time.time() + try: + await self.database_service.check_connection() + duration = time.time() - start_time + + # Записываем успешное соединение (только для статистики, не для активных соединений) + self.metrics_service.record_db_query("health_check", "connection", "success", duration) + + # Обновляем метрики пула соединений + await self._update_pool_metrics() + + except Exception as e: + duration = time.time() - start_time + # Записываем неудачное соединение (только для статистики) + self.metrics_service.record_db_query("health_check", "connection", "error", duration) + self.metrics_service.increment_errors(type(e).__name__, "database_health_check") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик БД: {e}") + + async def _update_pool_metrics(self): + """Обновление метрик пула соединений""" + try: + from database.crud import get_connection_pool + pool = get_connection_pool(self.database_service.db_path) + pool_stats = pool.get_pool_stats() + self.metrics_service.update_db_pool_metrics(pool_stats) + + # Обновляем реальное количество активных соединений из пула + created_connections = pool_stats.get("created_connections", 0) + self.metrics_service.update_db_connections_from_pool(created_connections) + + # Логируем предупреждение если утилизация пула превышает 80% + if pool_stats.get("utilization_percent", 0) > 80: + self.logger.warning(f"Высокая утилизация пула соединений: {pool_stats}") + + except Exception as e: + self.logger.error(f"Ошибка при обновлении метрик пула: {e}") + + +# Глобальный экземпляр +_metrics_updater: Optional[MetricsUpdater] = None + + +def get_metrics_updater(update_interval: int = 30, db_path: str = None) -> MetricsUpdater: + """Получить экземпляр MetricsUpdater""" + global _metrics_updater + if _metrics_updater is None: + _metrics_updater = MetricsUpdater(update_interval, db_path) + return _metrics_updater + + +async def start_metrics_updater(update_interval: int = 30, db_path: str = None): + """Запустить обновление метрик""" + updater = get_metrics_updater(update_interval, db_path) + await updater.start() + + +async def stop_metrics_updater(): + """Остановить обновление метрик""" + global _metrics_updater + if _metrics_updater: + await _metrics_updater.stop() + _metrics_updater = None