diff --git a/database/db.py b/database/db.py
index 26114f0..a9e936b 100644
--- a/database/db.py
+++ b/database/db.py
@@ -1,1228 +1,1399 @@
-import os
-import sqlite3
-import asyncio
-from datetime import datetime
-from concurrent.futures import ThreadPoolExecutor
-
-from logs.custom_logger import logger
-
-
-class BotDB:
- def __init__(self, current_dir, name):
- # Формируем правильный путь к базе данных
- if name.startswith('database/'):
- self.db_file = os.path.join(current_dir, name)
- else:
- self.db_file = os.path.join(current_dir, 'database', name)
- self.conn = None
- self.cursor = None
- self.logger = logger
- self.logger.info(f'Инициация базы данных: {self.db_file}')
- # Создаем пул потоков для асинхронных операций
- self.executor = ThreadPoolExecutor(max_workers=4)
-
- def connect(self):
- """Создание соединения и курсора."""
- # Добавляем таймаут для предотвращения зависаний
- self.conn = sqlite3.connect(self.db_file, timeout=10.0)
- # Включаем WAL режим для лучшей производительности
- self.conn.execute("PRAGMA journal_mode=WAL")
- self.cursor = self.conn.cursor()
-
- def create_table(self, sql_script):
- """
- Создает таблицу в базе. Используется в миграциях
-
- Args:
- sql_script: DDL скрипт таблицы
-
- Returns:
- None
- """
- try:
- self.connect()
- self.cursor.execute(sql_script)
- self.conn.commit()
- self.logger.info(f'Таблица создана: {sql_script}')
- except Exception as e:
- self.logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}')
- raise
- finally:
- self.close()
-
- def get_current_version(self):
- """
- Возвращает текущую последнюю версию миграции
-
- Args:
- None
-
- Returns:
- int: Версия последней миграции.
- """
- self.logger.info(f'Попытка получения версии миграции')
- try:
- self.connect()
- self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
- version = self.cursor.fetchone()[0]
- self.logger.info(f'Получена текущая версия миграции: {version}')
- return version
- except Exception as e:
- self.logger.error(f'Ошибка при получении текущей версии миграции: {str(e)}')
- raise
- finally:
- self.close()
-
- def update_version(self, new_version: int, script_name: str):
- """
- Обновляет версию миграций в таблице migrations.
-
- Добавляет новую запись в таблицу migrations с указанной версией,
- именем скрипта и текущей датой и временем.
-
- Args:
- new_version (int): Новая версия миграции
- script_name (str): Имя скрипта миграции
-
- Returns:
- None
-
- Raises:
- sqlite3. IntegrityError: Если возникает ошибка целостности при вставке
- данных в таблицу migrations.
- Exception: Если возникает любая другая ошибка при обновлении версии.
- """
- self.logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}')
- try:
- self.connect()
- today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
- self.cursor.execute(
- "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
- (new_version, script_name, today),
- )
- self.conn.commit()
- self.logger.info(f"Версия обновлена: {new_version}, название скрипта: {script_name}")
- except sqlite3.IntegrityError as e:
- self.logger.error(f"Ошибка при обновлении версии: {e}")
- raise
- except Exception as e:
- self.logger.error(f"Ошибка при обновлении версии: {e}")
- raise
- finally:
- self.close()
-
- # TODO: Deprecated. Остался только в voice боте, удалить и оттуда
- def get_error_message_from_db(self, id: int):
- """
- @deprecated
- Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
- id - идентификатор ошибки
- """
- # Подключаемся к базе
- try:
- self.connect()
- self.cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
- response_from_database = str(self.cursor.fetchone()[1])
- return response_from_database
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении сообщения об ошибка voice_bot: {error}")
- finally:
- self.close()
-
- def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool,
- language_code: str, date_added: str,
- date_changed: str):
- """
- Добавляет нового пользователя в базу данных.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
- first_name (str): Имя пользователя.
- full_name (str): Полное имя пользователя.
- username (str): Username пользователя в Telegram.
- is_bot (bool): Флаг, указывающий, является ли пользователь ботом.
- language_code (str): Код языка пользователя.
- date_added (str): Дата добавления пользователя в базу.
- date_changed (str): Дата последнего изменения данных пользователя.
-
- Returns:
- None: Если запись успешно добавлена в базу.
- Exception: Если произошла ошибка при добавлении записи.
- """
- self.logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}")
- try:
- self.connect()
- self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
- "'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
- (user_id, first_name, full_name,
- username, is_bot, language_code, date_added, date_changed))
- self.conn.commit()
- self.logger.info(f"Новый пользователь добавлен в базу: user_id={user_id}, first_name={first_name}")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при добавлении пользователя в базу: {error}. "
- f"Данные пользователя: user_id={user_id}, first_name={first_name}")
- raise
- finally:
- self.close()
-
- def user_exists(self, user_id: int):
- """
- Проверяет, существует ли пользователь в базе данных.
-
- Args:
- user_id (int): Идентификатор пользователя.
-
- Returns:
- bool: True, если пользователь найден, False - иначе.
- """
- self.logger.info(f"Попытка проверки существования пользователя: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchall()
- self.logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}")
- return bool(len(result))
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при проверке существования пользователя: {error}")
- raise
- finally:
- self.close()
-
- def get_user_id(self, user_id: int):
- """
- @deprecated
- Возвращает ID пользователя в базе данных по его user_id.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- int: ID пользователя в базе данных.
- None: Если пользователь не найден.
- """
- self.logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- if result:
- user_id_db = result[0]
- self.logger.info(f"ID пользователя в базе найден: user_id={user_id}, id_db={user_id_db}")
- return user_id_db
- else:
- self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_username(self, user_id: int):
- """
- Возвращает username пользователя из базы данных по его user_id в Telegram.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- str: Username пользователя.
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- try:
- self.connect()
- self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- if result:
- username = result[0]
- self.logger.info(f"Username пользователя найден: user_id={user_id}, username={username}")
- return username
- else:
- self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении username из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_user_id_by_username(self, username: str):
- """
- Возвращает user_id пользователя из базы данных по его user_name в Telegram.
-
- Args:
- username (str): Username пользователя.
-
- Returns:
- user_id (int): Идентификатор пользователя в Telegram.
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- try:
- self.connect()
- self.cursor.execute("SELECT user_id FROM our_users WHERE username = ?", (username,))
- result = self.cursor.fetchone()
- if result:
- user_id = result[0]
- self.logger.info(f"User_id пользователя найден: username={username}, user_id={user_id}")
- return user_id
- else:
- self.logger.info(f"Пользователь с username={username} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении username из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_full_name_by_id(self, user_id: str):
- """
- Возвращает full_name пользователя из базы данных по его username в Telegram.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- full_name (str): Username пользователя.
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- try:
- self.connect()
- self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- if result:
- full_name = result[0]
- self.logger.info(f"Username пользователя найден: user_id={user_id}, full_name={full_name}")
- return full_name
- else:
- self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении username из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_user_info_by_id(self, user_id: int):
- """
- Возвращает информацию о пользователе из базы данных по его user_id.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- dict: Словарь с информацией о пользователе (username, full_name).
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- try:
- self.connect()
- self.cursor.execute("SELECT username, full_name FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- if result:
- user_info = {
- 'username': result[0],
- 'full_name': result[1]
- }
- self.logger.info(f"Информация о пользователе найдена: user_id={user_id}, username={user_info['username']}, full_name={user_info['full_name']}")
- return user_info
- else:
- self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении информации о пользователе из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_all_user_id(self):
- """
- Возвращает список всех user_id из базы данных.
-
- Returns:
- list: Список user_id.
- []: Если в базе данных нет пользователей.
-
- Raises:
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Попытка получения всех user_id")
- try:
- self.connect()
- self.cursor.execute("SELECT user_id FROM our_users")
- fetch_all = self.cursor.fetchall()
- list_of_users = [user_id[0] for user_id in fetch_all]
- self.logger.info(f"Получен список всех user_id: {list_of_users}")
- return list_of_users
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении списка user_id из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_user_first_name(self, user_id: int):
- """
- Возвращает имя пользователя из базы данных по его user_id в Telegram.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- str: Имя пользователя.
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Попытка получения имени пользователя по user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- if result:
- first_name = result[0]
- self.logger.info(f"Имя пользователя найдено: user_id={user_id}, first_name={first_name}")
- return first_name
- else:
- self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}")
- raise
- finally:
- self.close()
-
- def get_info_about_stickers(self, user_id: int):
- """
- Проверяет, получил ли пользователь стикеры.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- bool: True, если пользователь получил стикеры, False - иначе.
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.")
- try:
- self.connect()
- self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- if result:
- has_stickers = result[0] == 1
- self.logger.info(
- f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}")
- return has_stickers
- else:
- self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении информации о получении стикеров: {error}")
- raise
- finally:
- self.close()
-
- def update_info_about_stickers(self, user_id):
- """
- Обновляет информацию о получении стикеров пользователем.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- None: Если обновление успешно выполнено.
-
- Raises:
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,))
- self.conn.commit()
- self.logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}")
- raise
- finally:
- self.close()
-
- def get_users_blacklist(self):
- """
- Возвращает список пользователей в черном списке.
-
- Returns:
- dict: Словарь, где ключ - user_id, значение - username.
- {}: Если в черном списке нет пользователей.
-
- Raises:
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции get_users_blacklist")
- try:
- self.connect()
- self.cursor.execute("SELECT user_id, user_name FROM blacklist")
- fetch_all = self.cursor.fetchall()
- list_of_users = {user_id: username for user_id, username in fetch_all}
- self.logger.info(f"Получен список пользователей в черном списке")
- return list_of_users
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}")
- raise
- finally:
- self.close()
-
- def get_users_for_unblock_today(self, date_to_unban: str):
- """
- Возвращает список пользователей, у которых истекает срок блокировки сегодня.
-
- Args:
- date_to_unban (str): Дата разблокировки.
-
- Returns:
- dict: Словарь, где ключ - user_id, значение - username.
- {}: Если сегодня нет пользователей, у которых истекает срок блокировки.
-
- Raises:
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT user_id, user_name "
- "FROM blacklist WHERE date_to_unban = ?", (date_to_unban,))
- fetch_all = result.fetchall()
- list_of_users = {user_id: username for user_id, username in fetch_all}
- self.logger.info(f"Получен список пользователей для разблокировки сегодня: {list_of_users}")
- return list_of_users
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}")
- raise
- finally:
- self.close()
-
- def get_blacklist_users_by_id(self, user_id: int):
- """
- Возвращает информацию о пользователе в черном списке по user_id.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- tuple: Кортеж (user_id, user_name, message_for_user, date_to_unban).
- None: Если пользователь не найден в черном списке.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban "
- "FROM blacklist WHERE user_id = ?", (user_id,))
- return self.cursor.fetchone()
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}")
- raise
- finally:
- self.close()
-
- def check_user_in_blacklist(self, user_id: int):
- """
- Проверяет, существует ли запись с данным user_id в blacklist.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- bool: True, если пользователь найден в черном списке, False - иначе.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- self.logger.info(f"Существует ли пользователь: user_id={user_id} Итог: {result}")
- return bool(result)
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}")
- raise
- finally:
- self.close()
-
- def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None):
- """
- Добавляет пользователя в черный список.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
- user_name (str, optional): Username пользователя. Defaults to None.
- message_for_user (str, optional): Сообщение для пользователя. Defaults to None.
- date_to_unban (datetime, optional): Дата разблокировки. Defaults to None.
-
- Returns:
- None: Если добавление в черный список успешно выполнено.
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name},"
- f" message_for_user={message_for_user}, date_to_unban={date_to_unban}")
- try:
- self.connect()
- result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
- " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
- (user_id, user_name, message_for_user, date_to_unban,))
- self.conn.commit()
- self.logger.info(f"Пользователь добавлен в черный список: user_id={user_id}")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка при добавлении пользователя в черный список: {error}")
- return error
- finally:
- self.close()
-
- def delete_user_blacklist(self, user_id: int):
- """
- Удаляет пользователя из черного списка.
-
- Args:
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- bool: True, если удаление прошло успешно, False - в случае ошибки.
-
- Raises:
- None: Ошибки обрабатываются в блоке except, возвращая False.
- """
- self.logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
- self.conn.commit()
- self.logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.")
- return True
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} "
- f"из таблицы blacklist. Ошибка: {str(error)}")
- return False
- finally:
- self.close()
-
- def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str):
- """
- Добавляет новое сообщение пользователя в базу данных.
-
- Args:
- message_text (str): Текст сообщения.
- user_id (int): Идентификатор пользователя в Telegram.
- message_id (int): Идентификатор сообщения в Telegram.
- date (str): Дата отправки сообщения.
-
- Returns:
- None: Если добавление прошло успешно.
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(
- f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}")
- try:
- self.connect()
- self.cursor.execute(
- "INSERT INTO user_messages (message_text, user_id, message_id, date) "
- "VALUES (?, ?, ?, ?)",
- (message_text, user_id, message_id, date))
- self.conn.commit()
- self.logger.info(f"Новое сообщение добавлено в базу данных: message_id={message_id}")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
- raise
- finally:
- self.close()
-
- def get_username_and_full_name(self, user_id: int):
- """
- Получает full_name и username пользователя по ID из базы
-
- Args:
- date (str): Новая дата изменения.
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- username (str): username пользователя
- full_name (str): full_name пользователя
- """
- self.logger.info(
- f"Запуск функции check_username_and_first_name: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
- username = self.cursor.fetchone()[0]
- self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
- full_name = self.cursor.fetchone()[0]
- self.logger.info(
- f"Функция check_username_and_first_name успешно отработала: user_id={user_id}, username={username}, full_name={full_name}")
- return username, full_name
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка в функции get_username_and_first_name: {error}")
- return None
- finally:
- self.close()
-
- def update_username_and_full_name(self, user_id: int, username: str, full_name: str):
- """
- Обновляет full_name и username пользователя
-
- Args:
- username (str): username пользователя
- full_name (str): full_name пользователя
- user_id (int): Идентификатор пользователя в Telegram
-
- Returns:
- True (bool): Если обновления прошли успешно
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(
- f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}")
- try:
- self.connect()
- self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
- (username, full_name, user_id,))
- self.conn.commit()
- self.logger.info(
- f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены")
- return True
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}")
- raise
- finally:
- self.close()
-
- def update_date_for_user(self, date: str, user_id: int):
- """
- #TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users
- Обновляет дату последнего изменения данных пользователя в базе.
-
- Args:
- date (str): Новая дата изменения.
- user_id (int): Идентификатор пользователя в Telegram.
-
- Returns:
- None: Если обновление прошло успешно.
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}")
- try:
- self.connect()
- self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?",
- (date, user_id,))
- self.conn.commit()
- self.logger.info(f"Дата изменения обновлена для пользователя: user_id={user_id}")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка обновления даты изменения для пользователя: {error}")
- return error
- finally:
- self.close()
-
- def is_admin(self, user_id: int):
- """
- Проверяет, является ли пользователь администратором.
-
- Args:
- user_id: ID пользователя Telegram.
-
- Returns:
- True, если пользователь администратор, иначе False.
-
- Raises:
- None: В случае ошибки возвращается None
- """
- self.logger.info(f"Запуск функции is_admin: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
- result = self.cursor.fetchone()
- return bool(result)
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
- return None
- finally:
- self.close()
-
- def add_admin(self, user_id: int, role: str):
- """
- Добавляет пользователя в список администраторов.
-
- Args:
- user_id (int): ID пользователя Telegram.
- role (str): Роль пользователя. Доступные варианты:
- 1. creator - создатель
- 2. admin - обычная роль
-
- Returns:
- None: Если добавление прошло успешно.
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}")
- try:
- self.connect()
- self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
- self.conn.commit()
- self.logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка добавления пользователя в список администраторов: {error}")
- return error
- finally:
- self.close()
-
- def remove_admin(self, user_id: int):
- """
- Удаляет пользователя из списка администраторов.
-
- Args:
- user_id (int): ID пользователя Telegram.
-
- Returns:
- None: Если удаление прошло успешно.
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции remove_admin: user_id={user_id}")
- try:
- self.connect()
- self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
- self.conn.commit()
- self.logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.")
- return None
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}")
- return error
- finally:
- self.connect()
-
- def get_user_by_message_id(self, message_id: int):
- """
- #TODO: Возвращается TypeError вместо None
- Возвращает идентификатор пользователя по идентификатору сообщения.
-
- Args:
- message_id (int): Идентификатор сообщения в Telegram.
-
- Returns:
- int: Идентификатор пользователя.
- None: Если пользователь не найден.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,))
- user = result.fetchone()[0]
- self.logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}")
- return user
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения user_id по message_id: {error}")
- raise
- finally:
- self.close()
-
- def get_last_users_from_db(self):
- """
- Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот.
-
- Returns:
- list: Список кортежей (full_name, user_id) последних 30 пользователей.
- []: Если в базе данных нет пользователей.
-
- Raises:
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info("Запуск функции get_last_users_from_db")
- try:
- self.connect()
- result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30")
- users = result.fetchall()
- self.logger.info(f"Получен список последних 30 пользователей: {users}")
- return users
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения списка последних пользователей: {error}")
- raise
- finally:
- self.close()
-
- def get_banned_users_from_db(self):
- """
- Возвращает список идентификаторов пользователей в черном списке бота.
-
- Returns:
- list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
- []: Если в черном списке нет пользователей.
-
- Raises:
- sqlite3.Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info("Запуск функции get_banned_users_from_db")
- try:
- self.connect()
- result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist")
- users = result.fetchall()
- self.logger.info(f"Получен список пользователей в черном списке: {users}")
- return users
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
- raise
- finally:
- self.close()
-
- def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
- """
- Возвращает список идентификаторов пользователей в черном списке бота с учетом смещения и ограничения.
-
- Args:
- offset (int): Смещение для выборки
- limit (int): Ограничение количества возвращаемых записей.
-
- Returns:
- list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
- []: Если в черном списке нет пользователей или количество записей с учетом смещения и ограничения равно 0.
-
- Raises:
- sqlite3. Error: Если произошла ошибка при выполнении запроса.
- """
- self.logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban "
- "FROM blacklist LIMIT ?, ?", (offset, limit,))
- users = result.fetchall()
- self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {users}")
- return users
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
- raise
- finally:
- self.close()
-
- def get_post_content_from_telegram_by_last_id(self, last_post_id: int):
- self.logger.info(
- f"Запуск функции get_post_content_from_telegram_by_last_id, идентификатор поста {last_post_id}")
- try:
- self.connect()
- result = self.cursor.execute("""
- SELECT cpft.content_name, cpft.content_type
- FROM post_from_telegram_suggest pft
- JOIN message_link_to_content mltc
- ON pft.message_id = mltc.post_id
- JOIN content_post_from_telegram cpft
- ON cpft.message_id = mltc.message_id
- WHERE pft.helper_text_message_id = ?
- """, (last_post_id,))
- post_content = result.fetchall()
- self.logger.info(
- f"Функция get_post_content_from_telegram_by_last_id получила список контента: {post_content}")
- return post_content
- finally:
- self.close()
-
- def get_post_ids_from_telegram_by_last_id(self, last_post_id: int):
- self.logger.info(
- f"Запуск функции get_post_ids_from_telegram_by_last_id, идентификатор поста {last_post_id}")
- try:
- self.connect()
- result = self.cursor.execute("""
- SELECT mltc.message_id
- FROM post_from_telegram_suggest pft
- JOIN message_link_to_content mltc
- ON pft.message_id = mltc.post_id
- WHERE pft.helper_text_message_id = ?
- """, (last_post_id,))
- post_ids = result.fetchall()
- self.logger.info(f"Функция get_post_ids_from_telegram_by_last_id "
- f"получила идентификаторы сообщений: {post_ids}")
- return post_ids
- except Exception as e:
- self.logger.error(f"Ошибка в функции get_post_ids_from_telegram_by_last_id {str(e)}")
- return False
- finally:
- self.close()
-
- def get_post_text_from_telegram_by_last_id(self, last_post_id: int):
- self.logger.info(f"Запуск функции get_post_text_from_telegram_by_last_id, идентификатор поста {last_post_id}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT text "
- "FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
- (last_post_id,))
- text = result.fetchone()[0]
- self.logger.info(f"Функция get_post_text_from_telegram_by_last_id получила text")
- return text
- except Exception as e:
- self.logger.error(f"Ошибка в функции get_post_text_from_telegram_by_last_id {str(e)}")
-
- def get_author_id_by_message_id(self, message_id: int):
- self.logger.info(f"Запуск функции get_author_id_by_message_id, идентификатор поста {message_id}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT author_id "
- "FROM post_from_telegram_suggest WHERE message_id = ?",
- (message_id,))
- author_id = result.fetchone()[0]
- self.logger.info(f"Функция get_author_id_by_message_id получила author_id {author_id}")
- return author_id
- except Exception as e:
- self.logger.error(f"Ошибка в функции get_author_id_by_message_id {str(e)}")
-
- def get_author_id_by_helper_message_id(self, helper_text_message_id: int):
- self.logger.info(f"Запуск функции get_author_id_by_helper_message_id, идентификатор поста "
- f"{helper_text_message_id}")
- try:
- self.connect()
- result = self.cursor.execute("SELECT author_id "
- "FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
- (helper_text_message_id,))
- author_id = result.fetchone()[0]
- self.logger.info(f"Функция get_author_id_by_helper_message_id получила author_id {author_id}")
- return author_id
- except Exception as e:
- self.logger.error(f"Ошибка в функции get_author_id_by_helper_message_id {str(e)}")
-
- def add_post_content_in_db(self, post_id: int, message_id: int, content_name: str, type_content: str):
- self.logger.info(
- f"Запуск функции add_post_content_in_db: post_id={post_id}, message_id={message_id}, "
- f"content_name={content_name}, content_type={type_content}")
- try:
- self.connect()
- self.cursor.execute(
- "INSERT INTO message_link_to_content (post_id, message_id)"
- "VALUES (?, ?)", (post_id, message_id))
- self.conn.commit()
- self.cursor.execute(
- "INSERT INTO content_post_from_telegram (message_id, content_name, content_type)"
- "VALUES (?, ?, ?)", (message_id, content_name, type_content))
- self.conn.commit()
- self.logger.info(f"Функция add_post_content_in_db отработала успешно")
- return True
- except Exception as e:
- self.logger.error(f"Ошибка в функции add_post_content_in_db при добавлении поста в базу данных: {e}")
- return False
-
- def add_post_in_db(self, message_id: int, text: str, author_id: int):
- self.logger.info(
- f"Запуск функции add_post_in_db: message_id={message_id}, "
- f"author_id={author_id}")
- try:
- today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
- self.connect()
- self.cursor.execute(
- "INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)"
- "VALUES (?, ?, ?, ?)", (message_id, text, author_id, today))
- self.conn.commit()
- self.logger.info(f"Функция add_post_in_db отработала успешно")
- return True
- except Exception as e:
- self.logger.error(f"Ошибка в функции add_post_in_db при добавлении поста в базу данных: {e}")
- return False
-
- def update_helper_message_in_db(self, message_id: int, helper_message_id: int):
- self.logger.info(
- f"Запуск функции update_helper_message_in_db: message_id={message_id}, "
- f"helper_message_id={helper_message_id}")
- try:
- self.connect()
- self.cursor.execute(
- "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
- (helper_message_id, message_id,))
- self.conn.commit()
- self.logger.info(f"Функция update_helper_message_in_db отработала успешно")
- return True
- except Exception as e:
- self.logger.error(f"Ошибка в функции update_helper_message_in_db при добавлении поста в базу данных: {e}")
- return False
-
- def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
- """Добавляет информацию о войсе юзера в БД"""
- self.logger.info(
- f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id},"
- f" date_added = {date_added}")
- try:
- self.connect()
- result = self.cursor.execute(
- "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) "
- "VALUES (?, ?, ?, ?, ?)",
- (file_name, author_id, date_added, listen_count, file_id))
- self.conn.commit()
- self.logger.info(
- f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, "
- f"date_added = {date_added}")
- return None
- except sqlite3.Error as error:
- print(error)
- raise
- finally:
- self.close()
-
- def last_date_audio(self):
- """Получаем дату последнего войса"""
- self.logger.info(
- f"Запуск функции last_date_audio")
- try:
- self.connect()
- result = self.cursor.execute(
- "SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
- last_date = result.fetchone()[0]
- self.logger.info(f"Последняя дата сообщения {last_date}")
- return last_date
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения последней даты войса: {error}")
- raise
- finally:
- self.close()
-
- def get_last_user_audio_record(self, user_id):
- """Получает данные о количестве записей пользователя"""
- self.logger.info(
- f"Запуск функции get_last_user_audio_record. user_id={user_id}")
- try:
- self.connect()
- r = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
- (user_id,))
- result = bool(len(r.fetchall()))
- self.logger.info(
- f"Результат функции get_last_user_audio_record: {result}")
- return result
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения последней даты войса: {error}")
- raise
- finally:
- self.close()
-
- def get_id_for_audio_record(self, user_id):
- """Получает ID аудио сообщения пользователя"""
- self.logger.info(
- f"Запуск функции get_id_for_audio_record. user_id={user_id}")
- try:
- self.connect()
- r = self.cursor.execute(
- "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? "
- "ORDER BY date_added DESC LIMIT 1",
- (user_id,))
- result = r.fetchone()[0]
- self.logger.info(
- f"Результат функции get_id_for_audio_record: {result}")
- return result
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения последней даты войса: {error}")
- raise
- finally:
- self.close()
-
- def get_path_for_audio_record(self, user_id):
- """Получает данные о названии файла"""
- self.logger.info(
- f"Запуск функции get_path_for_audio_record. user_id={user_id}")
- try:
- self.connect()
- r = self.cursor.execute(
- "SELECT `file_name` "
- "FROM `audio_message_reference` "
- "WHERE `author_id` = ? "
- "ORDER BY date_added "
- "DESC LIMIT 1",
- (user_id,))
- result = r.fetchone()[0]
- self.logger.info(
- f"Результат функции get_path_for_audio_record: {result}")
- return result
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения последней даты войса: {error}")
- raise
- finally:
- self.close()
-
- def check_listen_audio(self, user_id):
- """Проверяет прослушано ли аудио пользователем"""
- self.logger.info(
- f"Запуск функции check_listen_audio. user_id={user_id}")
- try:
- self.connect()
- query_listen_audio = self.cursor.execute(
- """SELECT l.file_name
- FROM audio_message_reference a
- LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
- WHERE l.user_id = ?
- AND l.file_name IS NOT NULL""", (user_id,))
- check_sign = query_listen_audio.fetchall()
- query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
- (user_id,))
- sign_all_audio = query_all_audio.fetchall()
- new_sign1 = list(set(sign_all_audio) - set(check_sign))
- new_sign = []
- for i in new_sign1:
- new_sign.append(i[0])
- self.logger.info(
- f"Функция check_listen_audio успешно отработала.")
- return new_sign
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения последней даты войса: {error}")
- raise
- finally:
- self.close()
-
- def mark_listened_audio(self, file_name, user_id):
- """Отмечает аудио прослушанным для конкретного пользователя."""
- self.logger.info(
- f"Запуск функции mark_listened_audio. file_name={file_name}, user_id={user_id}")
- try:
- self.connect()
- result = self.cursor.execute(
- "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
- (file_name, user_id, 1))
- return self.conn.commit()
- except sqlite3.Error as error:
- self.logger.error(f"Ошибка получения последней даты войса: {error}")
- raise
- finally:
- self.close()
-
- def close(self):
- """Закрытие соединения и курсора."""
- if self.cursor:
- self.cursor.close()
- if self.conn:
- self.conn.close()
-
- async def check_user_in_blacklist_async(self, user_id: int):
- """
- Асинхронная версия проверки пользователя в черном списке.
- """
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(self.executor, self.check_user_in_blacklist, user_id)
-
- async def get_blacklist_users_by_id_async(self, user_id: int):
- """
- Асинхронная версия получения информации о пользователе из черного списка.
- """
- loop = asyncio.get_event_loop()
- return await loop.run_in_executor(self.executor, self.get_blacklist_users_by_id, user_id)
+import os
+import sqlite3
+import asyncio
+from datetime import datetime
+from concurrent.futures import ThreadPoolExecutor
+
+from logs.custom_logger import logger
+
+
+class BotDB:
+ def __init__(self, current_dir, name):
+ # Формируем правильный путь к базе данных
+ if name.startswith('database/'):
+ self.db_file = os.path.join(current_dir, name)
+ else:
+ self.db_file = os.path.join(current_dir, 'database', name)
+ self.conn = None
+ self.cursor = None
+ self.logger = logger
+ self.logger.info(f'Инициация базы данных: {self.db_file}')
+ # Создаем пул потоков для асинхронных операций
+ self.executor = ThreadPoolExecutor(max_workers=4)
+
+ def connect(self):
+ """Создание соединения и курсора."""
+ # Добавляем таймаут для предотвращения зависаний
+ self.conn = sqlite3.connect(self.db_file, timeout=10.0)
+ # Включаем WAL режим для лучшей производительности
+ self.conn.execute("PRAGMA journal_mode=WAL")
+ self.cursor = self.conn.cursor()
+
+ def create_table(self, sql_script):
+ """
+ Создает таблицу в базе. Используется в миграциях
+
+ Args:
+ sql_script: DDL скрипт таблицы
+
+ Returns:
+ None
+ """
+ try:
+ self.connect()
+ self.cursor.execute(sql_script)
+ self.conn.commit()
+ self.logger.info(f'Таблица создана: {sql_script}')
+ except Exception as e:
+ self.logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}')
+ raise
+ finally:
+ self.close()
+
+ def get_current_version(self):
+ """
+ Возвращает текущую последнюю версию миграции
+
+ Args:
+ None
+
+ Returns:
+ int: Версия последней миграции.
+ """
+ self.logger.info(f'Попытка получения версии миграции')
+ try:
+ self.connect()
+ self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
+ version = self.cursor.fetchone()[0]
+ self.logger.info(f'Получена текущая версия миграции: {version}')
+ return version
+ except Exception as e:
+ self.logger.error(f'Ошибка при получении текущей версии миграции: {str(e)}')
+ raise
+ finally:
+ self.close()
+
+ def update_version(self, new_version: int, script_name: str):
+ """
+ Обновляет версию миграций в таблице migrations.
+
+ Добавляет новую запись в таблицу migrations с указанной версией,
+ именем скрипта и текущей датой и временем.
+
+ Args:
+ new_version (int): Новая версия миграции
+ script_name (str): Имя скрипта миграции
+
+ Returns:
+ None
+
+ Raises:
+ sqlite3. IntegrityError: Если возникает ошибка целостности при вставке
+ данных в таблицу migrations.
+ Exception: Если возникает любая другая ошибка при обновлении версии.
+ """
+ self.logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}')
+ try:
+ self.connect()
+ today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
+ self.cursor.execute(
+ "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
+ (new_version, script_name, today),
+ )
+ self.conn.commit()
+ self.logger.info(f"Версия обновлена: {new_version}, название скрипта: {script_name}")
+ except sqlite3.IntegrityError as e:
+ self.logger.error(f"Ошибка при обновлении версии: {e}")
+ raise
+ except Exception as e:
+ self.logger.error(f"Ошибка при обновлении версии: {e}")
+ raise
+ finally:
+ self.close()
+
+ def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool,
+ language_code: str, emoji: str, date_added: str, date_changed: str):
+ """
+ Добавляет нового пользователя в базу данных.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+ first_name (str): Имя пользователя.
+ full_name (str): Полное имя пользователя.
+ username (str): Username пользователя в Telegram.
+ is_bot (bool): Флаг, указывающий, является ли пользователь ботом.
+ language_code (str): Код языка пользователя.
+ emoji (str): Эмодзи закрепленная за пользователем
+ date_added (str): Дата добавления пользователя в базу.
+ date_changed (str): Дата последнего изменения данных пользователя.
+
+ Returns:
+ None: Если запись успешно добавлена в базу.
+ Exception: Если произошла ошибка при добавлении записи.
+ """
+ self.logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}")
+ try:
+ self.connect()
+ try:
+ # Новая схема с колонкой emoji
+ self.cursor.execute(
+ "INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
+ "'language_code', 'emoji', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ (user_id, first_name, full_name, username, is_bot, language_code, emoji, date_added, date_changed)
+ )
+ except sqlite3.OperationalError as e:
+ # Обратная совместимость: старая схема без колонки emoji
+ if 'no column named emoji' in str(e):
+ self.cursor.execute(
+ "INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
+ "'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
+ (user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed)
+ )
+ else:
+ raise
+ self.conn.commit()
+ self.logger.info(
+ f"Новый пользователь добавлен в базу: user_id={user_id}, first_name={first_name}, emoji={emoji}")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при добавлении пользователя в базу: {error}. "
+ f"Данные пользователя: user_id={user_id}, first_name={first_name}")
+ raise
+ finally:
+ self.close()
+
+ def user_exists(self, user_id: int):
+ """
+ Проверяет, существует ли пользователь в базе данных.
+
+ Args:
+ user_id (int): Идентификатор пользователя.
+
+ Returns:
+ bool: True, если пользователь найден, False - иначе.
+ """
+ self.logger.info(f"Попытка проверки существования пользователя: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchall()
+ self.logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}")
+ return bool(len(result))
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при проверке существования пользователя: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_user_id(self, user_id: int):
+ """
+ @deprecated
+ Возвращает ID пользователя в базе данных по его user_id.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ int: ID пользователя в базе данных.
+ None: Если пользователь не найден.
+ """
+ self.logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ if result:
+ user_id_db = result[0]
+ self.logger.info(f"ID пользователя в базе найден: user_id={user_id}, id_db={user_id_db}")
+ return user_id_db
+ else:
+ self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_username(self, user_id: int):
+ """
+ Возвращает username пользователя из базы данных по его user_id в Telegram.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ str: Username пользователя.
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ try:
+ self.connect()
+ self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ if result:
+ username = result[0]
+ self.logger.info(f"Username пользователя найден: user_id={user_id}, username={username}")
+ return username
+ else:
+ self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении username из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_user_id_by_username(self, username: str):
+ """
+ Возвращает user_id пользователя из базы данных по его user_name в Telegram.
+
+ Args:
+ username (str): Username пользователя.
+
+ Returns:
+ user_id (int): Идентификатор пользователя в Telegram.
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ try:
+ self.connect()
+ self.cursor.execute("SELECT user_id FROM our_users WHERE username = ?", (username,))
+ result = self.cursor.fetchone()
+ if result:
+ user_id = result[0]
+ self.logger.info(f"User_id пользователя найден: username={username}, user_id={user_id}")
+ return user_id
+ else:
+ self.logger.info(f"Пользователь с username={username} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении username из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_full_name_by_id(self, user_id: str):
+ """
+ Возвращает full_name пользователя из базы данных по его username в Telegram.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ full_name (str): Username пользователя.
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ try:
+ self.connect()
+ self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ if result:
+ full_name = result[0]
+ self.logger.info(f"Username пользователя найден: user_id={user_id}, full_name={full_name}")
+ return full_name
+ else:
+ self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении username из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_user_info_by_id(self, user_id: int):
+ """
+ Возвращает информацию о пользователе из базы данных по его user_id.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ dict: Словарь с информацией о пользователе (username, full_name).
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ try:
+ self.connect()
+ self.cursor.execute("SELECT username, full_name FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ if result:
+ user_info = {
+ 'username': result[0],
+ 'full_name': result[1]
+ }
+ self.logger.info(f"Информация о пользователе найдена: user_id={user_id}, username={user_info['username']}, full_name={user_info['full_name']}")
+ return user_info
+ else:
+ self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении информации о пользователе из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_all_user_id(self):
+ """
+ Возвращает список всех user_id из базы данных.
+
+ Returns:
+ list: Список user_id.
+ []: Если в базе данных нет пользователей.
+
+ Raises:
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Попытка получения всех user_id")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT user_id FROM our_users")
+ fetch_all = self.cursor.fetchall()
+ list_of_users = [user_id[0] for user_id in fetch_all]
+ self.logger.info(f"Получен список всех user_id: {list_of_users}")
+ return list_of_users
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении списка user_id из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_user_first_name(self, user_id: int):
+ """
+ Возвращает имя пользователя из базы данных по его user_id в Telegram.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ str: Имя пользователя.
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Попытка получения имени пользователя по user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ if result:
+ first_name = result[0]
+ self.logger.info(f"Имя пользователя найдено: user_id={user_id}, first_name={first_name}")
+ return first_name
+ else:
+ self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_info_about_stickers(self, user_id: int):
+ """
+ Проверяет, получил ли пользователь стикеры.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ bool: True, если пользователь получил стикеры, False - иначе.
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ if result:
+ has_stickers = result[0] == 1
+ self.logger.info(
+ f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}")
+ return has_stickers
+ else:
+ self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении информации о получении стикеров: {error}")
+ raise
+ finally:
+ self.close()
+
+ def update_info_about_stickers(self, user_id):
+ """
+ Обновляет информацию о получении стикеров пользователем.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ None: Если обновление успешно выполнено.
+
+ Raises:
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,))
+ self.conn.commit()
+ self.logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_users_blacklist(self):
+ """
+ Возвращает список пользователей в черном списке.
+
+ Returns:
+ dict: Словарь, где ключ - user_id, значение - username.
+ {}: Если в черном списке нет пользователей.
+
+ Raises:
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции get_users_blacklist")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT user_id, user_name FROM blacklist")
+ fetch_all = self.cursor.fetchall()
+ list_of_users = {user_id: username for user_id, username in fetch_all}
+ self.logger.info(f"Получен список пользователей в черном списке")
+ return list_of_users
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_users_for_unblock_today(self, date_to_unban: str):
+ """
+ Возвращает список пользователей, у которых истекает срок блокировки сегодня.
+
+ Args:
+ date_to_unban (str): Дата разблокировки.
+
+ Returns:
+ dict: Словарь, где ключ - user_id, значение - username.
+ {}: Если сегодня нет пользователей, у которых истекает срок блокировки.
+
+ Raises:
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT user_id, user_name "
+ "FROM blacklist WHERE date_to_unban = ?", (date_to_unban,))
+ fetch_all = result.fetchall()
+ list_of_users = {user_id: username for user_id, username in fetch_all}
+ self.logger.info(f"Получен список пользователей для разблокировки сегодня: {list_of_users}")
+ return list_of_users
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_blacklist_users_by_id(self, user_id: int):
+ """
+ Возвращает информацию о пользователе в черном списке по user_id.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ tuple: Кортеж (user_id, user_name, message_for_user, date_to_unban).
+ None: Если пользователь не найден в черном списке.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban "
+ "FROM blacklist WHERE user_id = ?", (user_id,))
+ return self.cursor.fetchone()
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}")
+ raise
+ finally:
+ self.close()
+
+ def check_user_in_blacklist(self, user_id: int):
+ """
+ Проверяет, существует ли запись с данным user_id в blacklist.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ bool: True, если пользователь найден в черном списке, False - иначе.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ self.logger.info(f"Существует ли пользователь: user_id={user_id} Итог: {result}")
+ return bool(result)
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}")
+ raise
+ finally:
+ self.close()
+
+ def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None):
+ """
+ Добавляет пользователя в черный список.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+ user_name (str, optional): Username пользователя. Defaults to None.
+ message_for_user (str, optional): Сообщение для пользователя. Defaults to None.
+ date_to_unban (datetime, optional): Дата разблокировки. Defaults to None.
+
+ Returns:
+ None: Если добавление в черный список успешно выполнено.
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name},"
+ f" message_for_user={message_for_user}, date_to_unban={date_to_unban}")
+ try:
+ self.connect()
+ result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
+ " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
+ (user_id, user_name, message_for_user, date_to_unban,))
+ self.conn.commit()
+ self.logger.info(f"Пользователь добавлен в черный список: user_id={user_id}")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при добавлении пользователя в черный список: {error}")
+ return error
+ finally:
+ self.close()
+
+ def delete_user_blacklist(self, user_id: int):
+ """
+ Удаляет пользователя из черного списка.
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ bool: True, если удаление прошло успешно, False - в случае ошибки.
+
+ Raises:
+ None: Ошибки обрабатываются в блоке except, возвращая False.
+ """
+ self.logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
+ self.conn.commit()
+ self.logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.")
+ return True
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} "
+ f"из таблицы blacklist. Ошибка: {str(error)}")
+ return False
+ finally:
+ self.close()
+
+ def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str):
+ """
+ Добавляет новое сообщение пользователя в базу данных.
+
+ Args:
+ message_text (str): Текст сообщения.
+ user_id (int): Идентификатор пользователя в Telegram.
+ message_id (int): Идентификатор сообщения в Telegram.
+ date (str): Дата отправки сообщения.
+
+ Returns:
+ None: Если добавление прошло успешно.
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(
+ f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}")
+ try:
+ self.connect()
+ self.cursor.execute(
+ "INSERT INTO user_messages (message_text, user_id, message_id, date) "
+ "VALUES (?, ?, ?, ?)",
+ (message_text, user_id, message_id, date))
+ self.conn.commit()
+ self.logger.info(f"Новое сообщение добавлено в базу данных: message_id={message_id}")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_username_and_full_name(self, user_id: int):
+ """
+ Получает full_name и username пользователя по ID из базы
+
+ Args:
+ date (str): Новая дата изменения.
+ user_id (int): Идентификатор пользователя в Telegram.
+
+ Returns:
+ username (str): username пользователя
+ full_name (str): full_name пользователя
+ """
+ self.logger.info(
+ f"Запуск функции check_username_and_first_name: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
+ username = self.cursor.fetchone()[0]
+ self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
+ full_name = self.cursor.fetchone()[0]
+ self.logger.info(
+ f"Функция check_username_and_first_name успешно отработала: user_id={user_id}, username={username}, full_name={full_name}")
+ return username, full_name
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка в функции get_username_and_first_name: {error}")
+ return None
+ finally:
+ self.close()
+
+ def update_username_and_full_name(self, user_id: int, username: str, full_name: str):
+ """
+ Обновляет full_name и username пользователя
+
+ Args:
+ username (str): username пользователя
+ full_name (str): full_name пользователя
+ user_id (int): Идентификатор пользователя в Telegram
+
+ Returns:
+ True (bool): Если обновления прошли успешно
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(
+ f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}")
+ try:
+ self.connect()
+ self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
+ (username, full_name, user_id,))
+ self.conn.commit()
+ self.logger.info(
+ f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены")
+ return True
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}")
+ raise
+ finally:
+ self.close()
+
+ def update_date_for_user(self, date: str, user_id: int):
+ """
+ #TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users
+ Обновляет дату последнего изменения данных пользователя в базе
+
+ Args:
+ date (str): Новая дата изменения
+ user_id (int): Идентификатор пользователя в Telegram
+
+ Returns:
+ None: Если обновление прошло успешно
+ sqlite3. Error: Если произошла ошибка при выполнении запроса
+ """
+ self.logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}")
+ try:
+ self.connect()
+ self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?",
+ (date, user_id,))
+ self.conn.commit()
+ self.logger.info(f"Дата изменения обновлена для пользователя: user_id={user_id}")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка обновления даты изменения для пользователя: {error}")
+ return error
+ finally:
+ self.close()
+
+ def check_emoji(self, emoji: str):
+ """
+ Проверяет, есть ли уже такой emoji в таблице.
+
+ Args:
+ emoji: emoji для проверки.
+
+ Returns:
+ True, если эмодзи уже есть, иначе False.
+
+ Raises:
+ None: В случае ошибки возвращается None
+ """
+ self.logger.info(f"Запуск функции check_emoji: emoji={emoji}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT 1 FROM our_users WHERE emoji = ?", (emoji,))
+ result = self.cursor.fetchone()
+ return bool(result)
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка проверки эмодзи в базе: {error}")
+ return None
+ finally:
+ self.close()
+
+ def update_emoji_for_user(self, user_id: int, emoji: str):
+ """
+ Обновляет эмодзи для пользователя в базе если его ранее не было установлено
+
+ Args:
+ user_id (int): Идентификатор пользователя в Telegram
+ emoji (str): Эмодзи пользователя
+
+ Returns:
+ None: Если обновление прошло успешно
+ sqlite3. Error: Если произошла ошибка при выполнении запроса
+ """
+ self.logger.info(f"Запуск функции update_emoji_for_user: user_id={user_id}, emoji={emoji}")
+ try:
+ self.connect()
+ self.cursor.execute("UPDATE our_users SET emoji = ? WHERE user_id = ?",
+ (emoji, user_id,))
+ self.conn.commit()
+ self.logger.info(f"Эмоджи обновлен для пользователя: user_id={user_id}")
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка обновления эмодзи для пользователя: {error}")
+ return error
+ finally:
+ self.close()
+
+ def check_emoji_for_user(self, user_id: int):
+ """
+ Проверяет, есть ли уже у пользователя назначенный emoji.
+
+ Args:
+ user_id: user_id пользователя.
+
+ Returns:
+ True, если эмодзи такого нет, иначе False.
+
+ Raises:
+ error: В случае ошибки возвращается error
+ """
+ self.logger.info(f"Запуск функции check_emoji_for_user: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT emoji FROM our_users WHERE user_id = ?", (user_id,))
+ pre_result = self.cursor.fetchone()
+ # Возвращаем "Смайл не определен", если pre_result или pre_result[0] is None
+ result = pre_result[0] if pre_result else None
+ return str(result) if result is not None else "Смайл еще не определен"
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка проверки эмодзи в базе: {error}")
+ return error
+ finally:
+ self.close()
+
+ def refresh_listen_audio(self, user_id: int):
+ """
+ Очищает всю информацию о прослушанных аудио пользователем
+
+ Args:
+ user_id: user_id пользователя.
+
+ Returns:
+ None - если все очищено успешно
+
+ Raises:
+ error: В случае ошибки возвращается error
+ """
+ self.logger.info(f"Запуск функции check_emoji_for_user: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("DELETE FROM listen_audio_users WHERE user_id = ?", (user_id,))
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка проверки эмодзи в базе: {error}")
+ return error
+ finally:
+ self.close()
+
+ def is_admin(self, user_id: int):
+ """
+ Проверяет, является ли пользователь администратором.
+
+ Args:
+ user_id: ID пользователя Telegram.
+
+ Returns:
+ True, если пользователь администратор, иначе False.
+
+ Raises:
+ None: В случае ошибки возвращается None
+ """
+ self.logger.info(f"Запуск функции is_admin: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
+ result = self.cursor.fetchone()
+ return bool(result)
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка проверки прав пользователя админа: {error}")
+ return None
+ finally:
+ self.close()
+
+ def add_admin(self, user_id: int, role: str):
+ """
+ Добавляет пользователя в список администраторов.
+
+ Args:
+ user_id (int): ID пользователя Telegram.
+ role (str): Роль пользователя. Доступные варианты:
+ 1. creator - создатель
+ 2. admin - обычная роль
+
+ Returns:
+ None: Если добавление прошло успешно.
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}")
+ try:
+ self.connect()
+ self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
+ self.conn.commit()
+ self.logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка добавления пользователя в список администраторов: {error}")
+ return error
+ finally:
+ self.close()
+
+ def remove_admin(self, user_id: int):
+ """
+ Удаляет пользователя из списка администраторов.
+
+ Args:
+ user_id (int): ID пользователя Telegram.
+
+ Returns:
+ None: Если удаление прошло успешно.
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции remove_admin: user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
+ self.conn.commit()
+ self.logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}")
+ return error
+ finally:
+ self.connect()
+
+ def get_user_by_message_id(self, message_id: int):
+ """
+ #TODO: Возвращается TypeError вместо None
+ Возвращает идентификатор пользователя по идентификатору сообщения.
+
+ Args:
+ message_id (int): Идентификатор сообщения в Telegram.
+
+ Returns:
+ int: Идентификатор пользователя.
+ None: Если пользователь не найден.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,))
+ user = result.fetchone()[0]
+ self.logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}")
+ return user
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения user_id по message_id: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_last_users_from_db(self):
+ """
+ Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот.
+
+ Returns:
+ list: Список кортежей (full_name, user_id) последних 30 пользователей.
+ []: Если в базе данных нет пользователей.
+
+ Raises:
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info("Запуск функции get_last_users_from_db")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30")
+ users = result.fetchall()
+ self.logger.info(f"Получен список последних 30 пользователей: {users}")
+ return users
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения списка последних пользователей: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_banned_users_from_db(self):
+ """
+ Возвращает список идентификаторов пользователей в черном списке бота.
+
+ Returns:
+ list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
+ []: Если в черном списке нет пользователей.
+
+ Raises:
+ sqlite3.Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info("Запуск функции get_banned_users_from_db")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist")
+ users = result.fetchall()
+ self.logger.info(f"Получен список пользователей в черном списке: {users}")
+ return users
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
+ """
+ Возвращает список идентификаторов пользователей в черном списке бота с учетом смещения и ограничения.
+
+ Args:
+ offset (int): Смещение для выборки
+ limit (int): Ограничение количества возвращаемых записей.
+
+ Returns:
+ list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
+ []: Если в черном списке нет пользователей или количество записей с учетом смещения и ограничения равно 0.
+
+ Raises:
+ sqlite3. Error: Если произошла ошибка при выполнении запроса.
+ """
+ self.logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban "
+ "FROM blacklist LIMIT ?, ?", (offset, limit,))
+ users = result.fetchall()
+ self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {users}")
+ return users
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_post_content_from_telegram_by_last_id(self, last_post_id: int):
+ self.logger.info(
+ f"Запуск функции get_post_content_from_telegram_by_last_id, идентификатор поста {last_post_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("""
+ SELECT cpft.content_name, cpft.content_type
+ FROM post_from_telegram_suggest pft
+ JOIN message_link_to_content mltc
+ ON pft.message_id = mltc.post_id
+ JOIN content_post_from_telegram cpft
+ ON cpft.message_id = mltc.message_id
+ WHERE pft.helper_text_message_id = ?
+ """, (last_post_id,))
+ post_content = result.fetchall()
+ self.logger.info(
+ f"Функция get_post_content_from_telegram_by_last_id получила список контента: {post_content}")
+ return post_content
+ finally:
+ self.close()
+
+ def get_post_ids_from_telegram_by_last_id(self, last_post_id: int):
+ self.logger.info(
+ f"Запуск функции get_post_ids_from_telegram_by_last_id, идентификатор поста {last_post_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("""
+ SELECT mltc.message_id
+ FROM post_from_telegram_suggest pft
+ JOIN message_link_to_content mltc
+ ON pft.message_id = mltc.post_id
+ WHERE pft.helper_text_message_id = ?
+ """, (last_post_id,))
+ post_ids = result.fetchall()
+ self.logger.info(f"Функция get_post_ids_from_telegram_by_last_id "
+ f"получила идентификаторы сообщений: {post_ids}")
+ return post_ids
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_post_ids_from_telegram_by_last_id {str(e)}")
+ return False
+ finally:
+ self.close()
+
+ def get_post_text_from_telegram_by_last_id(self, last_post_id: int):
+ self.logger.info(f"Запуск функции get_post_text_from_telegram_by_last_id, идентификатор поста {last_post_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT text "
+ "FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
+ (last_post_id,))
+ text = result.fetchone()[0]
+ self.logger.info(f"Функция get_post_text_from_telegram_by_last_id получила text")
+ return text
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_post_text_from_telegram_by_last_id {str(e)}")
+
+ def get_author_id_by_message_id(self, message_id: int):
+ self.logger.info(f"Запуск функции get_author_id_by_message_id, идентификатор поста {message_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT author_id "
+ "FROM post_from_telegram_suggest WHERE message_id = ?",
+ (message_id,))
+ author_id = result.fetchone()[0]
+ self.logger.info(f"Функция get_author_id_by_message_id получила author_id {author_id}")
+ return author_id
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_author_id_by_message_id {str(e)}")
+
+ def get_author_id_by_helper_message_id(self, helper_text_message_id: int):
+ self.logger.info(f"Запуск функции get_author_id_by_helper_message_id, идентификатор поста "
+ f"{helper_text_message_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT author_id "
+ "FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
+ (helper_text_message_id,))
+ author_id = result.fetchone()[0]
+ self.logger.info(f"Функция get_author_id_by_helper_message_id получила author_id {author_id}")
+ return author_id
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_author_id_by_helper_message_id {str(e)}")
+
+ def get_user_id_by_message_id_for_voice_bot(self, message_id: int):
+ self.logger.info(f"Запуск функции get_user_id_by_message_id_for_voice_bot, идентификатор поста "
+ f"{message_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT user_id "
+ "FROM audio_moderate WHERE message_id = ?",
+ (message_id,))
+ user_id = result.fetchone()[0]
+ self.logger.info(f"Функция get_user_id_by_message_id_for_voice_bot получила author_id {user_id}")
+ return user_id
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_user_id_by_message_id_for_voice_bot {str(e)}")
+
+ def set_user_id_and_message_id_for_voice_bot(self, message_id: int, user_id: int):
+ self.logger.info(f"Запуск функции set_user_id_and_message_id_for_voice_bot, идентификатор поста "
+ f"{message_id}, user_id {user_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute(
+ "INSERT INTO audio_moderate (message_id, user_id)"
+ "VALUES (?, ?)", (message_id, user_id))
+ self.conn.commit()
+ self.logger.info(f"Функция set_user_id_and_message_id_for_voice_bot отработала успешно")
+ return True
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции set_user_id_and_message_id_for_voice_bot {str(e)}")
+
+ def get_user_id_by_file_name(self, file_name: str):
+ self.logger.info(f"Запуск функции get_user_id_by_file_name, идентификатор файла "
+ f"{file_name}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT author_id "
+ "FROM audio_message_reference WHERE file_name = ?",
+ (file_name,))
+ user_id = result.fetchone()[0]
+ self.logger.info(f"Функция get_user_id_by_file_name получила user_id {user_id}")
+ return user_id
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_user_id_by_file_name {str(e)}")
+
+ def get_date_by_file_name(self, file_name: str):
+ self.logger.info(f"Запуск функции get_date_by_file_name, идентификатор файла "
+ f"{file_name}")
+ try:
+ self.connect()
+ result = self.cursor.execute("SELECT date_added "
+ "FROM audio_message_reference WHERE file_name = ?",
+ (file_name,))
+ date_added = result.fetchone()[0]
+ self.logger.info(f"Функция get_date_by_file_name получила date_added {date_added}")
+ return date_added
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции get_date_by_file_name {str(e)}")
+
+ def add_post_content_in_db(self, post_id: int, message_id: int, content_name: str, type_content: str):
+ self.logger.info(
+ f"Запуск функции add_post_content_in_db: post_id={post_id}, message_id={message_id}, "
+ f"content_name={content_name}, content_type={type_content}")
+ try:
+ self.connect()
+ self.cursor.execute(
+ "INSERT INTO message_link_to_content (post_id, message_id)"
+ "VALUES (?, ?)", (post_id, message_id))
+ self.conn.commit()
+ self.cursor.execute(
+ "INSERT INTO content_post_from_telegram (message_id, content_name, content_type)"
+ "VALUES (?, ?, ?)", (message_id, content_name, type_content))
+ self.conn.commit()
+ self.logger.info(f"Функция add_post_content_in_db отработала успешно")
+ return True
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции add_post_content_in_db при добавлении поста в базу данных: {e}")
+ return False
+
+ def add_post_in_db(self, message_id: int, text: str, author_id: int):
+ self.logger.info(
+ f"Запуск функции add_post_in_db: message_id={message_id}, "
+ f"author_id={author_id}")
+ try:
+ today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
+ self.connect()
+ self.cursor.execute(
+ "INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)"
+ "VALUES (?, ?, ?, ?)", (message_id, text, author_id, today))
+ self.conn.commit()
+ self.logger.info(f"Функция add_post_in_db отработала успешно")
+ return True
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции add_post_in_db при добавлении поста в базу данных: {e}")
+ return False
+
+ def update_helper_message_in_db(self, message_id: int, helper_message_id: int):
+ self.logger.info(
+ f"Запуск функции update_helper_message_in_db: message_id={message_id}, "
+ f"helper_message_id={helper_message_id}")
+ try:
+ self.connect()
+ self.cursor.execute(
+ "UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
+ (helper_message_id, message_id,))
+ self.conn.commit()
+ self.logger.info(f"Функция update_helper_message_in_db отработала успешно")
+ return True
+ except Exception as e:
+ self.logger.error(f"Ошибка в функции update_helper_message_in_db при добавлении поста в базу данных: {e}")
+ return False
+
+ def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
+ """Добавляет информацию о войсе юзера в БД"""
+ self.logger.info(
+ f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id},"
+ f" date_added = {date_added}")
+ try:
+ self.connect()
+ result = self.cursor.execute(
+ "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) "
+ "VALUES (?, ?, ?, ?, ?)",
+ (file_name, author_id, date_added, listen_count, file_id))
+ self.conn.commit()
+ self.logger.info(
+ f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, "
+ f"date_added = {date_added}")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка при добавлении войса в базу: {error}")
+ raise
+ finally:
+ self.close()
+
+ def last_date_audio(self):
+ """Получаем дату последнего войса"""
+ self.logger.info(
+ f"Запуск функции last_date_audio")
+ try:
+ self.connect()
+ result = self.cursor.execute(
+ "SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
+ last_date = result.fetchone()[0]
+ self.logger.info(f"Последняя дата сообщения {last_date}")
+ return last_date
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения последней даты войса: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_last_user_audio_record(self, user_id):
+ """Получает данные о количестве записей пользователя"""
+ self.logger.info(
+ f"Запуск функции get_last_user_audio_record. user_id={user_id}")
+ try:
+ self.connect()
+ r = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
+ (user_id,))
+ result = bool(len(r.fetchall()))
+ self.logger.info(
+ f"Результат функции get_last_user_audio_record: {result}")
+ return result
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения последней даты войса: {error}")
+ raise
+ finally:
+ self.close()
+
+ def delete_listen_count_for_user(self, user_id):
+ """Удаляет данные о прослушанных пользователем аудио"""
+ self.logger.info(
+ f"Запуск функции delete_listen_count_for_user. user_id={user_id}")
+ try:
+ self.connect()
+ self.cursor.execute("DELETE FROM `listen_audio_users` WHERE `user_id` = ?",
+ (user_id,))
+ self.conn.commit()
+ self.logger.info(
+ f"Функция delete_listen_count_for_user успешно отработала")
+ return None
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка удаления записей прослушивания по пользователю: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_id_for_audio_record(self, user_id):
+ """Получает ID аудио сообщения пользователя"""
+ self.logger.info(
+ f"Запуск функции get_id_for_audio_record. user_id={user_id}")
+ try:
+ self.connect()
+ r = self.cursor.execute(
+ "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? "
+ "ORDER BY date_added DESC LIMIT 1",
+ (user_id,))
+ result = r.fetchone()[0]
+ self.logger.info(
+ f"Результат функции get_id_for_audio_record: {result}")
+ return result
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения последней даты войса: {error}")
+ raise
+ finally:
+ self.close()
+
+ def get_path_for_audio_record(self, user_id):
+ """Получает данные о названии файла"""
+ self.logger.info(
+ f"Запуск функции get_path_for_audio_record. user_id={user_id}")
+ try:
+ self.connect()
+ r = self.cursor.execute(
+ "SELECT `file_name` "
+ "FROM `audio_message_reference` "
+ "WHERE `author_id` = ? "
+ "ORDER BY date_added "
+ "DESC LIMIT 1",
+ (user_id,))
+ result = r.fetchone()[0]
+ self.logger.info(
+ f"Результат функции get_path_for_audio_record: {result}")
+ return result
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения последней даты войса: {error}")
+ raise
+ finally:
+ self.close()
+
+ def check_listen_audio(self, user_id):
+ """Проверяет прослушано ли аудио пользователем"""
+ self.logger.info(
+ f"Запуск функции check_listen_audio. user_id={user_id}")
+ try:
+ self.connect()
+ query_listen_audio = self.cursor.execute(
+ """SELECT l.file_name
+ FROM audio_message_reference a
+ LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
+ WHERE l.user_id = ?
+ AND l.file_name IS NOT NULL""", (user_id,))
+ check_sign = query_listen_audio.fetchall()
+ query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
+ (user_id,))
+ sign_all_audio = query_all_audio.fetchall()
+ new_sign1 = list(set(sign_all_audio) - set(check_sign))
+ new_sign = []
+ for i in new_sign1:
+ new_sign.append(i[0])
+ self.logger.info(
+ f"Функция check_listen_audio успешно отработала.")
+ return new_sign
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения последней даты войса: {error}")
+ raise
+ finally:
+ self.close()
+
+ def mark_listened_audio(self, file_name, user_id):
+ """Отмечает аудио прослушанным для конкретного пользователя."""
+ self.logger.info(
+ f"Запуск функции mark_listened_audio. file_name={file_name}, user_id={user_id}")
+ try:
+ self.connect()
+ result = self.cursor.execute(
+ "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
+ (file_name, user_id, 1))
+ return self.conn.commit()
+ except sqlite3.Error as error:
+ self.logger.error(f"Ошибка получения последней даты войса: {error}")
+ raise
+ finally:
+ self.close()
+
+ def close(self):
+ """Закрытие соединения и курсора."""
+ if self.cursor:
+ self.cursor.close()
+ if self.conn:
+ self.conn.close()
+
+ async def check_user_in_blacklist_async(self, user_id: int):
+ """
+ Асинхронная версия проверки пользователя в черном списке.
+ """
+ loop = asyncio.get_event_loop()
+ return await loop.run_in_executor(self.executor, self.check_user_in_blacklist, user_id)
+
+ async def get_blacklist_users_by_id_async(self, user_id: int):
+ """
+ Асинхронная версия получения информации о пользователе из черного списка.
+ """
+ loop = asyncio.get_event_loop()
+ return await loop.run_in_executor(self.executor, self.get_blacklist_users_by_id, user_id)
diff --git a/helper_bot/handlers/private/private_handlers.py b/helper_bot/handlers/private/private_handlers.py
index 83c131b..e0ca598 100644
--- a/helper_bot/handlers/private/private_handlers.py
+++ b/helper_bot/handlers/private/private_handlers.py
@@ -18,8 +18,9 @@ from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
from helper_bot.utils import messages
from helper_bot.utils.base_dependency_factory import get_global_instance
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
- send_media_group_message_to_private_chat, prepare_media_group_from_middlewares, check_username_and_full_name, \
- send_video_message, send_video_note_message, send_audio_message, send_voice_message, add_in_db_media
+ send_media_group_message_to_private_chat, prepare_media_group_from_middlewares, send_video_message, \
+ send_video_note_message, send_audio_message, send_voice_message, add_in_db_media, \
+ check_user_emoji, check_username_and_full_name
from logs.custom_logger import logger
private_router = Router()
@@ -42,6 +43,35 @@ BotDB = bdf.get_db()
# Expose sleep for tests (tests patch helper_bot.handlers.private.private_handlers.sleep)
sleep = asyncio.sleep
+@private_router.message(
+ ChatTypeFilter(chat_type=["private"]),
+ Command("emoji")
+)
+async def handle_emoji_message(message: types.Message, state: FSMContext):
+ await message.forward(chat_id=GROUP_FOR_LOGS)
+ user_emoji = check_user_emoji(message)
+ await state.set_state("START")
+ if user_emoji is not None:
+ await message.answer(f'Твоя эмодзя - {user_emoji}', parse_mode='HTML')
+
+
+@private_router.message(
+ ChatTypeFilter(chat_type=["private"]),
+ Command("restart")
+)
+async def handle_restart_message(message: types.Message, state: FSMContext):
+ try:
+ markup = get_reply_keyboard(BotDB, message.from_user.id)
+ await message.forward(chat_id=GROUP_FOR_LOGS)
+ await state.set_state("START")
+ await update_user_info('love', message)
+ check_user_emoji(message)
+ await message.answer('Я перезапущен!', reply_markup=markup, parse_mode='HTML')
+ except Exception as e:
+ logger.error(f"Произошла ошибка handle_restart_message. Ошибка:{str(e)}")
+ await message.bot.send_message(chat_id=IMPORTANT_LOGS,
+ text=f"Произошла ошибка handle_restart_message: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
+
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
@@ -74,7 +104,8 @@ async def handle_start_message(message: types.Message, state: FSMContext):
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not BotDB.user_exists(user_id):
- BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
+ # Для первоначального добавления эмодзи пока не назначаем (совместимость)
+ BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, "", date,
date)
else:
is_need_update = check_username_and_full_name(user_id, username, full_name, BotDB)
diff --git a/helper_bot/utils/helper_func.py b/helper_bot/utils/helper_func.py
index 3aabc20..611f0bb 100644
--- a/helper_bot/utils/helper_func.py
+++ b/helper_bot/utils/helper_func.py
@@ -1,13 +1,32 @@
import html
import os
+import random
from datetime import datetime, timedelta
+from time import sleep
+try:
+ import emoji as _emoji_lib
+except Exception:
+ _emoji_lib = None
from aiogram import types
from aiogram.types import InputMediaPhoto, FSInputFile, InputMediaVideo, InputMediaAudio
-from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
+from helper_bot.utils.base_dependency_factory import BaseDependencyFactory, get_global_instance
from logs.custom_logger import logger
+bdf = get_global_instance()
+BotDB = bdf.get_db()
+GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
+
+if _emoji_lib is not None:
+ emoji_list = list(_emoji_lib.EMOJI_DATA.keys())
+else:
+ # Fallback minimal emoji set for environments without the 'emoji' package (e.g., CI tests)
+ emoji_list = [
+ "🙂", "😀", "😉", "😎", "🤖", "🦄", "🐱", "🐶", "🍀", "🔥",
+ "🌟", "🎉", "💡", "🚀", "🌈"
+ ]
+
def safe_html_escape(text: str) -> str:
"""
@@ -458,3 +477,52 @@ def unban_notifier(self):
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)
+
+
+async def update_user_info(source: str, message: types.Message):
+ # Собираем данные
+ full_name = message.from_user.full_name
+ username = message.from_user.username
+ first_name = get_first_name(message)
+ is_bot = message.from_user.is_bot
+ language_code = message.from_user.language_code
+ user_id = message.from_user.id
+ current_date = datetime.now()
+ date = current_date.strftime("%Y-%m-%d %H:%M:%S")
+ # Выбираем эмодзю, пробегаемся циклом и смотрим что в базе такого еще не было
+ user_emoji = get_random_emoji()
+
+ if not BotDB.user_exists(user_id):
+ BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, user_emoji, date,
+ date)
+ else:
+ is_need_update = check_username_and_full_name(user_id, username, full_name, BotDB)
+ if is_need_update:
+ BotDB.update_username_and_full_name(user_id, username, full_name)
+ if source != 'voice':
+ await message.answer(
+ f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name}")
+ await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
+ text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}. Новый эмодзи:{user_emoji}')
+ sleep(1)
+ BotDB.update_date_for_user(date, user_id)
+
+
+def check_user_emoji(message: types.Message):
+ user_id = message.from_user.id
+ user_emoji = BotDB.check_emoji_for_user(user_id=user_id)
+ if user_emoji is None or user_emoji in ("Смайл еще не определен", "Эмоджи не определен", ""):
+ user_emoji = get_random_emoji()
+ BotDB.update_emoji_for_user(user_id=user_id, emoji=user_emoji)
+ return user_emoji
+
+
+def get_random_emoji():
+ attempts = 0
+ while attempts < 100:
+ user_emoji = random.choice(emoji_list)
+ if not BotDB.check_emoji(user_emoji):
+ return user_emoji
+ attempts += 1
+ logger.error("Не удалось найти уникальный эмодзи после нескольких попыток.")
+ return "Эмоджи не определен"
diff --git a/helper_bot/utils/messages.py b/helper_bot/utils/messages.py
index 6551423..5e8d4b5 100644
--- a/helper_bot/utils/messages.py
+++ b/helper_bot/utils/messages.py
@@ -26,7 +26,7 @@ def get_message(username: str, type_message: str):
"&Мы рассмотрим и ответим тебе в ближайшее время☺️❤️",
"DEL_MESSAGE": "username, напиши свое обращение или предложение✍"
"&Мы рассмотрим и ответим тебе в ближайшее время☺❤",
- "BYE_MESSAGE": "Если позднее захочешь предложить еще один пост или обратиться к админам с вопросом, то просто пришли в чат команду 👉 /start"
+ "BYE_MESSAGE": "Если позднее захочешь предложить еще один пост или обратиться к админам с вопросом, то просто пришли в чат команду 👉 /restart"
"&&И тебе пока!👋🏼❤️",
"USER_ERROR": "Увы, я не понимаю тебя😐💔 Выбери один из пунктов в нижнем меню, а затем пиши.",
"QUESTION": "Сообщение успешно отправлено❤️ Ответим, как только сможем😉",
diff --git a/requirements.txt b/requirements.txt
index 6da9b91..9dea5b4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,22 +1,23 @@
-APScheduler==3.10.4
-certifi~=2024.6.2
-charset-normalizer==3.3.2
-coverage==7.5.4
-exceptiongroup==1.2.1
-idna==3.7
-iniconfig==2.0.0
-loguru==0.7.2
-packaging==24.1
-pluggy==1.5.0
-pytest==8.2.2
-pytz==2024.1
-requests==2.32.3
-six==1.16.0
-tomli==2.0.1
-tzlocal==5.2
-urllib3~=2.2.1
-pip~=23.2.1
-attrs~=23.2.0
-typing_extensions~=4.12.2
-aiohttp~=3.9.5
-aiogram~=3.10.0
\ No newline at end of file
+APScheduler==3.10.4
+certifi~=2024.6.2
+charset-normalizer==3.3.2
+coverage==7.5.4
+exceptiongroup==1.2.1
+idna==3.7
+iniconfig==2.0.0
+loguru==0.7.2
+packaging==24.1
+pluggy==1.5.0
+pytest==8.2.2
+pytz==2024.1
+requests==2.32.3
+six==1.16.0
+tomli==2.0.1
+tzlocal==5.2
+urllib3~=2.2.1
+pip~=23.2.1
+attrs~=23.2.0
+typing_extensions~=4.12.2
+aiohttp~=3.9.5
+aiogram~=3.10.0
+emoji~=2.14.0
\ No newline at end of file
diff --git a/run_helper.py b/run_helper.py
index 16a0eaf..5ea56c4 100644
--- a/run_helper.py
+++ b/run_helper.py
@@ -1,4 +1,11 @@
import asyncio
+import os
+import sys
+
+# Ensure project root is on sys.path for module resolution
+CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
+if CURRENT_DIR not in sys.path:
+ sys.path.insert(0, CURRENT_DIR)
from helper_bot.main import start_bot
from helper_bot.utils.base_dependency_factory import get_global_instance
diff --git a/tests/test_db.py b/tests/test_db.py
index 8db993e..845d4ce 100644
--- a/tests/test_db.py
+++ b/tests/test_db.py
@@ -252,12 +252,13 @@ def test_add_new_user_in_db(bot):
username = "@petr_ivanov"
is_bot = False
language_code = "ru"
+ emoji = '🦀'
date_added = "2024-07-09"
date_changed = "2024-07-09"
# Вызываем функцию add_new_user_in_db
bot.add_new_user_in_db(
- user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed
+ user_id, first_name, full_name, username, is_bot, language_code, emoji, date_added, date_changed
)
# Проверяем наличие записи в базе данных
@@ -285,7 +286,7 @@ def test_add_new_user_in_db_duplicate_user_id(bot, setup_db):
# Попытка добавить пользователя с тем же user_id
with pytest.raises(sqlite3.IntegrityError):
bot.add_new_user_in_db(
- user_id, "Марина", "Марина Альфредовна", "marina", False, "bg", "2024-07-09", "2024-07-09"
+ user_id, "Марина", "Марина Альфредовна", "marina", False, "bg", "🦀", "2024-07-09", "2024-07-09"
)
@@ -297,12 +298,13 @@ def test_add_new_user_in_db_empty_first_name(bot):
username = "@boris"
is_bot = False
language_code = "fr"
+ emoji = "🦀"
date_added = "2024-07-09"
date_changed = "2024-07-09"
# Вызываем функцию add_new_user_in_db
bot.add_new_user_in_db(
- user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed
+ user_id, first_name, full_name, username, is_bot, language_code, emoji, date_added, date_changed
)
# Проверяем наличие записи в базе данных
diff --git a/voice_bot/voice_handler/__init__.py b/voice_bot/handlers/__init__.py
similarity index 100%
rename from voice_bot/voice_handler/__init__.py
rename to voice_bot/handlers/__init__.py
diff --git a/voice_bot/handlers/callback_handler.py b/voice_bot/handlers/callback_handler.py
new file mode 100644
index 0000000..a969a98
--- /dev/null
+++ b/voice_bot/handlers/callback_handler.py
@@ -0,0 +1,68 @@
+import time
+from datetime import datetime
+from pathlib import Path
+
+from aiogram import Router, F
+from aiogram.types import CallbackQuery
+
+from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
+
+callback_router = Router()
+
+bdf = BaseDependencyFactory()
+
+GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
+GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
+IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
+PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
+LOGS = bdf.settings['Settings']['logs']
+TEST = bdf.settings['Settings']['test']
+
+BotDB = bdf.get_db()
+
+
+@callback_router.callback_query(
+ F.data == "save"
+)
+async def save_voice_message(call: CallbackQuery):
+ file_name = ''
+ file_id = 1
+ user_id = BotDB.get_user_id_by_message_id_for_voice_bot(call.message.message_id)
+ # Проверяем что запись о файле есть в базе данных
+ is_having_audio_from_user = BotDB.get_last_user_audio_record(user_id=user_id)
+ if is_having_audio_from_user is False:
+ # Если нет, то генерируем имя файла
+ file_name = f'message_from_{user_id}_number_{file_id}'
+ else:
+ # Иначе берем последнюю запись из БД, добавляем к ней 1, и создаем новую запись
+ file_name = BotDB.get_path_for_audio_record(user_id=user_id)
+ file_id = BotDB.get_id_for_audio_record(user_id) + 1
+ path = Path(f'voice_users/{file_name}.ogg')
+ if path.exists():
+ file_name = f'message_from_{user_id}_number_{file_id}'
+ else:
+ pass
+ # Собираем инфо о сообщении
+ time_UTC = int(time.time())
+ date_added = datetime.fromtimestamp(time_UTC)
+
+ # Сохраняем в базку
+ BotDB.add_audio_record(file_name, user_id, date_added, 0, file_id)
+
+ file_info = await call.message.bot.get_file(file_id=call.message.voice.file_id)
+ downloaded_file = await call.message.bot.download_file(file_path=file_info.file_path)
+ with open(f'voice_users/{file_name}.ogg', 'wb') as new_file:
+ new_file.write(downloaded_file.read())
+
+ await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
+ await call.answer(text='Сохранено!', cache_time=3)
+
+
+@callback_router.callback_query(
+ F.data == "delete"
+)
+async def delete_voice_message(call: CallbackQuery):
+ # Получаем из базы автора + отправляем сообщение + удаляем сообщение из предложки
+
+ await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
+ await call.answer(text='Удалено!', cache_time=3)
diff --git a/voice_bot/voice_handler/voice_handler.py b/voice_bot/handlers/voice_handler.py
similarity index 64%
rename from voice_bot/voice_handler/voice_handler.py
rename to voice_bot/handlers/voice_handler.py
index 9835e31..99524af 100644
--- a/voice_bot/voice_handler/voice_handler.py
+++ b/voice_bot/handlers/voice_handler.py
@@ -9,9 +9,11 @@ from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile
from helper_bot.filters.main import ChatTypeFilter
+from helper_bot.middlewares.blacklist_middleware import BlacklistMiddleware
from helper_bot.utils.base_dependency_factory import get_global_instance
+from helper_bot.utils.helper_func import update_user_info, check_user_emoji, send_voice_message
from logs.custom_logger import logger
-from voice_bot.keyboards.keyboards import get_main_keyboard
+from voice_bot.keyboards.keyboards import get_main_keyboard, get_reply_keyboard_for_voice
from voice_bot.utils.helper_func import last_message
voice_router = Router()
@@ -19,12 +21,14 @@ voice_router = Router()
bdf = get_global_instance()
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
+GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
+voice_router.message.middleware(BlacklistMiddleware())
@voice_router.message(
@@ -33,21 +37,36 @@ BotDB = bdf.get_db()
)
async def restart_function(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS)
+ await update_user_info('voice', message)
+ check_user_emoji(message)
markup = get_main_keyboard()
await message.answer(text='Я перезапущен!',
reply_markup=markup)
await state.set_state('START')
+@voice_router.message(
+ ChatTypeFilter(chat_type=["private"]),
+ Command("emoji")
+)
+async def handle_emoji_message(message: types.Message, state: FSMContext):
+ await message.forward(chat_id=GROUP_FOR_LOGS)
+ user_emoji = check_user_emoji(message)
+ await state.set_state("START")
+ if user_emoji is not None:
+ await message.answer(f'Твоя эмодзя - {user_emoji}', parse_mode='HTML')
+
+
@voice_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("help")
)
async def help_function(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS)
+ await update_user_info('voice', message)
await message.answer(
text='Скорее всего ответы на твои вопросы есть здесь, ознакомься: https://telegra.ph/Instrukciya-k-botu-Golosa-Bijsk-10-11-2'
- '\nЕсли это не поможет, пиши в тг: @Kerrad1', disable_web_page_preview=not PREVIEW_LINK)
+ '\nЕсли это не поможет, пиши в личку: @Kerrad1', disable_web_page_preview=not PREVIEW_LINK)
await state.set_state('START')
@@ -58,6 +77,8 @@ async def help_function(message: types.Message, state: FSMContext):
async def start(message: types.Message, state: FSMContext):
await state.set_state("START")
await message.forward(chat_id=GROUP_FOR_LOGS)
+ await update_user_info('voice', message)
+ user_emoji = check_user_emoji(message)
try:
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = random.choice(name_stick_hello)
@@ -98,15 +119,36 @@ async def start(message: types.Message, state: FSMContext):
parse_mode='html', reply_markup=markup,
disable_web_page_preview=not PREVIEW_LINK)
await asyncio.sleep(0.8)
+ await message.answer(text=f"Любые войсы будут помечены эмоджи. Твой эмоджи - {user_emoji}"
+ f"Таким эмоджи будут помечены твои сообщения для других "
+ f"Но другие люди не узнают кто за каким эмоджи скрывается:)",
+ parse_mode='html', reply_markup=markup,
+ disable_web_page_preview=not PREVIEW_LINK)
+ await asyncio.sleep(0.8)
await message.answer(text="Так же можешь ознакомиться с инструкцией к боту по команде /help",
parse_mode='html', reply_markup=markup,
disable_web_page_preview=not PREVIEW_LINK)
await asyncio.sleep(0.8)
- await message.answer(text="ну всё, достаточно инструкций. записывайся! Микрофон твой - 🎤",
+ await message.answer(text="Ну всё, достаточно инструкций. записывайся! Микрофон твой - 🎤",
parse_mode='html', reply_markup=markup,
disable_web_page_preview=not PREVIEW_LINK)
+@voice_router.message(
+ ChatTypeFilter(chat_type=["private"]),
+ Command("refresh")
+)
+async def refresh_listen_function(message: types.Message, state: FSMContext):
+ await message.forward(chat_id=GROUP_FOR_LOGS)
+ await update_user_info('voice', message)
+ markup = get_main_keyboard()
+ BotDB.delete_listen_count_for_user(message.from_user.id)
+ await message.answer(
+ text='Прослушивания очищены. Можешь начать слушать заново🤗', disable_web_page_preview=not PREVIEW_LINK,
+ markup=markup)
+ await state.set_state('START')
+
+
@voice_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
@@ -116,53 +158,33 @@ async def standup_write(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS)
markup = types.ReplyKeyboardRemove()
await message.answer(text='Хорошо, теперь пришли мне свое голосовое сообщение', reply_markup=markup)
- message_with_date = last_message()
- await message.answer(text=message_with_date, parse_mode="html")
+ try:
+ message_with_date = last_message()
+ await message.answer(text=message_with_date, parse_mode="html")
+ except Exception as e:
+ logger.error(f'Не удалось получить дату последнего сообщения - {e}')
await state.set_state('STANDUP_WRITE')
@voice_router.message(
StateFilter("STANDUP_WRITE"),
- ChatTypeFilter(chat_type=["private"])
+ ChatTypeFilter(chat_type=["private"]),
)
-async def save_voice_message(message: types.Message, state: FSMContext):
+async def suggest_voice(message: types.Message, state: FSMContext):
+ logger.info(
+ f"Вызов функции suggest_voice. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
+ await message.forward(chat_id=GROUP_FOR_LOGS)
markup = get_main_keyboard()
if message.content_type == 'voice':
- await message.forward(chat_id=GROUP_FOR_LOGS)
- file_name = ''
- file_id = 1
- # Проверяем что запись о файле есть в базе данных
- is_having_audio_from_user = BotDB.get_last_user_audio_record(user_id=message.from_user.id)
- if is_having_audio_from_user is False:
- # Если нет, то генерируем имя файла
- file_name = f'message_from_{message.from_user.id}_number_{file_id}'
- else:
- # Иначе берем последнюю запись из БД, добавляем к ней 1, и создаем новую запись
- file_name = BotDB.get_path_for_audio_record(user_id=message.from_user.id)
- file_id = BotDB.get_id_for_audio_record(message.from_user.id) + 1
- path = Path(f'voice_users/{file_name}.ogg')
- if path.exists():
- file_name = f'message_from_{message.from_user.id}_number_{file_id}'
- else:
- pass
- # Собираем инфо о сообщении
- author_id = message.from_user.id
- time_UTC = int(datetime.now().timestamp())
- date_added = datetime.fromtimestamp(time_UTC)
+ markup_for_voice = get_reply_keyboard_for_voice()
+ # Отправляем аудио в приватный канал
+ sent_message = await send_voice_message(GROUP_FOR_POST, message,
+ message.voice.file_id, markup_for_voice)
- # Сохраняем в базку
- BotDB.add_audio_record(file_name, author_id, date_added, 0, file_id)
+ # Сохраняем в базу инфо о посте
+ BotDB.set_user_id_and_message_id_for_voice_bot(sent_message.message_id, message.from_user.id)
- # Сохраняем файл на сервер
- # file_info = message.bot.get_file(file_id=message.voice.file_id)
- # downloaded_file = message.bot.download_file(file_path=file_info.file_path)
- # with open(f'voice_users/{file_name}.ogg', 'wb') as new_file:
- # new_file.write(downloaded_file)
-
- file_info = await message.bot.get_file(file_id=message.voice.file_id)
- downloaded_file = await message.bot.download_file(file_path=file_info.file_path)
- with open(f'voice_users/{file_name}.ogg', 'wb') as new_file:
- new_file.write(downloaded_file.read())
+ # Отправляем юзеру ответ и возвращаем его в меню
await message.answer(text='Окей, сохранил!👌', reply_markup=markup)
await state.set_state('START')
else:
@@ -177,24 +199,38 @@ async def save_voice_message(message: types.Message, state: FSMContext):
ChatTypeFilter(chat_type=["private"]),
F.text == '🎧Послушать'
)
-async def standup_listen_audio(message: types.Message, state: FSMContext):
+async def standup_listen_audio(message: types.Message):
check_audio = BotDB.check_listen_audio(user_id=message.from_user.id)
list_audio = list(check_audio)
markup = get_main_keyboard()
- await message.forward(chat_id=GROUP_FOR_LOGS)
if not list_audio:
await message.answer(text='Прости, ты прослушал все аудио😔. Возвращайся позже, возможно наша база пополнится',
reply_markup=markup)
- message_with_date = last_message()
- message.send_message(chat_id=message.chat.id, text=message_with_date, parse_mode="html")
+ try:
+ message_with_date = last_message()
+ await message.answer(text=message_with_date, parse_mode="html")
+ except Exception as e:
+ logger.error(f'Не удалось получить последнюю дату {e}')
else:
+ # Получаем ссылку на аудио сообщение пользователя
number_element = random.randint(0, len(list_audio) - 1)
audio_for_user = check_audio[number_element]
+
+ # Получаем автора записи + эмодзи по нему
+ user_id = BotDB.get_user_id_by_file_name(audio_for_user)
+ date_added = BotDB.get_date_by_file_name(audio_for_user)
+ user_emoji = BotDB.check_emoji_for_user(user_id)
+
path = Path(f'voice_users/{audio_for_user}.ogg')
- # voice = open(path, 'rb')
voice = FSInputFile(path)
+
# Маркируем сообщение как прослушанное
BotDB.mark_listened_audio(audio_for_user, user_id=message.from_user.id)
- await message.bot.send_voice(message.chat.id, voice=voice, reply_markup=markup)
- await message.forward(chat_id=GROUP_FOR_LOGS)
- await state.set_state('START')
+
+ # Формируем подпись
+ if user_emoji:
+ caption = f'{user_emoji}\nДата записи: {date_added}'
+ else:
+ caption = f'Дата записи: {date_added}'
+ await message.bot.send_voice(chat_id=message.chat.id, voice=voice, caption=caption, reply_markup=markup)
+ await message.answer(text=f'Осталось непрослушанных: {len(check_audio) - 1}', reply_markup=markup)
diff --git a/voice_bot/keyboards/keyboards.py b/voice_bot/keyboards/keyboards.py
index 0726fea..592d3dc 100644
--- a/voice_bot/keyboards/keyboards.py
+++ b/voice_bot/keyboards/keyboards.py
@@ -1,5 +1,5 @@
from aiogram import types
-from aiogram.utils.keyboard import ReplyKeyboardBuilder
+from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder
def get_main_keyboard():
@@ -8,3 +8,15 @@ def get_main_keyboard():
builder.add(types.KeyboardButton(text="🎧Послушать"))
markup = builder.as_markup(resize_keyboard=True)
return markup
+
+
+def get_reply_keyboard_for_voice():
+ builder = InlineKeyboardBuilder()
+ builder.row(types.InlineKeyboardButton(
+ text="Сохранить", callback_data="save")
+ )
+ builder.row(types.InlineKeyboardButton(
+ text="Удалить", callback_data="delete")
+ )
+ markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
+ return markup
diff --git a/voice_bot/main.py b/voice_bot/main.py
index 199d2f7..d0a25db 100644
--- a/voice_bot/main.py
+++ b/voice_bot/main.py
@@ -1,9 +1,19 @@
+import os
+import sys
+
+# Ensure project root is on sys.path for module resolution when running voice bot directly
+CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
+PROJECT_ROOT = os.path.dirname(CURRENT_DIR)
+if PROJECT_ROOT not in sys.path:
+ sys.path.insert(0, PROJECT_ROOT)
+
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.fsm.strategy import FSMStrategy
-from voice_bot.voice_handler.voice_handler import voice_router
+from voice_bot.handlers.callback_handler import callback_router
+from voice_bot.handlers.voice_handler import voice_router
async def start_bot(bdf):
@@ -13,6 +23,6 @@ async def start_bot(bdf):
link_preview_is_disabled=bdf.settings['Telegram']['preview_link']
))
dp = Dispatcher(storage=MemoryStorage(), fsm_strategy=FSMStrategy.GLOBAL_USER)
- dp.include_routers(voice_router)
+ dp.include_routers(voice_router, callback_router)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot, skip_updates=True)
diff --git a/voice_bot/utils/helper_func.py b/voice_bot/utils/helper_func.py
index 1ec0fb8..e312e27 100644
--- a/voice_bot/utils/helper_func.py
+++ b/voice_bot/utils/helper_func.py
@@ -12,6 +12,9 @@ BotDB = bdf.get_db()
def last_message():
# функция с отображением сообщения "Последнее сообщение было записано"
date_from_db = BotDB.last_date_audio()
+ if date_from_db is None:
+ return None
+
parse_date = datetime.strptime(date_from_db, "%Y-%m-%d %H:%M:%S")
last_voice_time_timestamp = time.mktime(parse_date.timetuple())
time_now_timestamp = time.time()