Files
telegram-helper-bot/database/db.py
Andrey 2d40f4496e Update voice bot functionality and clean up project structure
- Added voice message handling capabilities, including saving and deleting audio messages via callback queries.
- Refactored audio record management in the database to remove unnecessary fields and streamline operations.
- Introduced new keyboard options for voice interactions in the bot.
- Updated `.gitignore` to include voice user files for better project organization.
- Removed obsolete voice bot handler files to simplify the codebase.
2025-09-01 19:17:05 +03:00

1495 lines
74 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sqlite3
import asyncio
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor
from logs.custom_logger import logger
# Local imports - metrics
from helper_bot.utils.metrics import (
metrics,
track_time,
track_errors,
db_query_time
)
class BotDB:
def __init__(self, current_dir, name):
print(f"DEBUG BotDB: current_dir={current_dir}, name={name}")
# Формируем правильный путь к базе данных
if name.startswith('database/'):
# Если имя уже содержит database/, то используем его как есть
self.db_file = os.path.join(current_dir, name)
else:
# Если имя не содержит database/, то добавляем его
self.db_file = os.path.join(current_dir, 'database', name)
print(f"DEBUG BotDB: db_file={self.db_file}")
self.conn = None
self.cursor = None
self.logger = logger
self.logger.info(f'Инициация базы данных: {self.db_file}')
# Создаем пул потоков для асинхронных операций
self.executor = ThreadPoolExecutor(max_workers=4)
def connect(self):
"""Создание соединения и курсора."""
try:
# Проверяем существование файла базы данных
if not os.path.exists(self.db_file):
self.logger.error(f"Файл базы данных не найден: {self.db_file}")
raise FileNotFoundError(f"Файл базы данных не найден: {self.db_file}")
# Проверяем права доступа к файлу
if not os.access(self.db_file, os.R_OK | os.W_OK):
self.logger.error(f"Нет прав доступа к файлу базы данных: {self.db_file}")
raise PermissionError(f"Нет прав доступа к файлу базы данных: {self.db_file}")
# Добавляем таймаут для предотвращения зависаний
self.conn = sqlite3.connect(self.db_file, timeout=10.0)
# Включаем WAL режим для лучшей производительности
self.conn.execute("PRAGMA journal_mode=WAL")
self.conn.execute("PRAGMA synchronous=NORMAL")
self.conn.execute("PRAGMA cache_size=10000")
self.conn.execute("PRAGMA temp_store=MEMORY")
self.cursor = self.conn.cursor()
self.logger.info(f"Успешное подключение к базе данных: {self.db_file}")
except sqlite3.Error as e:
self.logger.error(f"Ошибка SQLite при подключении к базе данных: {e}")
raise
except (FileNotFoundError, PermissionError) as e:
self.logger.error(f"Ошибка файловой системы при подключении к базе данных: {e}")
raise
except Exception as e:
self.logger.error(f"Неожиданная ошибка при подключении к базе данных: {e}")
raise
def create_table(self, sql_script):
"""
Создает таблицу в базе. Используется в миграциях
Args:
sql_script: DDL скрипт таблицы
Returns:
None
"""
try:
self.connect()
self.cursor.execute(sql_script)
self.conn.commit()
self.logger.info(f'Таблица создана: {sql_script}')
except Exception as e:
self.logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}')
raise
finally:
self.close()
def get_current_version(self):
"""
Возвращает текущую последнюю версию миграции
Args:
None
Returns:
int: Версия последней миграции.
"""
self.logger.info(f'Попытка получения версии миграции')
try:
self.connect()
self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
version = self.cursor.fetchone()[0]
self.logger.info(f'Получена текущая версия миграции: {version}')
return version
except Exception as e:
self.logger.error(f'Ошибка при получении текущей версии миграции: {str(e)}')
raise
finally:
self.close()
def update_version(self, new_version: int, script_name: str):
"""
Обновляет версию миграций в таблице migrations.
Добавляет новую запись в таблицу migrations с указанной версией,
именем скрипта и текущей датой и временем.
Args:
new_version (int): Новая версия миграции
script_name (str): Имя скрипта миграции
Returns:
None
Raises:
sqlite3. IntegrityError: Если возникает ошибка целостности при вставке
данных в таблицу migrations.
Exception: Если возникает любая другая ошибка при обновлении версии.
"""
self.logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}')
try:
self.connect()
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
self.cursor.execute(
"INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
(new_version, script_name, today),
)
self.conn.commit()
self.logger.info(f"Версия обновлена: {new_version}, название скрипта: {script_name}")
except sqlite3.IntegrityError as e:
self.logger.error(f"Ошибка при обновлении версии: {e}")
raise
except Exception as e:
self.logger.error(f"Ошибка при обновлении версии: {e}")
raise
finally:
self.close()
@track_time("add_new_user_in_db", "database")
@track_errors("database", "add_new_user_in_db")
@db_query_time("add_new_user_in_db", "our_users", "insert")
def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool,
language_code: str, emoji: str, date_added: str, date_changed: str):
"""
Добавляет нового пользователя в базу данных.
Args:
user_id (int): Идентификатор пользователя в Telegram.
first_name (str): Имя пользователя.
full_name (str): Полное имя пользователя.
username (str): Username пользователя в Telegram.
is_bot (bool): Флаг, указывающий, является ли пользователь ботом.
language_code (str): Код языка пользователя.
emoji (str): Эмодзи закрепленная за пользователем
date_added (str): Дата добавления пользователя в базу.
date_changed (str): Дата последнего изменения данных пользователя.
Returns:
None: Если запись успешно добавлена в базу.
Exception: Если произошла ошибка при добавлении записи.
"""
self.logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}")
try:
self.connect()
try:
# Новая схема с колонкой emoji
self.cursor.execute(
"INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'emoji', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name, username, is_bot, language_code, emoji, date_added, date_changed)
)
except sqlite3.OperationalError as e:
# Обратная совместимость: старая схема без колонки emoji
if 'no column named emoji' in str(e):
self.cursor.execute(
"INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed)
)
else:
raise
self.conn.commit()
self.logger.info(
f"Новый пользователь добавлен в базу: user_id={user_id}, first_name={first_name}, emoji={emoji}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при добавлении пользователя в базу: {error}. "
f"Данные пользователя: user_id={user_id}, first_name={first_name}")
raise
finally:
self.close()
@track_time("user_exists", "database")
@track_errors("database", "user_exists")
@db_query_time("user_exists", "our_users", "select")
def user_exists(self, user_id: int):
"""
Проверяет, существует ли пользователь в базе данных.
Args:
user_id (int): Идентификатор пользователя.
Returns:
bool: True, если пользователь найден, False - иначе.
"""
self.logger.info(f"Попытка проверки существования пользователя: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchall()
self.logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}")
return bool(len(result))
except sqlite3.Error as error:
self.logger.error(f"Ошибка при проверке существования пользователя: {error}")
raise
finally:
self.close()
def get_user_id(self, user_id: int):
"""
@deprecated
Возвращает ID пользователя в базе данных по его user_id.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
int: ID пользователя в базе данных.
None: Если пользователь не найден.
"""
self.logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
user_id_db = result[0]
self.logger.info(f"ID пользователя в базе найден: user_id={user_id}, id_db={user_id_db}")
return user_id_db
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}")
raise
finally:
self.close()
def get_username(self, user_id: int):
"""
Возвращает username пользователя из базы данных по его user_id в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
str: Username пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
username = result[0]
self.logger.info(f"Username пользователя найден: user_id={user_id}, username={username}")
return username
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_user_id_by_username(self, username: str):
"""
Возвращает user_id пользователя из базы данных по его user_name в Telegram.
Args:
username (str): Username пользователя.
Returns:
user_id (int): Идентификатор пользователя в Telegram.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT user_id FROM our_users WHERE username = ?", (username,))
result = self.cursor.fetchone()
if result:
user_id = result[0]
self.logger.info(f"User_id пользователя найден: username={username}, user_id={user_id}")
return user_id
else:
self.logger.info(f"Пользователь с username={username} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_full_name_by_id(self, user_id: str):
"""
Возвращает full_name пользователя из базы данных по его username в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
full_name (str): Username пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
full_name = result[0]
self.logger.info(f"Username пользователя найден: user_id={user_id}, full_name={full_name}")
return full_name
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_user_info_by_id(self, user_id: int):
"""
Возвращает информацию о пользователе из базы данных по его user_id.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
dict: Словарь с информацией о пользователе (username, full_name).
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT username, full_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
user_info = {
'username': result[0],
'full_name': result[1]
}
self.logger.info(f"Информация о пользователе найдена: user_id={user_id}, username={user_info['username']}, full_name={user_info['full_name']}")
return user_info
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении информации о пользователе из базы данных: {error}")
raise
finally:
self.close()
def get_all_user_id(self):
"""
Возвращает список всех user_id из базы данных.
Returns:
list: Список user_id.
[]: Если в базе данных нет пользователей.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Попытка получения всех user_id")
try:
self.connect()
self.cursor.execute("SELECT user_id FROM our_users")
fetch_all = self.cursor.fetchall()
list_of_users = [user_id[0] for user_id in fetch_all]
self.logger.info(f"Получен список всех user_id: {list_of_users}")
return list_of_users
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении списка user_id из базы данных: {error}")
raise
finally:
self.close()
def get_user_first_name(self, user_id: int):
"""
Возвращает имя пользователя из базы данных по его user_id в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
str: Имя пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Попытка получения имени пользователя по user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
first_name = result[0]
self.logger.info(f"Имя пользователя найдено: user_id={user_id}, first_name={first_name}")
return first_name
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}")
raise
finally:
self.close()
@track_time("get_info_about_stickers", "database")
@track_errors("database", "get_info_about_stickers")
@db_query_time("get_info_about_stickers", "our_users", "select")
def get_info_about_stickers(self, user_id: int):
"""
Проверяет, получил ли пользователь стикеры.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
bool: True, если пользователь получил стикеры, False - иначе.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.")
try:
self.connect()
self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
has_stickers = result[0] == 1
self.logger.info(
f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}")
return has_stickers
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении информации о получении стикеров: {error}")
raise
finally:
self.close()
@track_time("update_info_about_stickers", "database")
@track_errors("database", "update_info_about_stickers")
@db_query_time("update_info_about_stickers", "our_users", "update")
def update_info_about_stickers(self, user_id):
"""
Обновляет информацию о получении стикеров пользователем.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
None: Если обновление успешно выполнено.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,))
self.conn.commit()
self.logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}")
raise
finally:
self.close()
def get_users_for_unblock_today(self, date_to_unban: str):
"""
Возвращает список пользователей, у которых истекает срок блокировки сегодня.
Args:
date_to_unban (str): Дата разблокировки.
Returns:
dict: Словарь, где ключ - user_id, значение - username.
{}: Если сегодня нет пользователей, у которых истекает срок блокировки.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id, user_name "
"FROM blacklist WHERE date_to_unban = ?", (date_to_unban,))
fetch_all = result.fetchall()
list_of_users = {user_id: username for user_id, username in fetch_all}
self.logger.info(f"Получен список пользователей для разблокировки сегодня: {list_of_users}")
return list_of_users
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}")
raise
finally:
self.close()
def get_blacklist_users_by_id(self, user_id: int):
"""
Возвращает информацию о пользователе в черном списке по user_id.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
tuple: Кортеж (user_id, user_name, message_for_user, date_to_unban).
None: Если пользователь не найден в черном списке.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban "
"FROM blacklist WHERE user_id = ?", (user_id,))
return self.cursor.fetchone()
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}")
raise
finally:
self.close()
def check_user_in_blacklist(self, user_id: int):
"""
Проверяет, существует ли запись с данным user_id в blacklist.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
bool: True, если пользователь найден в черном списке, False - иначе.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
self.logger.info(f"Существует ли пользователь: user_id={user_id} Итог: {result}")
return bool(result)
except sqlite3.Error as error:
self.logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}")
raise
finally:
self.close()
def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None):
"""
Добавляет пользователя в черный список.
Args:
user_id (int): Идентификатор пользователя в Telegram.
user_name (str, optional): Username пользователя. Defaults to None.
message_for_user (str, optional): Сообщение для пользователя. Defaults to None.
date_to_unban (datetime, optional): Дата разблокировки. Defaults to None.
Returns:
None: Если добавление в черный список успешно выполнено.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name},"
f" message_for_user={message_for_user}, date_to_unban={date_to_unban}")
try:
self.connect()
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban,))
self.conn.commit()
self.logger.info(f"Пользователь добавлен в черный список: user_id={user_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при добавлении пользователя в черный список: {error}")
return error
finally:
self.close()
def delete_user_blacklist(self, user_id: int):
"""
Удаляет пользователя из черного списка.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
bool: True, если удаление прошло успешно, False - в случае ошибки.
Raises:
None: Ошибки обрабатываются в блоке except, возвращая False.
"""
self.logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}")
try:
self.connect()
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit()
self.logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.")
return True
except sqlite3.Error as error:
self.logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} "
f"из таблицы blacklist. Ошибка: {str(error)}")
return False
finally:
self.close()
@track_time("add_new_message_in_db", "database")
@track_errors("database", "add_new_message_in_db")
@db_query_time("add_new_message_in_db", "user_messages", "insert")
def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str):
"""
Добавляет новое сообщение пользователя в базу данных.
Args:
message_text (str): Текст сообщения.
user_id (int): Идентификатор пользователя в Telegram.
message_id (int): Идентификатор сообщения в Telegram.
date (str): Дата отправки сообщения.
Returns:
None: Если добавление прошло успешно.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(
f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}")
try:
self.connect()
self.cursor.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)",
(message_text, user_id, message_id, date))
self.conn.commit()
self.logger.info(f"Новое сообщение добавлено в базу данных: message_id={message_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
raise
finally:
self.close()
@track_time("get_username_and_full_name", "database")
@track_errors("database", "get_username_and_full_name")
@db_query_time("get_username_and_full_name", "our_users", "select")
def get_username_and_full_name(self, user_id: int):
"""
Получает full_name и username пользователя по ID из базы
Args:
date (str): Новая дата изменения.
user_id (int): Идентификатор пользователя в Telegram.
Returns:
username (str): username пользователя
full_name (str): full_name пользователя
"""
self.logger.info(
f"Запуск функции check_username_and_first_name: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
username = self.cursor.fetchone()[0]
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
full_name = self.cursor.fetchone()[0]
self.logger.info(
f"Функция check_username_and_first_name успешно отработала: user_id={user_id}, username={username}, full_name={full_name}")
return username, full_name
except sqlite3.Error as error:
self.logger.error(f"Ошибка в функции get_username_and_first_name: {error}")
return None
finally:
self.close()
@track_time("update_username_and_full_name", "database")
@track_errors("database", "update_username_and_full_name")
@db_query_time("update_username_and_full_name", "our_users", "update")
def update_username_and_full_name(self, user_id: int, username: str, full_name: str):
"""
Обновляет full_name и username пользователя
Args:
username (str): username пользователя
full_name (str): full_name пользователя
user_id (int): Идентификатор пользователя в Telegram
Returns:
True (bool): Если обновления прошли успешно
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(
f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
(username, full_name, user_id,))
self.conn.commit()
self.logger.info(
f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены")
return True
except sqlite3.Error as error:
self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}")
raise
finally:
self.close()
@track_time("update_date_for_user", "database")
@track_errors("database", "update_date_for_user")
@db_query_time("update_date_for_user", "our_users", "update")
def update_date_for_user(self, date: str, user_id: int):
"""
#TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users
Обновляет дату последнего изменения данных пользователя в базе
Args:
date (str): Новая дата изменения
user_id (int): Идентификатор пользователя в Telegram
Returns:
None: Если обновление прошло успешно
sqlite3. Error: Если произошла ошибка при выполнении запроса
"""
self.logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?",
(date, user_id,))
self.conn.commit()
self.logger.info(f"Дата изменения обновлена для пользователя: user_id={user_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка обновления даты изменения для пользователя: {error}")
return error
finally:
self.close()
@track_time("check_emoji", "database")
@track_errors("database", "check_emoji")
@db_query_time("check_emoji", "our_users", "select")
def check_emoji(self, emoji: str):
"""
Проверяет, есть ли уже такой emoji в таблице.
Args:
emoji: emoji для проверки.
Returns:
True, если эмодзи уже есть, иначе False.
Raises:
None: В случае ошибки возвращается None
"""
self.logger.info(f"Запуск функции check_emoji: emoji={emoji}")
try:
self.connect()
self.cursor.execute("SELECT 1 FROM our_users WHERE emoji = ?", (emoji,))
result = self.cursor.fetchone()
return bool(result)
except sqlite3.Error as error:
self.logger.error(f"Ошибка проверки эмодзи в базе: {error}")
return None
finally:
self.close()
@track_time("update_emoji_for_user", "database")
@track_errors("database", "update_emoji_for_user")
@db_query_time("update_emoji_for_user", "our_users", "update")
def update_emoji_for_user(self, user_id: int, emoji: str):
"""
Обновляет эмодзи для пользователя в базе если его ранее не было установлено
Args:
user_id (int): Идентификатор пользователя в Telegram
emoji (str): Эмодзи пользователя
Returns:
None: Если обновление прошло успешно
sqlite3. Error: Если произошла ошибка при выполнении запроса
"""
self.logger.info(f"Запуск функции update_emoji_for_user: user_id={user_id}, emoji={emoji}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET emoji = ? WHERE user_id = ?",
(emoji, user_id,))
self.conn.commit()
self.logger.info(f"Эмоджи обновлен для пользователя: user_id={user_id}")
except sqlite3.Error as error:
self.logger.error(f"Ошибка обновления эмодзи для пользователя: {error}")
return error
finally:
self.close()
@track_time("check_emoji_for_user", "database")
@track_errors("database", "check_emoji_for_user")
@db_query_time("check_emoji_for_user", "our_users", "select")
def check_emoji_for_user(self, user_id: int):
"""
Проверяет, есть ли уже у пользователя назначенный emoji.
Args:
user_id: user_id пользователя.
Returns:
True, если эмодзи такого нет, иначе False.
Raises:
error: В случае ошибки возвращается error
"""
self.logger.info(f"Запуск функции check_emoji_for_user: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT emoji FROM our_users WHERE user_id = ?", (user_id,))
pre_result = self.cursor.fetchone()
# Возвращаем "Смайл не определен", если pre_result или pre_result[0] is None
result = pre_result[0] if pre_result else None
return str(result) if result is not None else "Смайл еще не определен"
except sqlite3.Error as error:
self.logger.error(f"Ошибка проверки эмодзи в базе: {error}")
return error
finally:
self.close()
def refresh_listen_audio(self, user_id: int):
"""
Очищает всю информацию о прослушанных аудио пользователем
Args:
user_id: user_id пользователя.
Returns:
None - если все очищено успешно
Raises:
error: В случае ошибки возвращается error
"""
self.logger.info(f"Запуск функции check_emoji_for_user: user_id={user_id}")
try:
self.connect()
self.cursor.execute("DELETE FROM listen_audio_users WHERE user_id = ?", (user_id,))
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка проверки эмодзи в базе: {error}")
return error
finally:
self.close()
def is_admin(self, user_id: int):
"""
Проверяет, является ли пользователь администратором.
Args:
user_id: ID пользователя Telegram.
Returns:
True, если пользователь администратор, иначе False.
Raises:
None: В случае ошибки возвращается None
"""
self.logger.info(f"Запуск функции is_admin: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
except sqlite3.Error as error:
self.logger.error(f"Ошибка проверки прав пользователя админа: {error}")
return None
finally:
self.close()
def add_admin(self, user_id: int, role: str):
"""
Добавляет пользователя в список администраторов.
Args:
user_id (int): ID пользователя Telegram.
role (str): Роль пользователя. Доступные варианты:
1. creator - создатель
2. admin - обычная роль
Returns:
None: Если добавление прошло успешно.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}")
try:
self.connect()
self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
self.conn.commit()
self.logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка добавления пользователя в список администраторов: {error}")
return error
finally:
self.close()
def remove_admin(self, user_id: int):
"""
Удаляет пользователя из списка администраторов.
Args:
user_id (int): ID пользователя Telegram.
Returns:
None: Если удаление прошло успешно.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции remove_admin: user_id={user_id}")
try:
self.connect()
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
self.conn.commit()
self.logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}")
return error
finally:
self.connect()
def get_user_by_message_id(self, message_id: int):
"""
#TODO: Возвращается TypeError вместо None
Возвращает идентификатор пользователя по идентификатору сообщения.
Args:
message_id (int): Идентификатор сообщения в Telegram.
Returns:
int: Идентификатор пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,))
user = result.fetchone()[0]
self.logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}")
return user
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения user_id по message_id: {error}")
raise
finally:
self.close()
def get_last_users_from_db(self):
"""
Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот.
Returns:
list: Список кортежей (full_name, user_id) последних 30 пользователей.
[]: Если в базе данных нет пользователей.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info("Запуск функции get_last_users_from_db")
try:
self.connect()
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30")
users = result.fetchall()
self.logger.info(f"Получен список последних 30 пользователей: {users}")
return users
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения списка последних пользователей: {error}")
raise
finally:
self.close()
def get_banned_users_from_db(self):
"""
Возвращает список идентификаторов пользователей в черном списке бота.
Returns:
list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
[]: Если в черном списке нет пользователей.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info("Запуск функции get_banned_users_from_db")
try:
self.connect()
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist")
users = result.fetchall()
self.logger.info(f"Получен список пользователей в черном списке: {users}")
return users
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
raise
finally:
self.close()
def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
"""
Возвращает список идентификаторов пользователей в черном списке бота с учетом смещения и ограничения.
Args:
offset (int): Смещение для выборки
limit (int): Ограничение количества возвращаемых записей.
Returns:
list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
[]: Если в черном списке нет пользователей или количество записей с учетом смещения и ограничения равно 0.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}")
try:
self.connect()
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban "
"FROM blacklist LIMIT ?, ?", (offset, limit,))
users = result.fetchall()
self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {users}")
return users
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
raise
finally:
self.close()
def get_post_content_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(
f"Запуск функции get_post_content_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("""
SELECT cpft.content_name, cpft.content_type
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc
ON pft.message_id = mltc.post_id
JOIN content_post_from_telegram cpft
ON cpft.message_id = mltc.message_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,))
post_content = result.fetchall()
self.logger.info(
f"Функция get_post_content_from_telegram_by_last_id получила список контента: {post_content}")
return post_content
finally:
self.close()
def get_post_ids_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(
f"Запуск функции get_post_ids_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("""
SELECT mltc.message_id
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc
ON pft.message_id = mltc.post_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,))
post_ids = result.fetchall()
self.logger.info(f"Функция get_post_ids_from_telegram_by_last_id "
f"получила идентификаторы сообщений: {post_ids}")
return post_ids
except Exception as e:
self.logger.error(f"Ошибка в функции get_post_ids_from_telegram_by_last_id {str(e)}")
return False
finally:
self.close()
def get_post_text_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(f"Запуск функции get_post_text_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("SELECT text "
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(last_post_id,))
text = result.fetchone()[0]
self.logger.info(f"Функция get_post_text_from_telegram_by_last_id получила text")
return text
except Exception as e:
self.logger.error(f"Ошибка в функции get_post_text_from_telegram_by_last_id {str(e)}")
def get_author_id_by_message_id(self, message_id: int):
self.logger.info(f"Запуск функции get_author_id_by_message_id, идентификатор поста {message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM post_from_telegram_suggest WHERE message_id = ?",
(message_id,))
author_id = result.fetchone()[0]
self.logger.info(f"Функция get_author_id_by_message_id получила author_id {author_id}")
return author_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_author_id_by_message_id {str(e)}")
def get_author_id_by_helper_message_id(self, helper_text_message_id: int):
self.logger.info(f"Запуск функции get_author_id_by_helper_message_id, идентификатор поста "
f"{helper_text_message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(helper_text_message_id,))
author_id = result.fetchone()[0]
self.logger.info(f"Функция get_author_id_by_helper_message_id получила author_id {author_id}")
return author_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_author_id_by_helper_message_id {str(e)}")
def get_user_id_by_message_id_for_voice_bot(self, message_id: int):
self.logger.info(f"Запуск функции get_user_id_by_message_id_for_voice_bot, идентификатор поста "
f"{message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id "
"FROM audio_moderate WHERE message_id = ?",
(message_id,))
user_id = result.fetchone()[0]
self.logger.info(f"Функция get_user_id_by_message_id_for_voice_bot получила author_id {user_id}")
return user_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_user_id_by_message_id_for_voice_bot {str(e)}")
def set_user_id_and_message_id_for_voice_bot(self, message_id: int, user_id: int):
self.logger.info(f"Запуск функции set_user_id_and_message_id_for_voice_bot, идентификатор поста "
f"{message_id}, user_id {user_id}")
try:
self.connect()
result = self.cursor.execute(
"INSERT INTO audio_moderate (message_id, user_id)"
"VALUES (?, ?)", (message_id, user_id))
self.conn.commit()
self.logger.info(f"Функция set_user_id_and_message_id_for_voice_bot отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции set_user_id_and_message_id_for_voice_bot {str(e)}")
def get_user_id_by_file_name(self, file_name: str):
self.logger.info(f"Запуск функции get_user_id_by_file_name, идентификатор файла "
f"{file_name}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM audio_message_reference WHERE file_name = ?",
(file_name,))
user_id = result.fetchone()[0]
self.logger.info(f"Функция get_user_id_by_file_name получила user_id {user_id}")
return user_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_user_id_by_file_name {str(e)}")
def get_date_by_file_name(self, file_name: str):
self.logger.info(f"Запуск функции get_date_by_file_name, идентификатор файла "
f"{file_name}")
try:
self.connect()
result = self.cursor.execute("SELECT date_added "
"FROM audio_message_reference WHERE file_name = ?",
(file_name,))
date_added = result.fetchone()[0]
self.logger.info(f"Функция get_date_by_file_name получила date_added {date_added}")
return date_added
except Exception as e:
self.logger.error(f"Ошибка в функции get_date_by_file_name {str(e)}")
def add_post_content_in_db(self, post_id: int, message_id: int, content_name: str, type_content: str):
self.logger.info(
f"Запуск функции add_post_content_in_db: post_id={post_id}, message_id={message_id}, "
f"content_name={content_name}, content_type={type_content}")
try:
self.connect()
self.cursor.execute(
"INSERT INTO message_link_to_content (post_id, message_id)"
"VALUES (?, ?)", (post_id, message_id))
self.conn.commit()
self.cursor.execute(
"INSERT INTO content_post_from_telegram (message_id, content_name, content_type)"
"VALUES (?, ?, ?)", (message_id, content_name, type_content))
self.conn.commit()
self.logger.info(f"Функция add_post_content_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции add_post_content_in_db при добавлении поста в базу данных: {e}")
return False
def add_post_in_db(self, message_id: int, text: str, author_id: int):
self.logger.info(
f"Запуск функции add_post_in_db: message_id={message_id}, "
f"author_id={author_id}")
try:
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
self.connect()
self.cursor.execute(
"INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)"
"VALUES (?, ?, ?, ?)", (message_id, text, author_id, today))
self.conn.commit()
self.logger.info(f"Функция add_post_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции add_post_in_db при добавлении поста в базу данных: {e}")
return False
def update_helper_message_in_db(self, message_id: int, helper_message_id: int):
self.logger.info(
f"Запуск функции update_helper_message_in_db: message_id={message_id}, "
f"helper_message_id={helper_message_id}")
try:
self.connect()
self.cursor.execute(
"UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
(helper_message_id, message_id,))
self.conn.commit()
self.logger.info(f"Функция update_helper_message_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции update_helper_message_in_db при добавлении поста в базу данных: {e}")
return False
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД"""
self.logger.info(
f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id},"
f" date_added = {date_added}")
try:
self.connect()
result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) "
"VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id))
self.conn.commit()
self.logger.info(
f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, "
f"date_added = {date_added}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при добавлении войса в базу: {error}")
raise
finally:
self.close()
def last_date_audio(self):
"""Получаем дату последнего войса"""
self.logger.info(
f"Запуск функции last_date_audio")
try:
self.connect()
result = self.cursor.execute(
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
last_date = result.fetchone()[0]
self.logger.info(f"Последняя дата сообщения {last_date}")
return last_date
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя"""
self.logger.info(
f"Запуск функции get_last_user_audio_record. user_id={user_id}")
try:
self.connect()
r = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,))
result = bool(len(r.fetchall()))
self.logger.info(
f"Результат функции get_last_user_audio_record: {result}")
return result
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def delete_listen_count_for_user(self, user_id):
"""Удаляет данные о прослушанных пользователем аудио"""
self.logger.info(
f"Запуск функции delete_listen_count_for_user. user_id={user_id}")
try:
self.connect()
self.cursor.execute("DELETE FROM `listen_audio_users` WHERE `user_id` = ?",
(user_id,))
self.conn.commit()
self.logger.info(
f"Функция delete_listen_count_for_user успешно отработала")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка удаления записей прослушивания по пользователю: {error}")
raise
finally:
self.close()
def get_id_for_audio_record(self, user_id):
"""Получает следующий номер аудио сообщения пользователя"""
self.logger.info(
f"Запуск функции get_id_for_audio_record. user_id={user_id}")
try:
self.connect()
r = self.cursor.execute(
"SELECT COUNT(*) FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,))
result = r.fetchone()[0]
self.logger.info(
f"Результат функции get_id_for_audio_record: {result}")
return result
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения количества аудио: {error}")
raise
finally:
self.close()
def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла"""
self.logger.info(
f"Запуск функции get_path_for_audio_record. user_id={user_id}")
try:
self.connect()
r = self.cursor.execute(
"SELECT `file_name` "
"FROM `audio_message_reference` "
"WHERE `author_id` = ? "
"ORDER BY date_added "
"DESC LIMIT 1",
(user_id,))
result = r.fetchone()[0]
self.logger.info(
f"Результат функции get_path_for_audio_record: {result}")
return result
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def check_listen_audio(self, user_id):
"""Проверяет прослушано ли аудио пользователем"""
self.logger.info(
f"Запуск функции check_listen_audio. user_id={user_id}")
try:
self.connect()
query_listen_audio = self.cursor.execute(
"""SELECT l.file_name
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ?
AND l.file_name IS NOT NULL""", (user_id,))
check_sign = query_listen_audio.fetchall()
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
(user_id,))
sign_all_audio = query_all_audio.fetchall()
new_sign1 = list(set(sign_all_audio) - set(check_sign))
new_sign = []
for i in new_sign1:
new_sign.append(i[0])
self.logger.info(
f"Функция check_listen_audio успешно отработала.")
return new_sign
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя."""
self.logger.info(
f"Запуск функции mark_listened_audio. file_name={file_name}, user_id={user_id}")
try:
self.connect()
result = self.cursor.execute(
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1))
return self.conn.commit()
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def close(self):
"""Закрытие соединения и курсора."""
try:
if self.cursor:
self.cursor.close()
self.cursor = None
except Exception as e:
self.logger.error(f"Ошибка при закрытии курсора: {e}")
try:
if self.conn:
self.conn.close()
self.conn = None
except Exception as e:
self.logger.error(f"Ошибка при закрытии соединения: {e}")
async def check_user_in_blacklist_async(self, user_id: int):
"""
Асинхронная версия проверки пользователя в черном списке.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self.executor, self.check_user_in_blacklist, user_id)
async def get_blacklist_users_by_id_async(self, user_id: int):
"""
Асинхронная версия получения информации о пользователе из черного списка.
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self.executor, self.get_blacklist_users_by_id, user_id)
def check_database_integrity(self):
"""
Проверяет целостность базы данных и очищает WAL файлы.
"""
try:
self.connect()
# Проверяем целостность базы данных
self.cursor.execute("PRAGMA integrity_check")
integrity_result = self.cursor.fetchone()
if integrity_result and integrity_result[0] == "ok":
self.logger.info("Проверка целостности базы данных прошла успешно")
# Очищаем WAL файлы
self.cursor.execute("PRAGMA wal_checkpoint(TRUNCATE)")
self.logger.info("WAL файлы очищены")
else:
self.logger.warning(f"Проблемы с целостностью базы данных: {integrity_result}")
except Exception as e:
self.logger.error(f"Ошибка при проверке целостности базы данных: {e}")
raise
finally:
self.close()
def cleanup_wal_files(self):
"""
Очищает WAL файлы и переключает на DELETE режим для предотвращения проблем с I/O.
"""
try:
self.connect()
# Переключаем на DELETE режим для очистки WAL файлов
self.cursor.execute("PRAGMA journal_mode=DELETE")
self.cursor.execute("PRAGMA journal_mode=WAL")
self.logger.info("WAL файлы очищены и режим восстановлен")
except Exception as e:
self.logger.error(f"Ошибка при очистке WAL файлов: {e}")
raise
finally:
self.close()