Refactor metrics handling and remove scheduler
- Removed the metrics scheduler functionality from the bot, transitioning to real-time metrics updates via middleware.
- Enhanced logging for metrics operations across various handlers to improve monitoring and debugging capabilities.
- Integrated metrics tracking for user activities and database errors, providing better insights into bot performance.
- Cleaned up code by removing obsolete comments and unused imports, improving overall readability and maintainability.
This commit is contained in:
@@ -70,6 +70,14 @@ class BotMetrics:
|
||||
registry=self.registry
|
||||
)
|
||||
|
||||
# Database errors counter
|
||||
self.db_errors_total = Counter(
|
||||
'db_errors_total',
|
||||
'Total number of database errors',
|
||||
['error_type', 'query_type', 'table_name', 'operation'],
|
||||
registry=self.registry
|
||||
)
|
||||
|
||||
# Message processing metrics
|
||||
self.messages_processed_total = Counter(
|
||||
'messages_processed_total',
|
||||
@@ -92,7 +100,14 @@ class BotMetrics:
|
||||
self.rate_limit_hits_total = Counter(
|
||||
'rate_limit_hits_total',
|
||||
'Total number of rate limit hits',
|
||||
['limit_type', 'handler_type'],
|
||||
['limit_type', 'user_id', 'action'],
|
||||
registry=self.registry
|
||||
)
|
||||
# User activity metrics
|
||||
self.user_activity_total = Counter(
|
||||
'user_activity_total',
|
||||
'Total user activity events',
|
||||
['activity_type', 'user_type', 'chat_type'],
|
||||
registry=self.registry
|
||||
)
|
||||
|
||||
@@ -121,8 +136,8 @@ class BotMetrics:
|
||||
status=status
|
||||
).observe(duration)
|
||||
|
||||
def set_active_users(self, count: int, user_type: str = "total"):
|
||||
"""Set the number of active users."""
|
||||
def set_active_users(self, count: int, user_type: str = "daily"):
|
||||
"""Set the number of active users for a specific type."""
|
||||
self.active_users.labels(user_type=user_type).set(count)
|
||||
|
||||
def record_db_query(self, query_type: str, duration: float, table_name: str = "unknown", operation: str = "unknown"):
|
||||
@@ -275,6 +290,12 @@ def db_query_time(query_type: str = "unknown", table_name: str = "unknown", oper
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_db_query(query_type, duration, table_name, operation)
|
||||
metrics.record_db_error(
|
||||
type(e).__name__,
|
||||
query_type,
|
||||
table_name,
|
||||
operation
|
||||
)
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
"database",
|
||||
@@ -293,6 +314,12 @@ def db_query_time(query_type: str = "unknown", table_name: str = "unknown", oper
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
metrics.record_db_query(query_type, duration, table_name, operation)
|
||||
metrics.record_db_error(
|
||||
type(e).__name__,
|
||||
query_type,
|
||||
table_name,
|
||||
operation
|
||||
)
|
||||
metrics.record_error(
|
||||
type(e).__name__,
|
||||
"database",
|
||||
|
||||
@@ -1,206 +0,0 @@
|
||||
"""
|
||||
Metrics Scheduler for periodic updates of bot metrics.
|
||||
Automatically updates active users, system metrics, and other periodic data.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone, timedelta
|
||||
from typing import Optional
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
|
||||
from .metrics import metrics
|
||||
from .base_dependency_factory import get_global_instance
|
||||
|
||||
|
||||
class MetricsScheduler:
    """Scheduler for periodic metrics updates.

    Wraps an ``AsyncIOScheduler`` and registers three jobs when started:
    an active-users refresh every 5 minutes, a system-metrics job every
    minute, and a daily reset job at 00:00.
    """

    def __init__(self):
        self.scheduler = AsyncIOScheduler()
        self.logger = logging.getLogger(__name__)
        # Dependency factory and database handle; filled in by initialize().
        self.bdf = None
        self.bot_db = None
        # Tracks whether start_scheduler() completed without stop_scheduler().
        self.is_running = False

    async def initialize(self):
        """Initialize scheduler with database connection.

        Failures are logged and swallowed; ``self.bot_db`` may then still be
        ``None``, in which case the active-users job retries initialization
        on its next run.
        """
        try:
            self.bdf = get_global_instance()
            self.bot_db = self.bdf.get_db()
            self.logger.info("✅ Metrics scheduler initialized successfully")
        except Exception as e:
            self.logger.error(f"❌ Failed to initialize metrics scheduler: {e}")

    def start_scheduler(self):
        """Register the periodic jobs and start the scheduler.

        Does nothing (apart from a warning) when already running. Errors
        during registration or start-up are logged, not raised.
        """
        if self.is_running:
            self.logger.warning("⚠️ Metrics scheduler is already running")
            return

        try:
            # Update active users every 5 minutes
            self.scheduler.add_job(
                self._update_active_users_metric,
                IntervalTrigger(minutes=5),
                id='update_active_users',
                name='Update Active Users Metric',
                replace_existing=True
            )

            # Update system metrics every minute
            self.scheduler.add_job(
                self._update_system_metrics,
                IntervalTrigger(minutes=1),
                id='update_system_metrics',
                name='Update System Metrics',
                replace_existing=True
            )

            # Daily metrics reset at midnight
            self.scheduler.add_job(
                self._daily_metrics_reset,
                CronTrigger(hour=0, minute=0),
                id='daily_metrics_reset',
                name='Daily Metrics Reset',
                replace_existing=True
            )

            # Start scheduler
            self.scheduler.start()
            self.is_running = True
            self.logger.info("✅ Metrics scheduler started successfully")

        except Exception as e:
            self.logger.error(f"❌ Failed to start metrics scheduler: {e}")

    def stop_scheduler(self):
        """Shut the scheduler down.

        Does nothing (apart from a warning) when not running. Errors during
        shutdown are logged, not raised, and leave ``is_running`` unchanged.
        """
        if not self.is_running:
            self.logger.warning("⚠️ Metrics scheduler is not running")
            return

        try:
            self.scheduler.shutdown()
            self.is_running = False
            self.logger.info("✅ Metrics scheduler stopped successfully")
        except Exception as e:
            self.logger.error(f"❌ Failed to stop metrics scheduler: {e}")

    async def _update_active_users_metric(self):
        """Update active users metric from database.

        Counts distinct ``user_id`` values in ``our_users`` whose
        ``date_changed`` is newer than 24 hours ago and publishes the count
        under both the "daily" and "total" user types. On any failure the
        error is logged and both gauges are set to 0.
        """
        try:
            if not self.bot_db:
                # Lazy retry: initialize() may have failed or never been run.
                await self.initialize()
                if not self.bot_db:
                    self.logger.warning("⚠️ Cannot update active users: no database connection")
                    return

            self.logger.debug("📊 Updating active users metric...")

            # Count active users (last 24 hours)
            import time
            # NOTE(review): assumes date_changed stores Unix epoch seconds -
            # confirm against the our_users schema.
            current_timestamp = int(time.time())
            one_day_ago = current_timestamp - (24 * 60 * 60)

            active_users_query = """
                SELECT COUNT(DISTINCT user_id) as active_users
                FROM our_users
                WHERE date_changed > ?
            """

            await self.bot_db.connect()
            try:
                await self.bot_db.cursor.execute(active_users_query, (one_day_ago,))
                result = await self.bot_db.cursor.fetchone()
            finally:
                # Always release the connection, even when the query fails;
                # previously an exception here skipped close() and leaked it.
                await self.bot_db.close()

            active_users = result[0] if result else 0

            # Update metrics
            # NOTE(review): "total" receives the same 24-hour count as
            # "daily" - confirm whether an all-time count was intended.
            metrics.set_active_users(active_users, "daily")
            metrics.set_active_users(active_users, "total")

            self.logger.debug(f"📊 Active users updated: {active_users}")

        except Exception as e:
            self.logger.error(f"❌ Failed to update active users metric: {e}")
            # Set fallback value
            metrics.set_active_users(0, "daily")
            metrics.set_active_users(0, "total")

    async def _update_system_metrics(self):
        """Update system-related metrics.

        Currently a placeholder: it only emits a debug log line.
        """
        try:
            # You can add system metrics here (CPU, memory, etc.)
            # For now, we'll just log that the job is running
            self.logger.debug("📊 System metrics update job running...")

        except Exception as e:
            self.logger.error(f"❌ Failed to update system metrics: {e}")

    async def _daily_metrics_reset(self):
        """Reset daily metrics at midnight.

        Currently a placeholder: it only logs the start and end of the job.
        """
        try:
            self.logger.info("🔄 Daily metrics reset job running...")

            # You can add daily metrics reset logic here
            # For example, reset daily counters, update retention metrics, etc.

            self.logger.info("✅ Daily metrics reset completed")

        except Exception as e:
            self.logger.error(f"❌ Failed to reset daily metrics: {e}")

    async def force_update_active_users(self):
        """Force update of active users metric (for testing)."""
        await self._update_active_users_metric()

    def get_scheduler_status(self) -> dict:
        """Get current scheduler status.

        Returns:
            ``{"status": "stopped", "jobs": 0}`` when not running; otherwise
            a dict with ``"status": "running"``, the job count, and a
            ``"job_details"`` list holding each job's id, name and ISO-format
            next run time (``None`` when the job has no next run time).
        """
        if not self.is_running:
            return {"status": "stopped", "jobs": 0}

        jobs = self.scheduler.get_jobs()
        return {
            "status": "running",
            "jobs": len(jobs),
            "job_details": [
                {
                    "id": job.id,
                    "name": job.name,
                    "next_run": job.next_run_time.isoformat() if job.next_run_time else None
                }
                for job in jobs
            ]
        }
|
||||
|
||||
|
||||
# Global metrics scheduler instance
|
||||
# Stays None until get_metrics_scheduler() or start_metrics_scheduler() creates the instance.
metrics_scheduler: Optional[MetricsScheduler] = None
|
||||
|
||||
|
||||
def get_metrics_scheduler() -> MetricsScheduler:
    """Return the module-wide MetricsScheduler, creating it on first use.

    The instance is only constructed here; it is neither initialized nor
    started.
    """
    global metrics_scheduler
    if metrics_scheduler is not None:
        return metrics_scheduler
    metrics_scheduler = MetricsScheduler()
    return metrics_scheduler
|
||||
|
||||
|
||||
async def start_metrics_scheduler() -> MetricsScheduler:
    """Create the module-wide scheduler if needed, initialize and start it.

    Returns:
        The module-wide MetricsScheduler instance.
    """
    global metrics_scheduler
    instance = metrics_scheduler
    if instance is None:
        # First call: build the singleton and publish it module-wide.
        instance = MetricsScheduler()
        metrics_scheduler = instance

    await instance.initialize()
    instance.start_scheduler()
    return instance
|
||||
|
||||
|
||||
def stop_metrics_scheduler():
    """Stop the module-wide metrics scheduler; no-op when none exists."""
    global metrics_scheduler
    if not metrics_scheduler:
        return
    metrics_scheduler.stop_scheduler()
|
||||
Reference in New Issue
Block a user