Files
AnonBot/tests/benchmark_db_performance.py

484 lines
22 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Benchmark тесты для проверки производительности БД
"""
import asyncio
import time
import statistics
from typing import List, Dict, Any
from dataclasses import dataclass
from models.user import User
from models.question import Question, QuestionStatus
from services.infrastructure.database import DatabaseService
# from services.business.optimized_pagination_service import OptimizedPaginationService, PaginationCursor # Файл удален
from services.infrastructure.metrics import get_metrics_service
@dataclass
class BenchmarkResult:
"""Результат benchmark теста"""
operation: str
duration: float
items_processed: int
items_per_second: float
memory_usage: float = 0.0
error_count: int = 0
class DatabaseBenchmark:
"""Класс для проведения benchmark тестов БД"""
def __init__(self, database: DatabaseService):
self.database = database
# self.pagination_service = OptimizedPaginationService() # Файл удален
self.metrics = get_metrics_service()
self.results: List[BenchmarkResult] = []
async def run_all_benchmarks(self) -> Dict[str, Any]:
"""Запуск всех benchmark тестов"""
print("🚀 Запуск benchmark тестов производительности БД...")
# Подготовка тестовых данных
await self._prepare_test_data()
# Запуск тестов
benchmarks = [
("single_insert_users", self._benchmark_single_insert_users),
("batch_insert_users", self._benchmark_batch_insert_users),
("single_insert_questions", self._benchmark_single_insert_questions),
("batch_insert_questions", self._benchmark_batch_insert_questions),
("offset_pagination", self._benchmark_offset_pagination),
("cursor_pagination", self._benchmark_cursor_pagination),
("n_plus_one_query", self._benchmark_n_plus_one_query),
("optimized_join_query", self._benchmark_optimized_join_query),
]
for name, benchmark_func in benchmarks:
try:
print(f"📊 Запуск теста: {name}")
result = await benchmark_func()
self.results.append(result)
print(f"{name}: {result.items_per_second:.2f} items/sec")
except Exception as e:
print(f"❌ Ошибка в тесте {name}: {e}")
self.results.append(BenchmarkResult(
operation=name,
duration=0.0,
items_processed=0,
items_per_second=0.0,
error_count=1
))
# Очистка тестовых данных
await self._cleanup_test_data()
return self._generate_report()
async def _prepare_test_data(self):
"""Подготовка тестовых данных"""
print("📝 Подготовка тестовых данных...")
# Создаем тестовых пользователей
self.test_users = []
for i in range(1000):
user = User(
telegram_id=9000000 + i,
username=f"test_user_{i}",
first_name=f"Test{i}",
last_name="User",
chat_id=9000000 + i,
profile_link=f"test_link_{i}",
is_active=True,
is_superuser=False
)
self.test_users.append(user)
# Создаем тестовые вопросы
self.test_questions = []
for i in range(5000):
question = Question(
from_user_id=9000000 + (i % 100), # Циклически используем пользователей
to_user_id=9000000 + ((i + 1) % 100),
message_text=f"Test question {i}",
status=QuestionStatus.PENDING
)
self.test_questions.append(question)
print(f"✅ Подготовлено {len(self.test_users)} пользователей и {len(self.test_questions)} вопросов")
async def _cleanup_test_data(self):
"""Очистка тестовых данных"""
print("🧹 Очистка тестовых данных...")
# Удаляем тестовых пользователей (вопросы удалятся каскадно)
for user in self.test_users[:10]: # Удаляем только первых 10 для скорости
try:
await self.database.users.delete(user.telegram_id)
except:
pass # Игнорируем ошибки при очистке
async def _benchmark_single_insert_users(self) -> BenchmarkResult:
"""Benchmark одиночной вставки пользователей"""
start_time = time.time()
items_processed = 0
for user in self.test_users[:100]: # Тестируем на 100 пользователях
try:
await self.database.create_user(user)
items_processed += 1
except Exception as e:
print(f"Ошибка при создании пользователя: {e}")
duration = time.time() - start_time
return BenchmarkResult(
operation="single_insert_users",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_batch_insert_users(self) -> BenchmarkResult:
"""Benchmark batch вставки пользователей"""
start_time = time.time()
# Создаем новые пользователей для batch теста
batch_users = []
for i in range(100):
user = User(
telegram_id=8000000 + i,
username=f"batch_user_{i}",
first_name=f"Batch{i}",
last_name="User",
chat_id=8000000 + i,
profile_link=f"batch_link_{i}",
is_active=True,
is_superuser=False
)
batch_users.append(user)
try:
await self.database.create_users_batch(batch_users)
items_processed = len(batch_users)
except Exception as e:
print(f"Ошибка при batch создании пользователей: {e}")
items_processed = 0
duration = time.time() - start_time
return BenchmarkResult(
operation="batch_insert_users",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_single_insert_questions(self) -> BenchmarkResult:
"""Benchmark одиночной вставки вопросов"""
start_time = time.time()
items_processed = 0
for question in self.test_questions[:100]: # Тестируем на 100 вопросах
try:
await self.database.create_question(question)
items_processed += 1
except Exception as e:
print(f"Ошибка при создании вопроса: {e}")
duration = time.time() - start_time
return BenchmarkResult(
operation="single_insert_questions",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_batch_insert_questions(self) -> BenchmarkResult:
"""Benchmark batch вставки вопросов"""
start_time = time.time()
# Создаем новые вопросы для batch теста
batch_questions = []
for i in range(100):
question = Question(
from_user_id=8000000 + (i % 10),
to_user_id=8000000 + ((i + 1) % 10),
message_text=f"Batch question {i}",
status=QuestionStatus.PENDING
)
batch_questions.append(question)
try:
await self.database.create_questions_batch(batch_questions)
items_processed = len(batch_questions)
except Exception as e:
print(f"Ошибка при batch создании вопросов: {e}")
items_processed = 0
duration = time.time() - start_time
return BenchmarkResult(
operation="batch_insert_questions",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_offset_pagination(self) -> BenchmarkResult:
"""Benchmark offset-based пагинации"""
start_time = time.time()
items_processed = 0
# Тестируем пагинацию с offset
for offset in range(0, 1000, 50): # 20 страниц по 50 элементов
try:
questions = await self.database.get_user_questions(
to_user_id=9000000, # Используем первого тестового пользователя
limit=50,
offset=offset
)
items_processed += len(questions)
except Exception as e:
print(f"Ошибка при offset пагинации: {e}")
duration = time.time() - start_time
return BenchmarkResult(
operation="offset_pagination",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_cursor_pagination(self) -> BenchmarkResult:
"""Benchmark cursor-based пагинации"""
start_time = time.time()
items_processed = 0
try:
# Тестируем cursor-based пагинацию
cursor = None
for _ in range(20): # 20 страниц
result = await self.pagination_service.paginate_questions(
database=self.database,
to_user_id=9000000,
cursor=cursor,
limit=50
)
items_processed += len(result.items)
cursor = result.cursor
if not result.has_next:
break
except Exception as e:
print(f"Ошибка при cursor пагинации: {e}")
duration = time.time() - start_time
return BenchmarkResult(
operation="cursor_pagination",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_n_plus_one_query(self) -> BenchmarkResult:
"""Benchmark N+1 запроса (неоптимизированная версия)"""
start_time = time.time()
items_processed = 0
try:
# Получаем вопросы
questions = await self.database.get_user_questions(
to_user_id=9000000,
limit=100
)
# Для каждого вопроса делаем отдельный запрос к БД (N+1 проблема)
for question in questions:
try:
if question.from_user_id:
user = await self.database.get_user(question.from_user_id)
if user:
items_processed += 1
except Exception as e:
print(f"Ошибка при получении пользователя: {e}")
except Exception as e:
print(f"Ошибка при N+1 запросе: {e}")
duration = time.time() - start_time
return BenchmarkResult(
operation="n_plus_one_query",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
async def _benchmark_optimized_join_query(self) -> BenchmarkResult:
"""Benchmark оптимизированного JOIN запроса"""
start_time = time.time()
items_processed = 0
try:
# Используем оптимизированный запрос с JOIN
questions_with_authors = await self.database.get_user_questions_with_authors(
user_id=9000000,
limit=100
)
items_processed = len(questions_with_authors)
except Exception as e:
print(f"Ошибка при оптимизированном JOIN запросе: {e}")
duration = time.time() - start_time
return BenchmarkResult(
operation="optimized_join_query",
duration=duration,
items_processed=items_processed,
items_per_second=items_processed / duration if duration > 0 else 0
)
def _generate_report(self) -> Dict[str, Any]:
"""Генерация отчета по результатам benchmark"""
if not self.results:
return {"error": "Нет результатов для анализа"}
# Группируем результаты по типам операций
operation_groups = {}
for result in self.results:
if result.operation not in operation_groups:
operation_groups[result.operation] = []
operation_groups[result.operation].append(result)
# Анализируем производительность
analysis = {}
for operation, results in operation_groups.items():
if results:
avg_performance = statistics.mean([r.items_per_second for r in results])
max_performance = max([r.items_per_second for r in results])
min_performance = min([r.items_per_second for r in results])
analysis[operation] = {
"avg_items_per_second": round(avg_performance, 2),
"max_items_per_second": round(max_performance, 2),
"min_items_per_second": round(min_performance, 2),
"total_tests": len(results),
"error_rate": sum(1 for r in results if r.error_count > 0) / len(results)
}
# Сравнение производительности
comparisons = {}
if "single_insert_users" in analysis and "batch_insert_users" in analysis:
single_perf = analysis["single_insert_users"]["avg_items_per_second"]
batch_perf = analysis["batch_insert_users"]["avg_items_per_second"]
comparisons["batch_vs_single_users"] = {
"batch_performance": batch_perf,
"single_performance": single_perf,
"improvement_factor": round(batch_perf / single_perf, 2) if single_perf > 0 else 0
}
if "offset_pagination" in analysis and "cursor_pagination" in analysis:
offset_perf = analysis["offset_pagination"]["avg_items_per_second"]
cursor_perf = analysis["cursor_pagination"]["avg_items_per_second"]
comparisons["cursor_vs_offset_pagination"] = {
"cursor_performance": cursor_perf,
"offset_performance": offset_perf,
"improvement_factor": round(cursor_perf / offset_perf, 2) if offset_perf > 0 else 0
}
if "n_plus_one_query" in analysis and "optimized_join_query" in analysis:
n_plus_one_perf = analysis["n_plus_one_query"]["avg_items_per_second"]
join_perf = analysis["optimized_join_query"]["avg_items_per_second"]
comparisons["join_vs_n_plus_one"] = {
"join_performance": join_perf,
"n_plus_one_performance": n_plus_one_perf,
"improvement_factor": round(join_perf / n_plus_one_perf, 2) if n_plus_one_perf > 0 else 0
}
return {
"summary": {
"total_benchmarks": len(self.results),
"successful_benchmarks": len([r for r in self.results if r.error_count == 0]),
"failed_benchmarks": len([r for r in self.results if r.error_count > 0])
},
"performance_analysis": analysis,
"performance_comparisons": comparisons,
"recommendations": self._generate_recommendations(analysis, comparisons)
}
def _generate_recommendations(self, analysis: Dict, comparisons: Dict) -> List[str]:
"""Генерация рекомендаций по оптимизации"""
recommendations = []
# Рекомендации по batch операциям
if "batch_vs_single_users" in comparisons:
improvement = comparisons["batch_vs_single_users"]["improvement_factor"]
if improvement > 2:
recommendations.append(f"✅ Batch операции показывают улучшение в {improvement}x раз - рекомендуется использовать для массовых вставок")
else:
recommendations.append("⚠️ Batch операции не показывают значительного улучшения - возможно, стоит пересмотреть реализацию")
# Рекомендации по пагинации
if "cursor_vs_offset_pagination" in comparisons:
improvement = comparisons["cursor_vs_offset_pagination"]["improvement_factor"]
if improvement > 1.5:
recommendations.append(f"✅ Cursor-based пагинация показывает улучшение в {improvement}x раз - рекомендуется для больших таблиц")
else:
recommendations.append("⚠️ Cursor-based пагинация не показывает значительного улучшения - возможно, данных недостаточно для демонстрации преимуществ")
# Рекомендации по JOIN запросам
if "join_vs_n_plus_one" in comparisons:
improvement = comparisons["join_vs_n_plus_one"]["improvement_factor"]
if improvement > 5:
recommendations.append(f"✅ JOIN запросы показывают улучшение в {improvement}x раз - критически важно избегать N+1 проблем")
else:
recommendations.append("⚠️ JOIN запросы не показывают ожидаемого улучшения - возможно, нужно больше данных для тестирования")
# Общие рекомендации
if not recommendations:
recommendations.append("📊 Недостаточно данных для генерации рекомендаций - увеличьте объем тестовых данных")
return recommendations
async def run_database_benchmark():
"""Запуск benchmark тестов БД"""
try:
# Инициализация БД
database = DatabaseService()
await database.init()
# Запуск benchmark
benchmark = DatabaseBenchmark(database)
results = await benchmark.run_all_benchmarks()
# Вывод результатов
print("\n" + "="*80)
print("📊 РЕЗУЛЬТАТЫ BENCHMARK ТЕСТОВ ПРОИЗВОДИТЕЛЬНОСТИ БД")
print("="*80)
print(f"\n📈 Общая статистика:")
print(f" Всего тестов: {results['summary']['total_benchmarks']}")
print(f" Успешных: {results['summary']['successful_benchmarks']}")
print(f" Неудачных: {results['summary']['failed_benchmarks']}")
print(f"\n⚡ Анализ производительности:")
for operation, stats in results['performance_analysis'].items():
print(f" {operation}:")
print(f" Средняя производительность: {stats['avg_items_per_second']} items/sec")
print(f" Максимальная: {stats['max_items_per_second']} items/sec")
print(f" Минимальная: {stats['min_items_per_second']} items/sec")
print(f" Ошибок: {stats['error_rate']:.1%}")
print(f"\n🔄 Сравнения производительности:")
for comparison, stats in results['performance_comparisons'].items():
print(f" {comparison}:")
print(f" Улучшение в {stats['improvement_factor']}x раз")
print(f"\n💡 Рекомендации:")
for recommendation in results['recommendations']:
print(f" {recommendation}")
print("\n" + "="*80)
except Exception as e:
print(f"❌ Ошибка при запуске benchmark: {e}")
if __name__ == "__main__":
asyncio.run(run_database_benchmark())