""" Benchmark тесты для проверки производительности БД """ import asyncio import time import statistics from typing import List, Dict, Any from dataclasses import dataclass from models.user import User from models.question import Question, QuestionStatus from services.infrastructure.database import DatabaseService # from services.business.optimized_pagination_service import OptimizedPaginationService, PaginationCursor # Файл удален from services.infrastructure.metrics import get_metrics_service @dataclass class BenchmarkResult: """Результат benchmark теста""" operation: str duration: float items_processed: int items_per_second: float memory_usage: float = 0.0 error_count: int = 0 class DatabaseBenchmark: """Класс для проведения benchmark тестов БД""" def __init__(self, database: DatabaseService): self.database = database # self.pagination_service = OptimizedPaginationService() # Файл удален self.metrics = get_metrics_service() self.results: List[BenchmarkResult] = [] async def run_all_benchmarks(self) -> Dict[str, Any]: """Запуск всех benchmark тестов""" print("🚀 Запуск benchmark тестов производительности БД...") # Подготовка тестовых данных await self._prepare_test_data() # Запуск тестов benchmarks = [ ("single_insert_users", self._benchmark_single_insert_users), ("batch_insert_users", self._benchmark_batch_insert_users), ("single_insert_questions", self._benchmark_single_insert_questions), ("batch_insert_questions", self._benchmark_batch_insert_questions), ("offset_pagination", self._benchmark_offset_pagination), ("cursor_pagination", self._benchmark_cursor_pagination), ("n_plus_one_query", self._benchmark_n_plus_one_query), ("optimized_join_query", self._benchmark_optimized_join_query), ] for name, benchmark_func in benchmarks: try: print(f"📊 Запуск теста: {name}") result = await benchmark_func() self.results.append(result) print(f"✅ {name}: {result.items_per_second:.2f} items/sec") except Exception as e: print(f"❌ Ошибка в тесте {name}: {e}") self.results.append(BenchmarkResult( operation=name, duration=0.0, items_processed=0, items_per_second=0.0, error_count=1 )) # Очистка тестовых данных await self._cleanup_test_data() return self._generate_report() async def _prepare_test_data(self): """Подготовка тестовых данных""" print("📝 Подготовка тестовых данных...") # Создаем тестовых пользователей self.test_users = [] for i in range(1000): user = User( telegram_id=9000000 + i, username=f"test_user_{i}", first_name=f"Test{i}", last_name="User", chat_id=9000000 + i, profile_link=f"test_link_{i}", is_active=True, is_superuser=False ) self.test_users.append(user) # Создаем тестовые вопросы self.test_questions = [] for i in range(5000): question = Question( from_user_id=9000000 + (i % 100), # Циклически используем пользователей to_user_id=9000000 + ((i + 1) % 100), message_text=f"Test question {i}", status=QuestionStatus.PENDING ) self.test_questions.append(question) print(f"✅ Подготовлено {len(self.test_users)} пользователей и {len(self.test_questions)} вопросов") async def _cleanup_test_data(self): """Очистка тестовых данных""" print("🧹 Очистка тестовых данных...") # Удаляем тестовых пользователей (вопросы удалятся каскадно) for user in self.test_users[:10]: # Удаляем только первых 10 для скорости try: await self.database.users.delete(user.telegram_id) except: pass # Игнорируем ошибки при очистке async def _benchmark_single_insert_users(self) -> BenchmarkResult: """Benchmark одиночной вставки пользователей""" start_time = time.time() items_processed = 0 for user in self.test_users[:100]: # Тестируем на 100 пользователях try: await self.database.create_user(user) items_processed += 1 except Exception as e: print(f"Ошибка при создании пользователя: {e}") duration = time.time() - start_time return BenchmarkResult( operation="single_insert_users", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_batch_insert_users(self) -> BenchmarkResult: """Benchmark batch вставки пользователей""" start_time = time.time() # Создаем новые пользователей для batch теста batch_users = [] for i in range(100): user = User( telegram_id=8000000 + i, username=f"batch_user_{i}", first_name=f"Batch{i}", last_name="User", chat_id=8000000 + i, profile_link=f"batch_link_{i}", is_active=True, is_superuser=False ) batch_users.append(user) try: await self.database.create_users_batch(batch_users) items_processed = len(batch_users) except Exception as e: print(f"Ошибка при batch создании пользователей: {e}") items_processed = 0 duration = time.time() - start_time return BenchmarkResult( operation="batch_insert_users", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_single_insert_questions(self) -> BenchmarkResult: """Benchmark одиночной вставки вопросов""" start_time = time.time() items_processed = 0 for question in self.test_questions[:100]: # Тестируем на 100 вопросах try: await self.database.create_question(question) items_processed += 1 except Exception as e: print(f"Ошибка при создании вопроса: {e}") duration = time.time() - start_time return BenchmarkResult( operation="single_insert_questions", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_batch_insert_questions(self) -> BenchmarkResult: """Benchmark batch вставки вопросов""" start_time = time.time() # Создаем новые вопросы для batch теста batch_questions = [] for i in range(100): question = Question( from_user_id=8000000 + (i % 10), to_user_id=8000000 + ((i + 1) % 10), message_text=f"Batch question {i}", status=QuestionStatus.PENDING ) batch_questions.append(question) try: await self.database.create_questions_batch(batch_questions) items_processed = len(batch_questions) except Exception as e: print(f"Ошибка при batch создании вопросов: {e}") items_processed = 0 duration = time.time() - start_time return BenchmarkResult( operation="batch_insert_questions", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_offset_pagination(self) -> BenchmarkResult: """Benchmark offset-based пагинации""" start_time = time.time() items_processed = 0 # Тестируем пагинацию с offset for offset in range(0, 1000, 50): # 20 страниц по 50 элементов try: questions = await self.database.get_user_questions( to_user_id=9000000, # Используем первого тестового пользователя limit=50, offset=offset ) items_processed += len(questions) except Exception as e: print(f"Ошибка при offset пагинации: {e}") duration = time.time() - start_time return BenchmarkResult( operation="offset_pagination", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_cursor_pagination(self) -> BenchmarkResult: """Benchmark cursor-based пагинации""" start_time = time.time() items_processed = 0 try: # Тестируем cursor-based пагинацию cursor = None for _ in range(20): # 20 страниц result = await self.pagination_service.paginate_questions( database=self.database, to_user_id=9000000, cursor=cursor, limit=50 ) items_processed += len(result.items) cursor = result.cursor if not result.has_next: break except Exception as e: print(f"Ошибка при cursor пагинации: {e}") duration = time.time() - start_time return BenchmarkResult( operation="cursor_pagination", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_n_plus_one_query(self) -> BenchmarkResult: """Benchmark N+1 запроса (неоптимизированная версия)""" start_time = time.time() items_processed = 0 try: # Получаем вопросы questions = await self.database.get_user_questions( to_user_id=9000000, limit=100 ) # Для каждого вопроса делаем отдельный запрос к БД (N+1 проблема) for question in questions: try: if question.from_user_id: user = await self.database.get_user(question.from_user_id) if user: items_processed += 1 except Exception as e: print(f"Ошибка при получении пользователя: {e}") except Exception as e: print(f"Ошибка при N+1 запросе: {e}") duration = time.time() - start_time return BenchmarkResult( operation="n_plus_one_query", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) async def _benchmark_optimized_join_query(self) -> BenchmarkResult: """Benchmark оптимизированного JOIN запроса""" start_time = time.time() items_processed = 0 try: # Используем оптимизированный запрос с JOIN questions_with_authors = await self.database.get_user_questions_with_authors( user_id=9000000, limit=100 ) items_processed = len(questions_with_authors) except Exception as e: print(f"Ошибка при оптимизированном JOIN запросе: {e}") duration = time.time() - start_time return BenchmarkResult( operation="optimized_join_query", duration=duration, items_processed=items_processed, items_per_second=items_processed / duration if duration > 0 else 0 ) def _generate_report(self) -> Dict[str, Any]: """Генерация отчета по результатам benchmark""" if not self.results: return {"error": "Нет результатов для анализа"} # Группируем результаты по типам операций operation_groups = {} for result in self.results: if result.operation not in operation_groups: operation_groups[result.operation] = [] operation_groups[result.operation].append(result) # Анализируем производительность analysis = {} for operation, results in operation_groups.items(): if results: avg_performance = statistics.mean([r.items_per_second for r in results]) max_performance = max([r.items_per_second for r in results]) min_performance = min([r.items_per_second for r in results]) analysis[operation] = { "avg_items_per_second": round(avg_performance, 2), "max_items_per_second": round(max_performance, 2), "min_items_per_second": round(min_performance, 2), "total_tests": len(results), "error_rate": sum(1 for r in results if r.error_count > 0) / len(results) } # Сравнение производительности comparisons = {} if "single_insert_users" in analysis and "batch_insert_users" in analysis: single_perf = analysis["single_insert_users"]["avg_items_per_second"] batch_perf = analysis["batch_insert_users"]["avg_items_per_second"] comparisons["batch_vs_single_users"] = { "batch_performance": batch_perf, "single_performance": single_perf, "improvement_factor": round(batch_perf / single_perf, 2) if single_perf > 0 else 0 } if "offset_pagination" in analysis and "cursor_pagination" in analysis: offset_perf = analysis["offset_pagination"]["avg_items_per_second"] cursor_perf = analysis["cursor_pagination"]["avg_items_per_second"] comparisons["cursor_vs_offset_pagination"] = { "cursor_performance": cursor_perf, "offset_performance": offset_perf, "improvement_factor": round(cursor_perf / offset_perf, 2) if offset_perf > 0 else 0 } if "n_plus_one_query" in analysis and "optimized_join_query" in analysis: n_plus_one_perf = analysis["n_plus_one_query"]["avg_items_per_second"] join_perf = analysis["optimized_join_query"]["avg_items_per_second"] comparisons["join_vs_n_plus_one"] = { "join_performance": join_perf, "n_plus_one_performance": n_plus_one_perf, "improvement_factor": round(join_perf / n_plus_one_perf, 2) if n_plus_one_perf > 0 else 0 } return { "summary": { "total_benchmarks": len(self.results), "successful_benchmarks": len([r for r in self.results if r.error_count == 0]), "failed_benchmarks": len([r for r in self.results if r.error_count > 0]) }, "performance_analysis": analysis, "performance_comparisons": comparisons, "recommendations": self._generate_recommendations(analysis, comparisons) } def _generate_recommendations(self, analysis: Dict, comparisons: Dict) -> List[str]: """Генерация рекомендаций по оптимизации""" recommendations = [] # Рекомендации по batch операциям if "batch_vs_single_users" in comparisons: improvement = comparisons["batch_vs_single_users"]["improvement_factor"] if improvement > 2: recommendations.append(f"✅ Batch операции показывают улучшение в {improvement}x раз - рекомендуется использовать для массовых вставок") else: recommendations.append("⚠️ Batch операции не показывают значительного улучшения - возможно, стоит пересмотреть реализацию") # Рекомендации по пагинации if "cursor_vs_offset_pagination" in comparisons: improvement = comparisons["cursor_vs_offset_pagination"]["improvement_factor"] if improvement > 1.5: recommendations.append(f"✅ Cursor-based пагинация показывает улучшение в {improvement}x раз - рекомендуется для больших таблиц") else: recommendations.append("⚠️ Cursor-based пагинация не показывает значительного улучшения - возможно, данных недостаточно для демонстрации преимуществ") # Рекомендации по JOIN запросам if "join_vs_n_plus_one" in comparisons: improvement = comparisons["join_vs_n_plus_one"]["improvement_factor"] if improvement > 5: recommendations.append(f"✅ JOIN запросы показывают улучшение в {improvement}x раз - критически важно избегать N+1 проблем") else: recommendations.append("⚠️ JOIN запросы не показывают ожидаемого улучшения - возможно, нужно больше данных для тестирования") # Общие рекомендации if not recommendations: recommendations.append("📊 Недостаточно данных для генерации рекомендаций - увеличьте объем тестовых данных") return recommendations async def run_database_benchmark(): """Запуск benchmark тестов БД""" try: # Инициализация БД database = DatabaseService() await database.init() # Запуск benchmark benchmark = DatabaseBenchmark(database) results = await benchmark.run_all_benchmarks() # Вывод результатов print("\n" + "="*80) print("📊 РЕЗУЛЬТАТЫ BENCHMARK ТЕСТОВ ПРОИЗВОДИТЕЛЬНОСТИ БД") print("="*80) print(f"\n📈 Общая статистика:") print(f" Всего тестов: {results['summary']['total_benchmarks']}") print(f" Успешных: {results['summary']['successful_benchmarks']}") print(f" Неудачных: {results['summary']['failed_benchmarks']}") print(f"\n⚡ Анализ производительности:") for operation, stats in results['performance_analysis'].items(): print(f" {operation}:") print(f" Средняя производительность: {stats['avg_items_per_second']} items/sec") print(f" Максимальная: {stats['max_items_per_second']} items/sec") print(f" Минимальная: {stats['min_items_per_second']} items/sec") print(f" Ошибок: {stats['error_rate']:.1%}") print(f"\n🔄 Сравнения производительности:") for comparison, stats in results['performance_comparisons'].items(): print(f" {comparison}:") print(f" Улучшение в {stats['improvement_factor']}x раз") print(f"\n💡 Рекомендации:") for recommendation in results['recommendations']: print(f" {recommendation}") print("\n" + "="*80) except Exception as e: print(f"❌ Ошибка при запуске benchmark: {e}") if __name__ == "__main__": asyncio.run(run_database_benchmark())