import sqlite3 import configparser import os from datetime import datetime from loguru import logger from custom_logger import Logger db_logger = Logger(name='db') script_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(script_dir, 'settings.ini') config = configparser.ConfigParser() config.read(config_path) LOGS = config.getboolean('Settings', 'logs') IMPORTANT_LOGS = config.get('Telegram', 'important_logs') class BotDB: def __init__(self): db_file_path = os.path.dirname(os.path.abspath(__file__)) db_file = os.path.join(db_file_path, 'tg-bot-database') self.conn = sqlite3.connect(db_file, check_same_thread=False) self.cursor = self.conn.cursor() logger.info(f'Подключен к базе данных: {db_file_path}') def create_table(self, sql_script): """ Создает таблицу в базе. Используется в миграциях Args: sql_script: DDL скрипт таблицы Returns: None """ try: cursor = self.conn.cursor() cursor.execute(sql_script) logger.info(f'Таблица создана: {sql_script}') except Exception as e: logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}') raise def get_current_version(self): """ Возвращает текущую последнюю версию миграции Args: None Returns: int: Версия последней миграции. """ logger.info(f'Попытка получения версии миграции') try: cursor = self.conn.cursor() cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1") version = cursor.fetchone()[0] logger.info(f'Получена текущая версия миграции: {version}') return version except Exception as e: logger.error(f'Ошибка при получении текущей версии миграции: {e}') raise def update_version(self, new_version: str, script_name: str): """ Обновляет версию миграций в таблице migrations. Добавляет новую запись в таблицу migrations с указанной версией, именем скрипта и текущей датой и временем. Args: new_version (int): Новая версия миграции script_name (str): Имя скрипта миграции Returns: None Raises: sqlite3. IntegrityError: Если возникает ошибка целостности при вставке данных в таблицу migrations. Exception: Если возникает любая другая ошибка при обновлении версии. """ logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}') try: today = datetime.now().strftime("%d-%m-%Y %H:%M:%S") cursor = self.conn.cursor() cursor.execute( "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)", (new_version, script_name, today), ) self.conn.commit() logger.info(f"Версия обновлена: {new_version}, название скрипта: {script_name}") except sqlite3.IntegrityError as e: logger.error(f"Ошибка при обновлении версии: {e}") raise except Exception as e: logger.error(f"Ошибка при обновлении версии: {e}") raise # TODO: Deprecated. Остался только в voice боте, удалить и оттуда def get_error_message_from_db(self, id: int): """ @deprecated Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются: id - идентификатор ошибки """ # Подключаемся к базе try: cursor = self.conn.cursor() cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,)) response_from_database = str(cursor.fetchone()[1]) return response_from_database except sqlite3.Error as error: print(error) def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool, language_code: str, date_added: str, date_changed: str): """ Добавляет нового пользователя в базу данных. Args: user_id (int): Идентификатор пользователя в Telegram. first_name (str): Имя пользователя. full_name (str): Полное имя пользователя. username (str): Username пользователя в Telegram. is_bot (bool): Флаг, указывающий, является ли пользователь ботом. language_code (str): Код языка пользователя. date_added (str): Дата добавления пользователя в базу. date_changed (str): Дата последнего изменения данных пользователя. Returns: None: Если запись успешно добавлена в базу. Exception: Если произошла ошибка при добавлении записи. """ logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}") try: self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', " "'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed)) self.conn.commit() logger.info(f"Новый пользователь добавлен в базу: user_id={user_id}, first_name={first_name}") return None except sqlite3.Error as error: logger.error(f"Ошибка при добавлении пользователя в базу: {error}. " f"Данные пользователя: user_id={user_id}, first_name={first_name}") raise def user_exists(self, user_id: int): """ Проверяет, существует ли пользователь в базе данных. Args: user_id (int): Идентификатор пользователя. Returns: bool: True, если пользователь найден, False - иначе. """ logger.info(f"Попытка проверки существования пользователя: user_id={user_id}") try: self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchall() logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}") return bool(len(result)) except sqlite3.Error as error: logger.error(f"Ошибка при проверке существования пользователя: {error}") raise def get_user_id(self, user_id: int): """ @deprecated Возвращает ID пользователя в базе данных по его user_id. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: int: ID пользователя в базе данных. None: Если пользователь не найден. """ logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}") try: self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: user_id_db = result[0] logger.info(f"ID пользователя в базе найден: user_id={user_id}, id_db={user_id_db}") return user_id_db else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") return 0 except sqlite3.Error as error: logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}") raise def get_username(self, user_id: int): """ Возвращает username пользователя из базы данных по его user_id в Telegram. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: str: Username пользователя. None: Если пользователь не найден. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ try: self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: username = result[0] logger.info(f"Username пользователя найден: user_id={user_id}, username={username}") return username else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") return "" except sqlite3.Error as error: logger.error(f"Ошибка при получении username из базы данных: {error}") raise def get_all_user_id(self): """ Возвращает список всех user_id из базы данных. Returns: list: Список user_id. []: Если в базе данных нет пользователей. Raises: sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Попытка получения всех user_id") try: self.cursor.execute("SELECT user_id FROM our_users") fetch_all = self.cursor.fetchall() list_of_users = [user_id[0] for user_id in fetch_all] logger.info(f"Получен список всех user_id: {list_of_users}") return list_of_users except sqlite3.Error as error: logger.error(f"Ошибка при получении списка user_id из базы данных: {error}") raise def get_user_first_name(self, user_id: int): """ Возвращает имя пользователя из базы данных по его user_id в Telegram. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: str: Имя пользователя. None: Если пользователь не найден. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Попытка получения имени пользователя по user_id={user_id}") try: self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: first_name = result[0] logger.info(f"Имя пользователя найдено: user_id={user_id}, first_name={first_name}") return first_name else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") return None except sqlite3.Error as error: logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}") raise def change_name(self, user_id): #TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. Обновляем поля first_name, date_changed #result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), ) pass def get_info_about_stickers(self, user_id: int): """ Проверяет, получил ли пользователь стикеры. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: bool: True, если пользователь получил стикеры, False - иначе. None: Если пользователь не найден. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.") try: self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: has_stickers = result[0] == 1 logger.info(f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}") return has_stickers else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") return None except sqlite3.Error as error: logger.error(f"Ошибка при получении информации о получении стикеров: {error}") raise def update_info_about_stickers(self, user_id): """ Обновляет информацию о получении стикеров пользователем. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: None: Если обновление успешно выполнено. Raises: sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}") try: self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}") return None except sqlite3.Error as error: logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}") raise def get_users_blacklist(self): """ Возвращает список пользователей в черном списке. Returns: dict: Словарь, где ключ - user_id, значение - username. {}: Если в черном списке нет пользователей. Raises: sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции get_users_blacklist") try: self.cursor.execute("SELECT user_id, user_name FROM blacklist") fetch_all = self.cursor.fetchall() list_of_users = {user_id: username for user_id, username in fetch_all} logger.info(f"Получен список пользователей в черном списке") return list_of_users except sqlite3.Error as error: logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}") raise def get_users_for_unblock_today(self, date_to_unban: str): """ Возвращает список пользователей, у которых истекает срок блокировки сегодня. Args: date_to_unban (str): Дата разблокировки. Returns: dict: Словарь, где ключ - user_id, значение - username. {}: Если сегодня нет пользователей, у которых истекает срок блокировки. Raises: sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}") try: result = self.cursor.execute("SELECT user_id, user_name " "FROM blacklist WHERE date_to_unban = ?", (date_to_unban,)) fetch_all = result.fetchall() list_of_users = {user_id: username for user_id, username in fetch_all} logger.info(f"Получен список пользователей для разблокировки сегодня: {list_of_users}") return list_of_users except sqlite3.Error as error: logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}") raise def get_blacklist_users_by_id(self, user_id: int): """ Возвращает информацию о пользователе в черном списке по user_id. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: tuple: Кортеж (user_id, user_name, message_for_user, date_to_unban). None: Если пользователь не найден в черном списке. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}") try: result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban " "FROM blacklist WHERE user_id = ?", (user_id, )) return self.cursor.fetchone() except sqlite3.Error as error: logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}") raise def check_user_in_blacklist(self, user_id: int): """ Проверяет, существует ли запись с данным user_id в blacklist. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: bool: True, если пользователь найден в черном списке, False - иначе. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}") try: self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() return bool(result) except sqlite3.Error as error: logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}") raise def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None): """ Добавляет пользователя в черный список. Args: user_id (int): Идентификатор пользователя в Telegram. user_name (str, optional): Username пользователя. Defaults to None. message_for_user (str, optional): Сообщение для пользователя. Defaults to None. date_to_unban (datetime, optional): Дата разблокировки. Defaults to None. Returns: None: Если добавление в черный список успешно выполнено. sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name}," f" message_for_user={message_for_user}, date_to_unban={date_to_unban}") try: result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name'," " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)", (user_id, user_name, message_for_user, date_to_unban,)) self.conn.commit() logger.info(f"Пользователь добавлен в черный список: user_id={user_id}") return None except sqlite3.Error as error: logger.error(f"Ошибка при добавлении пользователя в черный список: {error}") return error def delete_user_blacklist(self, user_id: int): """ Удаляет пользователя из черного списка. Args: user_id (int): Идентификатор пользователя в Telegram. Returns: bool: True, если удаление прошло успешно, False - в случае ошибки. Raises: None: Ошибки обрабатываются в блоке except, возвращая False. """ logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}") try: self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.") return True except sqlite3.Error as error: logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} " f"из таблицы blacklist. Ошибка: {str(error)}") return False def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str): """ Добавляет новое сообщение пользователя в базу данных. Args: message_text (str): Текст сообщения. user_id (int): Идентификатор пользователя в Telegram. message_id (int): Идентификатор сообщения в Telegram. date (str): Дата отправки сообщения. Returns: None: Если добавление прошло успешно. sqlite3.Error: Если произошла ошибка при выполнении запроса. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}") try: self.cursor.execute( "INSERT INTO user_messages (message_text, user_id, message_id, date) " "VALUES (?, ?, ?, ?)", (message_text, user_id, message_id, date)) self.conn.commit() logger.info(f"Новое сообщение добавлено в базу данных: message_id={message_id}") return None except sqlite3.Error as error: logger.error(f"Ошибка добавления сообщения в базу данных: {error}") raise def update_date_for_user(self, date: str, user_id: int): """ Обновляет дату последнего изменения данных пользователя в базе. Args: date (str): Новая дата изменения. user_id (int): Идентификатор пользователя в Telegram. Returns: None: Если обновление прошло успешно. sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}") try: self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?", (date, user_id,)) self.conn.commit() logger.info(f"Дата изменения обновлена для пользователя: user_id={user_id}") return None except sqlite3.Error as error: logger.error(f"Ошибка обновления даты изменения для пользователя: {error}") return error def is_admin(self, user_id: int): """ Проверяет, является ли пользователь администратором. Args: user_id: ID пользователя Telegram. Returns: True, если пользователь администратор, иначе False. Raises: None: В случае ошибки возвращается None """ try: self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() return bool(result) except sqlite3.Error as error: logger.error(f"Ошибка добавления сообщения в базу данных: {error}") return None def add_admin(self, user_id: int, role: str): """ Добавляет пользователя в список администраторов. Args: user_id (int): ID пользователя Telegram. role (str): Роль пользователя. Доступные варианты: 1. creator - создатель 2. admin - обычная роль Returns: None: Если добавление прошло успешно. sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}") try: self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role)) self.conn.commit() logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.") return None except sqlite3.Error as error: logger.error(f"Ошибка добавления пользователя в список администраторов: {error}") return error def remove_admin(self, user_id: int): """ Удаляет пользователя из списка администраторов. Args: user_id (int): ID пользователя Telegram. Returns: None: Если удаление прошло успешно. sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции remove_admin: user_id={user_id}") try: self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.") return None except sqlite3.Error as error: logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}") return error def get_user_by_message_id(self, message_id: int): """ Возвращает идентификатор пользователя по идентификатору сообщения. Args: message_id (int): Идентификатор сообщения в Telegram. Returns: int: Идентификатор пользователя. None: Если пользователь не найден. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}") try: result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,)) user = result.fetchone()[0] logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}") return user except sqlite3.Error as error: logger.error(f"Ошибка получения user_id по message_id: {error}") raise def get_last_users_from_db(self): """ Возвращает список идентификаторов последних 100 пользователей, обращавшихся в бот. Returns: list: Список кортежей (full_name, user_id) последних 100 пользователей. []: Если в базе данных нет пользователей. Raises: sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info("Запуск функции get_last_users_from_db") try: result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 100") users = result.fetchall() logger.info(f"Получен список последних 100 пользователей: {users}") return users except sqlite3.Error as error: logger.error(f"Ошибка получения списка последних пользователей: {error}") raise def get_banned_users_from_db(self): """ Возвращает список идентификаторов пользователей в черном списке бота. Returns: list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке. []: Если в черном списке нет пользователей. Raises: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ logger.info("Запуск функции get_banned_users_from_db") try: result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist") users = result.fetchall() logger.info(f"Получен список пользователей в черном списке: {users}") return users except sqlite3.Error as error: logger.error(f"Ошибка получения списка пользователей в черном списке: {error}") raise def get_banned_users_from_db_with_limits(self, offset: int, limit: int): """ Возвращает список идентификаторов пользователей в черном списке бота с учетом смещения и ограничения. Args: offset (int): Смещение для выборки limit (int): Ограничение количества возвращаемых записей. Returns: list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке. []: Если в черном списке нет пользователей или количество записей с учетом смещения и ограничения равно 0. Raises: sqlite3. Error: Если произошла ошибка при выполнении запроса. """ logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}") try: result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban " "FROM blacklist LIMIT ?, ?", (offset, limit,)) users = result.fetchall() logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {users}") return users except sqlite3.Error as error: logger.error(f"Ошибка получения списка пользователей в черном списке: {error}") raise def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id): """Добавляет информацию о войсе юзера в БД""" try: result = self.cursor.execute( "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", (file_name, author_id, date_added, listen_count, file_id)) return self.conn.commit() except sqlite3.Error as error: print(error) def last_date_audio(self): """Получаем дату последнего войса""" try: result = self.cursor.execute( "SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1") return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_last_user_audio_record(self, user_id): """Получает данные о количестве записей пользователя""" try: result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", (user_id,)) return bool(len(result.fetchall())) except sqlite3.Error as error: print(error) def get_id_for_audio_record(self, user_id): """Получает ID аудио сообщения пользователя""" try: result = self.cursor.execute( "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_path_for_audio_record(self, user_id): """Получает данные о названии файла""" try: result = self.cursor.execute( "SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def check_listen_audio(self, user_id): """Проверяет прослушано ли аудио пользователем""" try: query_listen_audio = self.cursor.execute( """SELECT l.file_name FROM audio_message_reference a LEFT JOIN listen_audio_users l ON l.file_name = a.file_name WHERE l.user_id = ? AND l.file_name IS NOT NULL""", (user_id,)) check_sign = query_listen_audio.fetchall() query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?', (user_id,)) sign_all_audio = query_all_audio.fetchall() new_sign1 = list(set(sign_all_audio) - set(check_sign)) new_sign = [] for i in new_sign1: new_sign.append(i[0]) return new_sign except sqlite3.Error as error: print(error) def mark_listened_audio(self, file_name, user_id): """Отмечает аудио прослушанным для конкретного пользователя.""" try: result = self.cursor.execute( "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", (file_name, user_id, 1)) return self.conn.commit() except sqlite3.Error as error: print(error) def close(self): """Закрываем соединение с БД""" self.conn.close()