Files
telegram-helper-bot/database/async_db.py
Andrey 5f6882d348 Implement audio record management features in AsyncBotDB and AudioRepository
- Added methods to delete audio moderation records and retrieve all audio records in async_db.py.
- Enhanced AudioRepository with functionality to delete audio records by file name and retrieve all audio message records.
- Improved logging for audio record operations to enhance monitoring and debugging capabilities.
- Updated related handlers to ensure proper integration of new audio management features.
2025-09-05 01:31:50 +03:00

369 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import aiosqlite
from datetime import datetime
from typing import Optional, List, Dict, Any, Tuple
from database.repository_factory import RepositoryFactory
from database.models import (
User, BlacklistUser, UserMessage, TelegramPost, PostContent,
Admin, AudioMessage
)
class AsyncBotDB:
"""Новый асинхронный класс для работы с базой данных с использованием репозиториев."""
def __init__(self, db_path: str):
self.factory = RepositoryFactory(db_path)
self.logger = self.factory.users.logger
async def create_tables(self):
"""Создание всех таблиц в базе данных."""
await self.factory.create_all_tables()
self.logger.info("Все таблицы успешно созданы")
# Методы для работы с пользователями
async def user_exists(self, user_id: int) -> bool:
"""Проверяет, существует ли пользователь в базе данных."""
return await self.factory.users.user_exists(user_id)
async def add_user(self, user: User):
"""Добавление нового пользователя."""
await self.factory.users.add_user(user)
async def get_user_info(self, user_id: int) -> Optional[Dict[str, Any]]:
"""Получение информации о пользователе."""
user = await self.factory.users.get_user_info(user_id)
if user:
return {
'username': user.username,
'full_name': user.full_name,
'has_stickers': user.has_stickers,
'emoji': user.emoji
}
return None
async def get_username(self, user_id: int) -> Optional[str]:
"""Возвращает username пользователя."""
return await self.factory.users.get_username(user_id)
async def get_user_id_by_username(self, username: str) -> Optional[int]:
"""Возвращает user_id пользователя по username."""
return await self.factory.users.get_user_id_by_username(username)
async def get_full_name_by_id(self, user_id: int) -> Optional[str]:
"""Возвращает full_name пользователя."""
return await self.factory.users.get_full_name_by_id(user_id)
async def get_username_and_full_name(self, user_id: int) -> tuple[Optional[str], Optional[str]]:
"""Возвращает username и full_name пользователя."""
username = await self.get_username(user_id)
full_name = await self.get_full_name_by_id(user_id)
return username, full_name
async def get_user_by_id(self, user_id: int) -> Optional[User]:
"""Получение пользователя по ID."""
return await self.factory.users.get_user_by_id(user_id)
async def get_user_first_name(self, user_id: int) -> Optional[str]:
"""Возвращает first_name пользователя."""
return await self.factory.users.get_user_first_name(user_id)
async def get_all_user_id(self) -> List[int]:
"""Возвращает список всех user_id."""
return await self.factory.users.get_all_user_ids()
async def get_last_users(self, limit: int = 30) -> List[tuple]:
"""Получение последних пользователей."""
return await self.factory.users.get_last_users(limit)
async def update_user_date(self, user_id: int):
"""Обновление даты последнего изменения пользователя."""
await self.factory.users.update_user_date(user_id)
async def update_user_info(self, user_id: int, username: str = None, full_name: str = None):
"""Обновление информации о пользователе."""
await self.factory.users.update_user_info(user_id, username, full_name)
async def update_user_emoji(self, user_id: int, emoji: str):
"""Обновление эмодзи пользователя."""
await self.factory.users.update_user_emoji(user_id, emoji)
async def update_stickers_info(self, user_id: int):
"""Обновление информации о стикерах."""
await self.factory.users.update_stickers_info(user_id)
async def get_stickers_info(self, user_id: int) -> bool:
"""Получение информации о стикерах."""
return await self.factory.users.get_stickers_info(user_id)
async def check_emoji_exists(self, emoji: str) -> bool:
"""Проверка существования эмодзи."""
return await self.factory.users.check_emoji_exists(emoji)
async def get_user_emoji(self, user_id: int) -> str:
"""Получает эмодзи пользователя."""
return await self.factory.users.get_user_emoji(user_id)
async def check_emoji_for_user(self, user_id: int) -> str:
"""Проверяет, есть ли уже у пользователя назначенный emoji."""
return await self.factory.users.check_emoji_for_user(user_id)
# Методы для работы с сообщениями
async def add_message(self, message_text: str, user_id: int, message_id: int, date: int = None):
"""Добавление сообщения пользователя."""
if date is None:
from datetime import datetime
date = int(datetime.now().timestamp())
message = UserMessage(
message_text=message_text,
user_id=user_id,
telegram_message_id=message_id,
date=date
)
await self.factory.messages.add_message(message)
async def get_user_by_message_id(self, message_id: int) -> Optional[int]:
"""Получение пользователя по message_id."""
return await self.factory.messages.get_user_by_message_id(message_id)
# Методы для работы с постами
async def add_post(self, post: TelegramPost):
"""Добавление поста."""
await self.factory.posts.add_post(post)
async def update_helper_message(self, message_id: int, helper_message_id: int):
"""Обновление helper сообщения."""
await self.factory.posts.update_helper_message(message_id, helper_message_id)
async def add_post_content(self, post_id: int, message_id: int, content_name: str, content_type: str):
"""Добавление контента поста."""
return await self.factory.posts.add_post_content(post_id, message_id, content_name, content_type)
async def get_post_content_from_telegram_by_last_id(self, last_post_id: int) -> List[Tuple[str, str]]:
"""Получает контент поста по helper_text_message_id."""
return await self.factory.posts.get_post_content_by_helper_id(last_post_id)
async def get_post_text_from_telegram_by_last_id(self, last_post_id: int) -> Optional[str]:
"""Получает текст поста по helper_text_message_id."""
return await self.factory.posts.get_post_text_by_helper_id(last_post_id)
async def get_post_ids_from_telegram_by_last_id(self, last_post_id: int) -> List[int]:
"""Получает ID сообщений по helper_text_message_id."""
return await self.factory.posts.get_post_ids_by_helper_id(last_post_id)
async def get_author_id_by_message_id(self, message_id: int) -> Optional[int]:
"""Получает ID автора по message_id."""
return await self.factory.posts.get_author_id_by_message_id(message_id)
async def get_author_id_by_helper_message_id(self, helper_text_message_id: int) -> Optional[int]:
"""Получает ID автора по helper_text_message_id."""
return await self.factory.posts.get_author_id_by_helper_message_id(helper_text_message_id)
# Методы для работы с черным списком
async def set_user_blacklist(self, user_id: int, user_name: str = None,
message_for_user: str = None, date_to_unban: int = None):
"""Добавляет пользователя в черный список."""
blacklist_user = BlacklistUser(
user_id=user_id,
message_for_user=message_for_user,
date_to_unban=date_to_unban
)
await self.factory.blacklist.add_user(blacklist_user)
async def delete_user_blacklist(self, user_id: int) -> bool:
"""Удаляет пользователя из черного списка."""
return await self.factory.blacklist.remove_user(user_id)
async def check_user_in_blacklist(self, user_id: int) -> bool:
"""Проверяет, существует ли запись с данным user_id в blacklist."""
return await self.factory.blacklist.user_exists(user_id)
async def get_blacklist_users(self, offset: int = 0, limit: int = 10) -> List[tuple]:
"""Получение пользователей из черного списка."""
users = await self.factory.blacklist.get_all_users(offset, limit)
return [(user.user_id, user.message_for_user, user.date_to_unban) for user in users]
async def get_banned_users_from_db(self) -> List[tuple]:
"""Возвращает список пользователей в черном списке."""
users = await self.factory.blacklist.get_all_users_no_limit()
return [(user.user_id, user.message_for_user, user.date_to_unban) for user in users]
async def get_banned_users_from_db_with_limits(self, offset: int, limit: int) -> List[tuple]:
"""Возвращает список пользователей в черном списке с учетом смещения и ограничения."""
users = await self.factory.blacklist.get_all_users(offset, limit)
return [(user.user_id, user.message_for_user, user.date_to_unban) for user in users]
async def get_blacklist_users_by_id(self, user_id: int) -> Optional[tuple]:
"""Возвращает информацию о пользователе в черном списке по user_id."""
user = await self.factory.blacklist.get_user(user_id)
if user:
return (user.user_id, user.message_for_user, user.date_to_unban)
return None
async def get_blacklist_count(self) -> int:
"""Получение количества пользователей в черном списке."""
return await self.factory.blacklist.get_count()
async def get_users_for_unblock_today(self, current_timestamp: int) -> Dict[int, int]:
"""Возвращает список пользователей, у которых истек срок блокировки."""
return await self.factory.blacklist.get_users_for_unblock_today(current_timestamp)
# Методы для работы с администраторами
async def add_admin(self, user_id: int, role: str = "admin"):
"""Добавление администратора."""
admin = Admin(user_id=user_id, role=role)
await self.factory.admins.add_admin(admin)
async def remove_admin(self, user_id: int):
"""Удаление администратора."""
await self.factory.admins.remove_admin(user_id)
async def is_admin(self, user_id: int) -> bool:
"""Проверка, является ли пользователь администратором."""
return await self.factory.admins.is_admin(user_id)
async def get_all_admins(self) -> list[Admin]:
"""Получение всех администраторов."""
return await self.factory.admins.get_all_admins()
# Методы для работы с аудио
async def add_audio_record(self, file_name: str, author_id: int, date_added: str,
listen_count: int, file_id: str):
"""Добавляет информацию о войсе пользователя."""
audio = AudioMessage(
file_name=file_name,
author_id=author_id,
date_added=date_added,
listen_count=listen_count,
file_id=file_id
)
await self.factory.audio.add_audio_record(audio)
async def add_audio_record_simple(self, file_name: str, user_id: int, date_added) -> None:
"""Добавляет простую запись об аудио файле."""
await self.factory.audio.add_audio_record_simple(file_name, user_id, date_added)
async def last_date_audio(self) -> Optional[str]:
"""Получает дату последнего войса."""
return await self.factory.audio.get_last_date_audio()
async def get_last_user_audio_record(self, user_id: int) -> bool:
"""Получает данные о количестве записей пользователя."""
count = await self.factory.audio.get_user_audio_records_count(user_id)
return bool(count)
async def get_path_for_audio_record(self, user_id: int) -> Optional[str]:
"""Получает данные о названии файла."""
return await self.factory.audio.get_path_for_audio_record(user_id)
async def check_listen_audio(self, user_id: int) -> List[str]:
"""Проверяет прослушано ли аудио пользователем."""
return await self.factory.audio.check_listen_audio(user_id)
async def mark_listened_audio(self, file_name: str, user_id: int):
"""Отмечает аудио прослушанным для конкретного пользователя."""
await self.factory.audio.mark_listened_audio(file_name, user_id)
async def get_id_for_audio_record(self, user_id: int) -> int:
"""Получает следующий номер аудио сообщения пользователя."""
return await self.factory.audio.get_user_audio_records_count(user_id)
async def get_user_audio_records_count(self, user_id: int) -> int:
"""Получает количество аудио записей пользователя."""
return await self.factory.audio.get_user_audio_records_count(user_id)
async def refresh_listen_audio(self, user_id: int):
"""Очищает всю информацию о прослушанных аудио пользователем."""
await self.factory.audio.refresh_listen_audio(user_id)
async def delete_listen_count_for_user(self, user_id: int):
"""Удаляет данные о прослушанных пользователем аудио."""
await self.factory.audio.delete_listen_count_for_user(user_id)
async def get_user_id_by_file_name(self, file_name: str) -> Optional[int]:
"""Получает user_id пользователя по имени файла."""
return await self.factory.audio.get_user_id_by_file_name(file_name)
async def get_date_by_file_name(self, file_name: str) -> Optional[str]:
"""Получает дату добавления файла."""
return await self.factory.audio.get_date_by_file_name(file_name)
# Методы для voice bot
async def set_user_id_and_message_id_for_voice_bot(self, message_id: int, user_id: int) -> bool:
"""Устанавливает связь между message_id и user_id для voice bot."""
return await self.factory.audio.set_user_id_and_message_id_for_voice_bot(message_id, user_id)
async def get_user_id_by_message_id_for_voice_bot(self, message_id: int) -> Optional[int]:
"""Получает user_id пользователя по message_id для voice bot."""
return await self.factory.audio.get_user_id_by_message_id_for_voice_bot(message_id)
async def delete_audio_moderate_record(self, message_id: int) -> None:
"""Удаляет запись из таблицы audio_moderate по message_id."""
await self.factory.audio.delete_audio_moderate_record(message_id)
async def get_all_audio_records(self) -> List[Dict[str, Any]]:
"""Получить все записи аудио сообщений."""
return await self.factory.audio.get_all_audio_records()
async def delete_audio_record_by_file_name(self, file_name: str) -> None:
"""Удалить запись аудио сообщения по имени файла."""
await self.factory.audio.delete_audio_record_by_file_name(file_name)
# Методы для миграций
async def get_migration_version(self) -> int:
"""Получение текущей версии миграции."""
return await self.factory.migrations.get_migration_version()
async def get_current_version(self) -> Optional[int]:
"""Возвращает текущую последнюю версию миграции."""
return await self.factory.migrations.get_current_version()
async def update_version(self, new_version: int, script_name: str):
"""Обновляет версию миграций в таблице migrations."""
await self.factory.migrations.update_version(new_version, script_name)
async def create_table(self, sql_script: str):
"""Создает таблицу в базе. Используется в миграциях."""
await self.factory.migrations.create_table(sql_script)
async def update_migration_version(self, version: int, script_name: str):
"""Обновление версии миграции."""
await self.factory.migrations.update_version(version, script_name)
# Методы для voice bot welcome tracking
async def check_voice_bot_welcome_received(self, user_id: int) -> bool:
"""Проверяет, получал ли пользователь приветственное сообщение от voice_bot."""
return await self.factory.users.check_voice_bot_welcome_received(user_id)
async def mark_voice_bot_welcome_received(self, user_id: int) -> bool:
"""Отмечает, что пользователь получил приветственное сообщение от voice_bot."""
return await self.factory.users.mark_voice_bot_welcome_received(user_id)
# Методы для проверки целостности
async def check_database_integrity(self):
"""Проверяет целостность базы данных и очищает WAL файлы."""
await self.factory.check_database_integrity()
async def cleanup_wal_files(self):
"""Очищает WAL файлы и переключает на DELETE режим для предотвращения проблем с I/O."""
await self.factory.cleanup_wal_files()
async def close(self):
"""Закрытие соединений."""
# Соединения закрываются в каждом методе
pass
async def fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
"""Выполняет SQL запрос и возвращает один результат."""
try:
async with aiosqlite.connect(self.factory.db_path) as conn:
async with conn.execute(query, params) as cursor:
row = await cursor.fetchone()
if row:
columns = [description[0] for description in cursor.description]
return dict(zip(columns, row))
return None
except Exception as e:
self.logger.error(f"Error executing query: {e}")
return None