import sqlite3 import configparser import os from datetime import datetime from loguru import logger from custom_logger import Logger db_logger = Logger(name='db') script_dir = os.path.dirname(os.path.abspath(__file__)) config_path = os.path.join(script_dir, 'settings.ini') config = configparser.ConfigParser() config.read(config_path) LOGS = config.getboolean('Settings', 'logs') IMPORTANT_LOGS = config.get('Telegram', 'important_logs') class BotDB: def __init__(self): db_file_path = os.path.dirname(os.path.abspath(__file__)) db_file = os.path.join(db_file_path, 'tg-bot-database') self.conn = sqlite3.connect(db_file, check_same_thread=False) self.cursor = self.conn.cursor() logger.info(f'Подключен к базе данных: {db_file_path}') def create_table(self, sql_script): """Создает таблицу в базе.""" try: cursor = self.conn.cursor() cursor.execute(sql_script) except Exception as e: print(f"Ошибка при создании таблицы: {e}") def get_current_version(self): """Получает текущую версию миграций из таблицы migrations.""" try: cursor = self.conn.cursor() cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1") version = cursor.fetchone()[0] return version except Exception as e: print(f"Ошибка при получении версии: {e}") return 0 def update_version(self, new_version, script_name): """Обновляет версию миграций в таблице migrations.""" logger.info(f'Попытка обновления версии: {new_version}, скрипт: {script_name}') try: current_date = datetime.now() today = current_date.strftime("%d-%m-%Y %H:%M:%S") cursor = self.conn.cursor() cursor.execute( "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)", (new_version, script_name, today), ) self.conn.commit() logger.info(f"Версия обновлена: {new_version}, скрипт: {script_name}") except sqlite3.IntegrityError as e: logger.error(f"Ошибка при обновлении версии: {e}") except Exception as e: logger.error(f"Ошибка при обновлении версии: {e}") # TODO: Deprecated, удалить def get_message_from_db(self, type: str, username): """Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются: type - тип получаемой обратной связи, строковое значение, сохраненное в БД username - имя пользователя """ # Подключаемся к базе try: cursor = self.conn.cursor() cursor.execute(f"SELECT * FROM messages WHERE type=?", (type,)) # Забираем данные из таблицы, преобразуем в строку, заменяем поле username на имя пользователя, # и вместо амберсанда подставляем перенос строки if type == 'connect_with_admin' or type == 'del_message' or type == 'suggest_news' or type == 'start_message': response_from_database = str(cursor.fetchone()[1]).replace('username', username).replace('&', '\n') else: response_from_database = str(cursor.fetchone()[1]).replace('&', '\n') return response_from_database except sqlite3.Error as error: print(error) # TODO: Deprecated. Остался только в voice боте, удалить и оттуда def get_error_message_from_db(self, id: int): """Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются: id - идентификатор ошибки """ # Подключаемся к базе try: cursor = self.conn.cursor() cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,)) response_from_database = str(cursor.fetchone()[1]) return response_from_database except sqlite3.Error as error: print(error) def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed): """Добавляем юзера в базу""" try: self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', " "'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed)) return self.conn.commit() except sqlite3.Error as error: print(error) def user_exists(self, user_id): """Проверяем, есть ли юзер в базе""" try: result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,)) return bool(len(result.fetchall())) except sqlite3.Error as error: print(error) def get_user_id(self, user_id): """Достаем id юзера в базе по его user_id""" try: result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_username(self, user_id): """Достаем id юзера в базе по его user_id""" try: result = self.cursor.execute("SELECT `username` FROM `our_users` WHERE `user_id` = ?", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_all_user_id(self): """Достаем все айдишники юзеров из БД и преобразуем их в список""" try: result = self.cursor.execute("SELECT `user_id` FROM `our_users`", ) fetch_all = result.fetchall() list_of_users = [] for i in fetch_all: list_of_users.append(i[0]) return list_of_users except sqlite3.Error as error: print(error) def get_user_first_name(self, user_id): try: result = self.cursor.execute("SELECT `first_name` FROM `our_users` WHERE `user_id` = ?", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def change_name(self, user_id): #TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed #result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), ) pass def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id): """Добавляет информацию о войсе юзера в БД""" try: result = self.cursor.execute( "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", (file_name, author_id, date_added, listen_count, file_id)) return self.conn.commit() except sqlite3.Error as error: print(error) def last_date_audio(self): """Получаем дату последнего войса""" try: result = self.cursor.execute( "SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1") return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_last_user_audio_record(self, user_id): """Получает данные о количестве записей пользователя""" try: result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", (user_id,)) return bool(len(result.fetchall())) except sqlite3.Error as error: print(error) def get_id_for_audio_record(self, user_id): """Получает ID аудио сообщения пользователя""" try: result = self.cursor.execute( "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_path_for_audio_record(self, user_id): """Получает данные о названии файла""" try: result = self.cursor.execute( "SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def check_listen_audio(self, user_id): """Проверяет прослушано ли аудио пользователем""" try: query_listen_audio = self.cursor.execute( """SELECT l.file_name FROM audio_message_reference a LEFT JOIN listen_audio_users l ON l.file_name = a.file_name WHERE l.user_id = ? AND l.file_name IS NOT NULL""", (user_id,)) check_sign = query_listen_audio.fetchall() query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?', (user_id,)) sign_all_audio = query_all_audio.fetchall() new_sign1 = list(set(sign_all_audio) - set(check_sign)) new_sign = [] for i in new_sign1: new_sign.append(i[0]) return new_sign except sqlite3.Error as error: print(error) def mark_listened_audio(self, file_name, user_id): """Отмечает аудио прослушанным для конкретного пользователя.""" try: result = self.cursor.execute( "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", (file_name, user_id, 1)) return self.conn.commit() except sqlite3.Error as error: print(error) def get_info_about_stickers(self, user_id): """Получает данные о получении стикеров пользователем""" try: result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,)) return result.fetchone()[0] == 1 #return result.fetchone()[0] except sqlite3.Error as error: print(error) def update_info_about_stickers(self, user_id): """Обновляет данные о получении стикеров пользователем""" try: result = self.cursor.execute("UPDATE `our_users` SET `has_stickers` = 1 WHERE `user_id` = ?", (user_id,)) return self.conn.commit() except sqlite3.Error as error: print(error) def get_users_blacklist(self): """Возвращает список пользователей в черном списке""" try: result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist`") fetch_all = result.fetchall() list_of_users = {} for i in fetch_all: list_of_users[i[0]] = i[1] return list_of_users except sqlite3.Error as error: print(error) def get_users_for_unblock_today(self, date_to_unban): """Возвращает пользователей у которых истекает срок блокировки сегодня""" try: result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist` WHERE date_to_unban = ?", (date_to_unban,)) fetch_all = result.fetchall() list_of_users = {} for i in fetch_all: list_of_users[i[0]] = i[1] return list_of_users except sqlite3.Error as error: print(error) def get_blacklist_users_by_id(self, user_id): """Возвращает список пользователей в черном списке по user_id""" try: result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban FROM `blacklist` WHERE user_id = ?", (user_id, )) return self.cursor.fetchone() except sqlite3.Error as error: print(error) def check_user_in_blacklist(self, user_id): """Проверяет, существует ли запись с данным user_id в blacklist.""" self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() return bool(result) def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None): """Добавляет пользователя в черный список""" try: result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name'," " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)", (user_id, user_name, message_for_user, date_to_unban,)) return self.conn.commit() except sqlite3.Error as error: return error def delete_user_blacklist(self, user_id): """Удаляет пользователя из черного списка""" try: #TODO: Функция всегда возвращает true, даже если такого id нет в таблице self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Пользователь с идентификатором {user_id} успешно удален.") return True except sqlite3.Error as error: logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} из таблицы blacklist. Ошибка: {str(error)}") return False def add_new_message_in_db(self, message_text, user_id, message_id, date): """Добавляем сообщение юзера в базу""" try: self.cursor.execute( "INSERT INTO `user_messages` (message_text, user_id, message_id, date) " "VALUES (?, ?, ?, ?)", (message_text, message_id, user_id, date)) return self.conn.commit() except sqlite3.Error as error: print(error) def update_date_for_user(self, date, user_id: int): try: result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?", (date, user_id,)) return self.conn.commit() except sqlite3.Error as error: print(error) def is_admin(self, user_id): """ Проверяет, является ли пользователь администратором. Args: user_id: ID пользователя Telegram. Returns: True, если пользователь администратор, иначе False. """ self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() return bool(result) def add_admin(self, user_id, role): """ Добавляет пользователя в список администраторов. Args: user_id: ID пользователя Telegram. role: Роль пользователя. Доступные варианты: 1. creator - создатель 2. admin - обычная роль """ self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role)) return self.conn.commit() def remove_admin(self, user_id): """ Удаляет пользователя из списка администраторов. Args: user_id: ID пользователя Telegram. """ self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,)) return self.conn.commit() def get_user_by_message_id(self, message_id): """Возвращает идентификатор пользователя по идентификатору сообщения""" try: result = self.cursor.execute("SELECT user_id FROM `user_messages` WHERE message_id = ?", (message_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) def get_last_users_from_db(self): """Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот""" try: result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC") users = result.fetchall() return users except sqlite3.Error as error: print(error) def get_banned_users_from_db(self): """Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот""" try: result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist", ) users = result.fetchall() return users except sqlite3.Error as error: print(error) def get_banned_users_from_db_with_limits(self, offset: int, limit: int): """Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот""" try: result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist LIMIT ?, ?", (offset, limit,) ) users = result.fetchall() return users except sqlite3.Error as error: print(error) def close(self): """Закрываем соединение с БД""" self.conn.close()