From f2e44ddb29f3a6b3394f9967d4c3e807517452e6 Mon Sep 17 00:00:00 2001 From: KatykhinAA Date: Wed, 10 Jul 2024 23:18:36 +0300 Subject: [PATCH] add tests, some fixes --- db.py => database/db.py | 187 ++++- custom_logger.py => logs/custom_logger.py | 11 +- main.py | 26 +- migrations/000_migrations_init.py | 12 +- migrations/001_create_new_tables.py | 4 +- pytest.ini | 8 + requirements.txt | 21 +- tests/test_db.py | 825 ++++++++++++++++++++++ voice_bot.py | 3 +- 9 files changed, 1034 insertions(+), 63 deletions(-) rename db.py => database/db.py (86%) rename custom_logger.py => logs/custom_logger.py (77%) create mode 100644 pytest.ini create mode 100644 tests/test_db.py diff --git a/db.py b/database/db.py similarity index 86% rename from db.py rename to database/db.py index 7c3d345..3e558fa 100644 --- a/db.py +++ b/database/db.py @@ -1,27 +1,27 @@ import sqlite3 -import configparser import os from datetime import datetime from loguru import logger -from custom_logger import Logger +from logs.custom_logger import Logger db_logger = Logger(name='db') -script_dir = os.path.dirname(os.path.abspath(__file__)) -config_path = os.path.join(script_dir, 'settings.ini') -config = configparser.ConfigParser() -config.read(config_path) -LOGS = config.getboolean('Settings', 'logs') -IMPORTANT_LOGS = config.get('Telegram', 'important_logs') + +# Получение абсолютного пути к текущей директории +current_dir = os.getcwd() class BotDB: - def __init__(self): - db_file_path = os.path.dirname(os.path.abspath(__file__)) - db_file = os.path.join(db_file_path, 'tg-bot-database') - self.conn = sqlite3.connect(db_file, check_same_thread=False) + def __init__(self, name): + self.db_file = os.path.join(current_dir, name) + self.conn = None + self.cursor = None + logger.info(f'Подключен к базе данных: {self.db_file}') + + def connect(self): + """Создание соединения и курсора.""" + self.conn = sqlite3.connect(self.db_file) self.cursor = self.conn.cursor() - logger.info(f'Подключен к базе данных: {db_file_path}') def create_table(self, sql_script): """ @@ -34,12 +34,15 @@ class BotDB: None """ try: - cursor = self.conn.cursor() - cursor.execute(sql_script) + self.connect() + self.cursor.execute(sql_script) + self.conn.commit() logger.info(f'Таблица создана: {sql_script}') except Exception as e: logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}') raise + finally: + self.close() def get_current_version(self): """ @@ -53,16 +56,18 @@ class BotDB: """ logger.info(f'Попытка получения версии миграции') try: - cursor = self.conn.cursor() - cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1") - version = cursor.fetchone()[0] + self.connect() + self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1") + version = self.cursor.fetchone()[0] logger.info(f'Получена текущая версия миграции: {version}') return version except Exception as e: logger.error(f'Ошибка при получении текущей версии миграции: {e}') raise + finally: + self.close() - def update_version(self, new_version: str, script_name: str): + def update_version(self, new_version: int, script_name: str): """ Обновляет версию миграций в таблице migrations. @@ -83,9 +88,9 @@ class BotDB: """ logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}') try: + self.connect() today = datetime.now().strftime("%d-%m-%Y %H:%M:%S") - cursor = self.conn.cursor() - cursor.execute( + self.cursor.execute( "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)", (new_version, script_name, today), ) @@ -97,6 +102,8 @@ class BotDB: except Exception as e: logger.error(f"Ошибка при обновлении версии: {e}") raise + finally: + self.close() # TODO: Deprecated. Остался только в voice боте, удалить и оттуда def get_error_message_from_db(self, id: int): @@ -107,14 +114,17 @@ class BotDB: """ # Подключаемся к базе try: - cursor = self.conn.cursor() - cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,)) - response_from_database = str(cursor.fetchone()[1]) + self.connect() + self.cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,)) + response_from_database = str(self.cursor.fetchone()[1]) return response_from_database except sqlite3.Error as error: - print(error) + logger.error(f"Ошибка при получении сообщения об ошибка voice_bot: {error}") + finally: + self.close() - def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool, language_code: str, date_added: str, + def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool, + language_code: str, date_added: str, date_changed: str): """ Добавляет нового пользователя в базу данных. @@ -135,6 +145,7 @@ class BotDB: """ logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}") try: + self.connect() self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', " "'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, @@ -146,6 +157,8 @@ class BotDB: logger.error(f"Ошибка при добавлении пользователя в базу: {error}. " f"Данные пользователя: user_id={user_id}, first_name={first_name}") raise + finally: + self.close() def user_exists(self, user_id: int): """ @@ -159,6 +172,7 @@ class BotDB: """ logger.info(f"Попытка проверки существования пользователя: user_id={user_id}") try: + self.connect() self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchall() logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}") @@ -166,6 +180,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при проверке существования пользователя: {error}") raise + finally: + self.close() def get_user_id(self, user_id: int): """ @@ -181,6 +197,7 @@ class BotDB: """ logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}") try: + self.connect() self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: @@ -189,10 +206,12 @@ class BotDB: return user_id_db else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") - return 0 + return None except sqlite3.Error as error: logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}") raise + finally: + self.close() def get_username(self, user_id: int): """ @@ -209,6 +228,7 @@ class BotDB: sqlite3.Error: Если произошла ошибка при выполнении запроса. """ try: + self.connect() self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: @@ -217,10 +237,12 @@ class BotDB: return username else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") - return "" + return None except sqlite3.Error as error: logger.error(f"Ошибка при получении username из базы данных: {error}") raise + finally: + self.close() def get_all_user_id(self): """ @@ -235,6 +257,7 @@ class BotDB: """ logger.info(f"Попытка получения всех user_id") try: + self.connect() self.cursor.execute("SELECT user_id FROM our_users") fetch_all = self.cursor.fetchall() list_of_users = [user_id[0] for user_id in fetch_all] @@ -243,6 +266,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при получении списка user_id из базы данных: {error}") raise + finally: + self.close() def get_user_first_name(self, user_id: int): """ @@ -260,6 +285,7 @@ class BotDB: """ logger.info(f"Попытка получения имени пользователя по user_id={user_id}") try: + self.connect() self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: @@ -272,6 +298,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}") raise + finally: + self.close() def change_name(self, user_id): #TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. Обновляем поля first_name, date_changed @@ -294,11 +322,13 @@ class BotDB: """ logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.") try: + self.connect() self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() if result: has_stickers = result[0] == 1 - logger.info(f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}") + logger.info( + f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}") return has_stickers else: logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") @@ -306,6 +336,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при получении информации о получении стикеров: {error}") raise + finally: + self.close() def update_info_about_stickers(self, user_id): """ @@ -322,6 +354,7 @@ class BotDB: """ logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}") try: + self.connect() self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}") @@ -329,6 +362,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}") raise + finally: + self.close() def get_users_blacklist(self): """ @@ -343,6 +378,7 @@ class BotDB: """ logger.info(f"Запуск функции get_users_blacklist") try: + self.connect() self.cursor.execute("SELECT user_id, user_name FROM blacklist") fetch_all = self.cursor.fetchall() list_of_users = {user_id: username for user_id, username in fetch_all} @@ -351,6 +387,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}") raise + finally: + self.close() def get_users_for_unblock_today(self, date_to_unban: str): """ @@ -368,6 +406,7 @@ class BotDB: """ logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}") try: + self.connect() result = self.cursor.execute("SELECT user_id, user_name " "FROM blacklist WHERE date_to_unban = ?", (date_to_unban,)) fetch_all = result.fetchall() @@ -377,6 +416,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}") raise + finally: + self.close() def get_blacklist_users_by_id(self, user_id: int): """ @@ -394,13 +435,15 @@ class BotDB: """ logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}") try: + self.connect() result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban " - "FROM blacklist WHERE user_id = ?", (user_id, )) + "FROM blacklist WHERE user_id = ?", (user_id,)) return self.cursor.fetchone() except sqlite3.Error as error: logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}") raise - + finally: + self.close() def check_user_in_blacklist(self, user_id: int): """ @@ -417,12 +460,15 @@ class BotDB: """ logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}") try: + self.connect() self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() return bool(result) except sqlite3.Error as error: logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}") raise + finally: + self.close() def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None): """ @@ -441,6 +487,7 @@ class BotDB: logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name}," f" message_for_user={message_for_user}, date_to_unban={date_to_unban}") try: + self.connect() result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name'," " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)", (user_id, user_name, message_for_user, date_to_unban,)) @@ -450,6 +497,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка при добавлении пользователя в черный список: {error}") return error + finally: + self.close() def delete_user_blacklist(self, user_id: int): """ @@ -466,6 +515,7 @@ class BotDB: """ logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}") try: + self.connect() self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.") @@ -474,6 +524,8 @@ class BotDB: logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} " f"из таблицы blacklist. Ошибка: {str(error)}") return False + finally: + self.close() def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str): """ @@ -494,6 +546,7 @@ class BotDB: """ logger.info(f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}") try: + self.connect() self.cursor.execute( "INSERT INTO user_messages (message_text, user_id, message_id, date) " "VALUES (?, ?, ?, ?)", @@ -504,9 +557,12 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка добавления сообщения в базу данных: {error}") raise + finally: + self.close() def update_date_for_user(self, date: str, user_id: int): """ + #TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users Обновляет дату последнего изменения данных пользователя в базе. Args: @@ -519,6 +575,7 @@ class BotDB: """ logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}") try: + self.connect() self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?", (date, user_id,)) self.conn.commit() @@ -527,6 +584,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка обновления даты изменения для пользователя: {error}") return error + finally: + self.close() def is_admin(self, user_id: int): """ @@ -541,13 +600,17 @@ class BotDB: Raises: None: В случае ошибки возвращается None """ + logger.info(f"Запуск функции is_admin: user_id={user_id}") try: + self.connect() self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,)) result = self.cursor.fetchone() return bool(result) except sqlite3.Error as error: logger.error(f"Ошибка добавления сообщения в базу данных: {error}") return None + finally: + self.close() def add_admin(self, user_id: int, role: str): """ @@ -565,6 +628,7 @@ class BotDB: """ logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}") try: + self.connect() self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role)) self.conn.commit() logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.") @@ -572,6 +636,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка добавления пользователя в список администраторов: {error}") return error + finally: + self.close() def remove_admin(self, user_id: int): """ @@ -586,6 +652,7 @@ class BotDB: """ logger.info(f"Запуск функции remove_admin: user_id={user_id}") try: + self.connect() self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,)) self.conn.commit() logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.") @@ -593,9 +660,12 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}") return error + finally: + self.connect() def get_user_by_message_id(self, message_id: int): """ + #TODO: Возвращается TypeError вместо None Возвращает идентификатор пользователя по идентификатору сообщения. Args: @@ -610,6 +680,7 @@ class BotDB: """ logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}") try: + self.connect() result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,)) user = result.fetchone()[0] logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}") @@ -617,13 +688,15 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка получения user_id по message_id: {error}") raise + finally: + self.close() def get_last_users_from_db(self): """ - Возвращает список идентификаторов последних 100 пользователей, обращавшихся в бот. + Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот. Returns: - list: Список кортежей (full_name, user_id) последних 100 пользователей. + list: Список кортежей (full_name, user_id) последних 30 пользователей. []: Если в базе данных нет пользователей. Raises: @@ -631,13 +704,16 @@ class BotDB: """ logger.info("Запуск функции get_last_users_from_db") try: - result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 100") + self.connect() + result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30") users = result.fetchall() - logger.info(f"Получен список последних 100 пользователей: {users}") + logger.info(f"Получен список последних 30 пользователей: {users}") return users except sqlite3.Error as error: logger.error(f"Ошибка получения списка последних пользователей: {error}") raise + finally: + self.close() def get_banned_users_from_db(self): """ @@ -652,6 +728,7 @@ class BotDB: """ logger.info("Запуск функции get_banned_users_from_db") try: + self.connect() result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist") users = result.fetchall() logger.info(f"Получен список пользователей в черном списке: {users}") @@ -659,6 +736,8 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка получения списка пользователей в черном списке: {error}") raise + finally: + self.close() def get_banned_users_from_db_with_limits(self, offset: int, limit: int): """ @@ -677,6 +756,7 @@ class BotDB: """ logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}") try: + self.connect() result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban " "FROM blacklist LIMIT ?, ?", (offset, limit,)) users = result.fetchall() @@ -685,58 +765,81 @@ class BotDB: except sqlite3.Error as error: logger.error(f"Ошибка получения списка пользователей в черном списке: {error}") raise + finally: + self.close() def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id): """Добавляет информацию о войсе юзера в БД""" + logger.info( + f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id}, date_added = {date_added}") try: + self.connect() result = self.cursor.execute( "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", (file_name, author_id, date_added, listen_count, file_id)) - return self.conn.commit() + self.conn.commit() + logger.info( + f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, date_added = {date_added}") + return None except sqlite3.Error as error: print(error) + finally: + self.close() def last_date_audio(self): """Получаем дату последнего войса""" try: + self.connect() result = self.cursor.execute( "SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1") return result.fetchone()[0] except sqlite3.Error as error: print(error) + finally: + self.close() def get_last_user_audio_record(self, user_id): """Получает данные о количестве записей пользователя""" try: + self.connect() result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", (user_id,)) return bool(len(result.fetchall())) except sqlite3.Error as error: print(error) + finally: + self.close() def get_id_for_audio_record(self, user_id): """Получает ID аудио сообщения пользователя""" try: + self.connect() result = self.cursor.execute( "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) + finally: + self.close() def get_path_for_audio_record(self, user_id): """Получает данные о названии файла""" try: + self.connect() result = self.cursor.execute( "SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,)) return result.fetchone()[0] except sqlite3.Error as error: print(error) + finally: + self.close() def check_listen_audio(self, user_id): """Проверяет прослушано ли аудио пользователем""" try: + self.connect() query_listen_audio = self.cursor.execute( """SELECT l.file_name FROM audio_message_reference a @@ -754,17 +857,25 @@ class BotDB: return new_sign except sqlite3.Error as error: print(error) + finally: + self.close() def mark_listened_audio(self, file_name, user_id): """Отмечает аудио прослушанным для конкретного пользователя.""" try: + self.connect() result = self.cursor.execute( "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", (file_name, user_id, 1)) return self.conn.commit() except sqlite3.Error as error: print(error) + finally: + self.close() def close(self): - """Закрываем соединение с БД""" - self.conn.close() + """Закрытие соединения и курсора.""" + if self.cursor: + self.cursor.close() + if self.conn: + self.conn.close() diff --git a/custom_logger.py b/logs/custom_logger.py similarity index 77% rename from custom_logger.py rename to logs/custom_logger.py index 28b9655..2287cf9 100644 --- a/custom_logger.py +++ b/logs/custom_logger.py @@ -1,22 +1,21 @@ import datetime import os -from loguru import logger +import loguru class Logger: def __init__(self, name): - self.logger = logger.bind(name=name) + self.logger = loguru.logger.bind(name=name) # Получение сегодняшней даты для имени файла today = datetime.date.today().strftime('%Y-%m-%d') # Создание папки для логов current_dir = os.path.dirname(os.path.abspath(__file__)) - logs_dir = os.path.join(current_dir, 'logs') - if not os.path.exists(logs_dir): + if not os.path.exists(current_dir): # Если не существует, создаем ее - os.makedirs(logs_dir) - filename = f'{logs_dir}/helper_bot_{today}.log' + os.makedirs(current_dir) + filename = f'{current_dir}/helper_bot_{today}.log' # Настройка формата логов self.logger.add( diff --git a/main.py b/main.py index 6ea78c0..fed9481 100644 --- a/main.py +++ b/main.py @@ -4,9 +4,7 @@ import sys from pathlib import Path from time import sleep from enum import Enum -from typing import Any -from apscheduler.schedulers.background import BackgroundScheduler -from db import BotDB +from database.db import BotDB import telebot import random from datetime import datetime, timedelta @@ -37,7 +35,7 @@ LOGS = config.getboolean('Settings', 'logs') TEST = config.getboolean('Settings', 'test') # Инициализируем бота и базку -BotDB = BotDB() +BotDB = BotDB(name='tg-bot-database') class State(Enum): @@ -134,10 +132,17 @@ class TelegramHelperBot: markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Выйти из чата") markup.add(item1) - message_id = message.reply_to_message.id + message_id = 0 + try: + message_id = message.reply_to_message.id + except AttributeError: + self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!') message_from_admin = message.text - chat_id = BotDB.get_user_by_message_id(message_id) - self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) + try: + chat_id = BotDB.get_user_by_message_id(message_id) + self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) + except TypeError: + self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.') # Админка @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) @@ -212,6 +217,7 @@ class TelegramHelperBot: def unban_notifier(self): # Получение сегодняшней даты в формате DD-MM-YYYY current_date = datetime.now() + print('Мы в функции unban_notifier') today = current_date.strftime("%d-%m-%Y") # Получение списка разблокированных пользователей unblocked_users = BotDB.get_users_for_unblock_today(today) @@ -369,7 +375,7 @@ class TelegramHelperBot: ) current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") - BotDB.add_new_message_in_db(message.text, message.message_id + 1, message.from_user.id, date) + BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) question = messages.get_message(self.__get_first_name(message), 'QUESTION') markup = self.get_reply_keyboard(message) self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK, @@ -543,7 +549,7 @@ class TelegramHelperBot: return formatted_date @staticmethod - def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[Any, Any]], callback: str): + def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str): """ Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback @@ -636,6 +642,6 @@ if __name__ == "__main__": bot.start() #scheduler = BackgroundScheduler() - #scheduler.add_job(bot.unban_notifier(), 'cron', hour=0, minute=0) + #scheduler.add_job(bot.unban_notifier(), 'cron', hour=22, minute=9) #scheduler.start() diff --git a/migrations/000_migrations_init.py b/migrations/000_migrations_init.py index 4ba49b0..a83e7ab 100644 --- a/migrations/000_migrations_init.py +++ b/migrations/000_migrations_init.py @@ -1,8 +1,16 @@ import os -from db import BotDB +from database.db import BotDB +# Получаем текущий рабочий каталог +current_dir = os.path.dirname(os.path.abspath(__file__)) -BotDB = BotDB() +# Переходим на уровень выше, чтобы выйти из папки migrations/ +parent_dir = os.path.dirname(current_dir) + +# Строим путь до файла +tg_bot_database_path = os.path.join(parent_dir, "tg-bot-database") + +BotDB = BotDB(f'{tg_bot_database_path}') def get_filename(): diff --git a/migrations/001_create_new_tables.py b/migrations/001_create_new_tables.py index cd45af5..f1a1d4e 100644 --- a/migrations/001_create_new_tables.py +++ b/migrations/001_create_new_tables.py @@ -1,7 +1,7 @@ import os -from db import BotDB +from database.db import BotDB -BotDB = BotDB() +BotDB = BotDB('tg-bot-database') def get_filename(): diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..685a09e --- /dev/null +++ b/pytest.ini @@ -0,0 +1,8 @@ +[pytest] +pythonpath = . +python_files = test_*.py *_test.py +python_functions = test_* +testpaths = tests + +[report] +omit = *myenv/*, custom_logger.py, *venv/*, tests/* \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 35a1522..0715972 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,18 @@ -pyTelegramBotAPI -APScheduler~=3.10.4 -loguru~=0.7.2 \ No newline at end of file +APScheduler==3.10.4 +certifi==2024.7.4 +charset-normalizer==3.3.2 +coverage==7.5.4 +exceptiongroup==1.2.1 +idna==3.7 +iniconfig==2.0.0 +loguru==0.7.2 +packaging==24.1 +pluggy==1.5.0 +pyTelegramBotAPI==4.20.0 +pytest==8.2.2 +pytz==2024.1 +requests==2.32.3 +six==1.16.0 +tomli==2.0.1 +tzlocal==5.2 +urllib3==2.2.2 diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..e80846b --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,825 @@ +import os +from datetime import datetime +import pytest +import sqlite3 +from database.db import BotDB + + +@pytest.fixture +def bot(): + """Фикстура для создания объекта BotDB.""" + return BotDB("test.db") + + +@pytest.fixture(autouse=True, ) +def setup_db(): + """Фикстура для создания всей базы перед каждым тестом.""" + # Mock data 1st user + user_id = 12345 + first_name = "Иван" + full_name = "Иван Иванович" + username = "@iban" + message_text = 'Hello, planet' + message_id = 1 + message_for_user = "LOL" + has_stickers = 0 + # Mock data 2nd user + user_id_2 = 14278 + first_name_2 = "Борис" + full_name_2 = "Борис Петрович" + username_2 = "@boris" + message_text_2 = 'Hello, world' + message_id_2 = 2 + message_for_user_2 = "LOL2" + has_stickers_2 = 1 + # Other data + date = "2024-07-10" + next_date = "2024-07-11" + conn = sqlite3.connect("test.db") + cursor = conn.cursor() + cursor.execute(""" + CREATE TABLE IF NOT EXISTS "admins" ( + user_id INTEGER NOT NULL, + "role" TEXT + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS "audio_message_reference" + ( + "id" INTEGER NOT NULL UNIQUE, + "file_name" TEXT NOT NULL UNIQUE, + "author_id" INTEGER NOT NULL, + "date_added" DATE NOT NULL, + "listen_count" INTEGER NOT NULL, + "file_id" INTEGER NOT NULL, + PRIMARY KEY ("id") + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS "blacklist" + ( + "user_id" INTEGER NOT NULL UNIQUE, + "user_name" INTEGER, + "message_for_user" INTEGER, + "date_to_unban" INTEGER + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS "messages" ( + "ID" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, + "Message" TEXT NOT NULL, + "type" INTEGER + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS "our_users" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE, + "user_id" INTEGER NOT NULL UNIQUE, + "first_name" STRING, + "full_name" STRING, + "username" STRING, + "is_bot" BOOLEAN, + "language_code" STRING, + "has_stickers" INTEGER NOT NULL DEFAULT 0, + "date_added" DATE NOT NULL, + "date_changed" DATE NOT NULL + , state_user TEXT(20)); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS user_messages ( + id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + message_text TEXT, + user_id INTEGER, + message_id INTEGER NOT NULL, + date TEXT + ); + """) + cursor.execute(""" + CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT); + """) + cursor.execute(""" + CREATE TABLE migrations ( + version INTEGER PRIMARY KEY NOT NULL, + script_name TEXT NOT NULL, + created_at TEXT + ); + """) + + #blacklist mock data + cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)", + (user_id, username, message_for_user, next_date)) + cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)", + (user_id_2, username_2, message_for_user_2, date)) + #our_users mock data + cursor.execute( + "INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)" + " VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, username, date, date, has_stickers) + ) + cursor.execute( + "INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)" + " VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id_2, first_name_2, full_name_2, username_2, date, date, has_stickers_2) + ) + #messages mock data + cursor.execute( + "INSERT INTO user_messages (message_text, user_id, message_id, date) " + "VALUES (?, ?, ?, ?)", + (message_text, user_id, message_id, date)) + cursor.execute( + "INSERT INTO user_messages (message_text, user_id, message_id, date) " + "VALUES (?, ?, ?, ?)", + (message_text_2, user_id_2, message_id_2, date)) + #mock admins + cursor.execute( + "INSERT INTO admins (user_id, role) " + "VALUES (?, ?)", + (user_id, 'creator')) + conn.commit() + conn.close() + yield + os.remove('test.db') + + +def test_bot_init(bot): + """Проверяет, что объект BotDB инициализируется с правильным именем файла.""" + assert bot.db_file == os.path.join(os.getcwd(), "test.db") + # Проверьте, что соединения с базой данных нет, так как оно не устанавливается в init + assert bot.conn is None + assert bot.cursor is None + + +def test_bot_connect(bot): + """Проверяет, что метод connect создает подключение к базе данных.""" + bot.connect() + assert bot.conn is not None + assert bot.cursor is not None + bot.close() + + +@pytest.mark.xfail +def test_bot_close(bot): + """Проверяет, что метод close закрывает подключение к базе данных.""" + bot.connect() + assert bot.conn is not None + assert bot.cursor is not None + bot.close() + assert bot.conn is None + assert bot.cursor is None + + +def test_create_table_success(bot): + sql_script = 'CREATE TABLE test_table (id INTEGER PRIMARY KEY);' + bot.create_table(sql_script) + + # Проверяем, что таблица создана + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='test_table'") + result = cursor.fetchone() + conn.close() + + assert result is not None + + +def test_create_table_error(bot): + sql_script = 'CREATE TABLE test_table (id INTEGER PRIMARY KEY);' + bot.create_table(sql_script) + + with pytest.raises(sqlite3.OperationalError): + bot.create_table(sql_script) + + +def test_get_current_version_success(bot): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("INSERT INTO migrations (version, script_name) VALUES (123, 'test')") + conn.commit() + conn.close() + + # Вызываем функцию и проверяем результат + version = bot.get_current_version() + assert version == 123 + + +def test_get_current_version_error(bot): + __drop_table('migrations') + with pytest.raises(sqlite3.OperationalError): + bot.get_current_version() + + +def test_update_version_success(bot): + # Вызываем функцию update_version + new_version = 124 + script_name = "migration_script.sql" + bot.update_version(new_version, script_name) + + # Проверяем, что данные записаны в таблицу + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT * FROM migrations WHERE version = ?", (new_version,)) + result = cursor.fetchone() + conn.close() + assert result is not None + assert result[0] == new_version + assert result[1] == script_name + assert result[2] == datetime.now().strftime("%d-%m-%Y %H:%M:%S") + + +def test_update_version_integrity_error(bot): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("INSERT INTO migrations (version, script_name) VALUES (123, 'test')") + conn.commit() + conn.close() + # Пытаемся обновить версию с уже существующим значением + with pytest.raises(sqlite3.IntegrityError): + bot.update_version(123, "script_2.sql") + +def test_update_version_error(bot): + __drop_table('migrations') + with pytest.raises(sqlite3.OperationalError): + bot.update_version(123, "script_2.sql")() + + +def test_add_new_user_in_db(bot): + """Проверяет добавление нового пользователя в базу данных.""" + user_id = 50 + first_name = "Петр" + full_name = "Петр Иванов" + username = "@petr_ivanov" + is_bot = False + language_code = "ru" + date_added = "2024-07-09" + date_changed = "2024-07-09" + + # Вызываем функцию add_new_user_in_db + bot.add_new_user_in_db( + user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed + ) + + # Проверяем наличие записи в базе данных + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT * FROM our_users WHERE user_id = ?", (user_id,)) + result = cursor.fetchone() + conn.close() + + assert result is not None + assert result[1] == user_id + assert result[2] == first_name + assert result[3] == full_name + assert result[4] == username + assert result[5] == is_bot + assert result[6] == language_code + assert result[8] == date_added + assert result[9] == date_changed + + +def test_add_new_user_in_db_duplicate_user_id(bot, setup_db): + """Проверяет поведение при попытке добавить пользователя с уже существующим user_id.""" + user_id = 12345 + + # Попытка добавить пользователя с тем же user_id + with pytest.raises(sqlite3.IntegrityError): + bot.add_new_user_in_db( + user_id, "Марина", "Марина Альфредовна", "marina", False, "bg", "2024-07-09", "2024-07-09" + ) + + +def test_add_new_user_in_db_empty_first_name(bot): + """ Проверяет добавление пользователя с пустым именем (first_name) """ + user_id = 43 + first_name = "" # Пустое имя + full_name = "Boris Petrov" + username = "@boris" + is_bot = False + language_code = "fr" + date_added = "2024-07-09" + date_changed = "2024-07-09" + + # Вызываем функцию add_new_user_in_db + bot.add_new_user_in_db( + user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed + ) + + # Проверяем наличие записи в базе данных + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute(f"SELECT * FROM our_users WHERE user_id = ?", (user_id,)) + result = cursor.fetchone() + conn.close() + assert result is not None + assert result[1] == user_id + assert result[2] == first_name + assert result[3] == full_name + assert result[4] == username + assert result[5] == is_bot + assert result[6] == language_code + assert result[8] == date_added + assert result[9] == date_changed + + +def test_user_exists_found(bot): + """Проверяет, что функция возвращает True, если пользователь найден.""" + user_id = 12345 + + # Проверяем наличие записи в базе данных + assert bot.user_exists(user_id) is True + + +def test_user_exists_not_found(bot): + """Проверяет, что функция возвращает False, если пользователь не найден.""" + user_id = 99999 + assert bot.user_exists(user_id) is False + + +def test_user_exists_error(bot): + """Проверяет, что функция возвращает ошибки""" + __drop_table('our_users') + with pytest.raises(sqlite3.Error): + bot.user_exists(12345) + + +def test_get_user_id_found(bot): + """Проверяет, что функция возвращает ID пользователя, если он найден.""" + user_id = 12345 + # Проверяем, что возвращается правильный ID из базы + user_id_db = bot.get_user_id(user_id) + assert user_id_db == 1 + + +def test_get_user_id_not_found(bot, setup_db): + """Проверяет, что функция возвращает None, если пользователь не найден.""" + user_id = 99999 + assert bot.get_user_id(user_id) is None + + +def test_get_user_id_error(bot): + """Проверяет, что функция обрабатывает некорректный user_id.""" + __drop_table('our_users') + with pytest.raises(sqlite3.Error): + bot.get_user_id(12345) + + +def test_get_username_found(bot): + """Проверяет, что функция возвращает username пользователя, если он найден.""" + user_id = 12345 + username = "@iban" + # Проверяем, что возвращается правильный username из базы + username_db = bot.get_username(user_id) + assert username_db == username + + +def test_get_username_not_found(bot, setup_db): + """Проверяет, что функция возвращает None, если пользователь не найден.""" + user_id = 99999 + assert bot.get_username(user_id) is None + + +def test_get_username_error(bot): + """Проверяет, что функция возвращает ошибку""" + __drop_table('our_users') + with pytest.raises(sqlite3.Error): + bot.get_username(12345) + + +def test_get_all_user_id_empty(bot): + """Проверяет, что функция возвращает пустой список, если в базе нет пользователей.""" + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("DELETE FROM our_users") + conn.commit() + conn.close() + # Проверяем наличие записей в базе данных + user_ids = bot.get_all_user_id() + assert user_ids == [] + + +def test_get_all_user_id_non_empty(bot, setup_db): + """Проверяет, что функция возвращает список всех user_id из базы данных.""" + # Проверяем наличие записи в базе данных + user_ids = bot.get_all_user_id() + assert user_ids == [12345, 14278] # Проверяем, что в списке два ожидаемых user_id + + +def test_get_all_user_id_error(bot): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('our_users') + + with pytest.raises(sqlite3.Error): + bot.get_all_user_id() + + +def test_get_user_first_name_found(bot): + """Проверяет, что функция возвращает имя пользователя, если он найден.""" + user_id = 12345 + first_name = bot.get_user_first_name(user_id) + assert first_name == "Иван" + + +def test_get_user_first_name_not_found(bot, setup_db): + """Проверяет, что функция возвращает None, если пользователь не найден.""" + user_id = 99999 + assert bot.get_user_first_name(user_id) is None + + +@pytest.mark.xfail +def test_get_user_first_name_invalid_user_id(bot): + """Проверяет, что функция обрабатывает некорректный user_id.""" + with pytest.raises(sqlite3.Error): + bot.get_user_first_name("invalid_user_id") # Передача строки + + +def test_get_user_first_name_error(bot): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('our_users') + + with pytest.raises(sqlite3.Error): + bot.get_user_first_name(12345) + + +def test_get_info_about_stickers_found_received(bot): + """Проверяет, что функция возвращает True, если пользователь получил стикеры.""" + user_id = 14278 + assert bot.get_info_about_stickers(user_id) is True + + +def test_get_info_about_stickers_found_not_received(bot, setup_db): + """Проверяет, что функция возвращает False, если пользователь не получил стикеры.""" + user_id = 12345 + assert bot.get_info_about_stickers(user_id) is False + + +@pytest.mark.xfail +def test_get_info_about_stickers_not_found(bot, setup_db): + """Проверяет, что функция возвращает None, если пользователь не найден.""" + user_id = 99999 + assert bot.get_info_about_stickers(user_id) is None + + +@pytest.mark.xfail +def test_get_info_about_stickers_invalid_user_id(bot): + """Проверяет, что функция обрабатывает некорректный user_id.""" + with pytest.raises(sqlite3.Error): + bot.get_info_about_stickers("invalid_user_id") + + +def test_get_info_about_stickers_error(bot): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('our_users') + + with pytest.raises(sqlite3.Error): + bot.get_info_about_stickers(12345) + + +def test_update_info_about_stickers_success(bot): + """Проверяет, что функция успешно обновляет информацию о получении стикеров.""" + user_id = 12345 + bot.update_info_about_stickers(user_id) + + # Проверяем, что информация обновлена + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,)) + result = cursor.fetchone() + conn.close() + assert result[0] == 1 + + +def test_update_info_about_stickers_not_found(bot): + """Проверяет, что функция не вызывает ошибки, если пользователь не найден.""" + user_id = 99999 + bot.update_info_about_stickers(user_id) + + # Проверяем, что база данных не изменилась + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT COUNT(*) FROM our_users WHERE user_id = ?", (user_id,)) + result = cursor.fetchone() + conn.close() + assert result[0] == 0 + + +def test_update_info_about_stickers_error(bot): + """Проверяет, что функция вызывает ошибки""" + __drop_table('our_users') + with pytest.raises(sqlite3.Error): + bot.update_info_about_stickers(12345) + + +def test_get_users_blacklist_empty(bot): + """Проверяет, что функция возвращает пустой словарь, если в черном списке нет пользователей.""" + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("DELETE FROM blacklist") + conn.commit() + conn.close() + + blacklist = bot.get_users_blacklist() + assert blacklist == {} + + +def test_get_users_blacklist_non_empty(bot): + """Проверяет, что функция возвращает словарь с пользователями из черного списка.""" + blacklist = bot.get_users_blacklist() + assert blacklist == {12345: "@iban", 14278: "@boris"} + + +def test_get_users_blacklist_error(bot): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('blacklist') + + with pytest.raises(sqlite3.Error): + bot.get_users_blacklist() + + +def test_get_blacklist_users_by_id_found(bot, setup_db): + """Проверяет, что функция возвращает информацию о пользователе, если он найден в черном списке.""" + user_id = 12345 + + result = bot.get_blacklist_users_by_id(user_id) + assert result == (12345, "@iban", "LOL", "2024-07-11") + + +def test_get_blacklist_users_by_id_not_found(bot, setup_db): + """Проверяет, что функция возвращает None, если пользователь не найден в черном списке.""" + user_id = 99999 + assert bot.get_blacklist_users_by_id(user_id) is None + + +@pytest.mark.xfail +def test_get_blacklist_users_by_id_invalid_user_id(bot): + """Проверяет, что функция обрабатывает некорректный user_id.""" + with pytest.raises(sqlite3.Error): + bot.get_blacklist_users_by_id("invalid_user_id") # Передача строки + + +def test_get_blacklist_users_by_id_error(bot): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('blacklist') + + with pytest.raises(sqlite3.Error): + bot.get_blacklist_users_by_id(12345) + + +def test_get_users_for_unblock_today_found(bot): + """Проверяет, что функция возвращает словарь с пользователями, у которых истекает блокировка сегодня.""" + date_to_unban = "2024-07-11" + result = bot.get_users_for_unblock_today(date_to_unban) + assert result == {12345: "@iban"} + + +def test_get_users_for_unblock_today_not_found(bot, setup_db): + """Проверяет, что функция возвращает пустой словарь, если сегодня нет пользователей, у которых истекает блокировка.""" + date_to_unban = "2024-07-12" + result = bot.get_users_for_unblock_today(date_to_unban) + assert result == {} + + +def test_get_users_for_unblock_today_error(bot): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('blacklist') + + with pytest.raises(sqlite3.Error): + bot.get_users_for_unblock_today("2023-12-26") + + +def test_check_user_in_blacklist_found(bot, setup_db): + """Проверяет, что функция возвращает True, если пользователь найден в черном списке.""" + user_id = 12345 + bot.set_user_blacklist(user_id, "JohnDoe") # Добавляем пользователя в черный список + + assert bot.check_user_in_blacklist(user_id) is True + + +def test_check_user_in_blacklist_not_found(bot, setup_db): + """Проверяет, что функция возвращает False, если пользователь не найден в черном списке.""" + user_id = 99999 + assert bot.check_user_in_blacklist(user_id) is False + + +def test_check_user_in_blacklist_error(bot, setup_db): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('blacklist') + + with pytest.raises(sqlite3.Error): + bot.check_user_in_blacklist(12345) + + +def test_set_user_blacklist_success(bot): + """Проверяет, что функция успешно добавляет пользователя в черный список.""" + user_id = 11 + user_name = "Гриша" + message_for_user = "Лови бан!" + date_to_unban = datetime.now().strftime("%Y-%m-%d") # Текущая дата + + assert bot.set_user_blacklist(user_id, user_name, message_for_user, date_to_unban) is None + + # Проверяем, что запись добавлена в базу + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT * FROM blacklist WHERE user_id = ?", (user_id,)) + result = cursor.fetchone() + conn.commit() + conn.close() + + assert result is not None + assert result[1] == user_name + assert result[2] == message_for_user + assert result[3] == date_to_unban + + +@pytest.mark.xfail +def test_set_user_blacklist_duplicate_user_id(bot, setup_db): + """Проверяет, что функция не добавляет дубликат user_id в черный список.""" + user_id = 12345 + bot.set_user_blacklist(user_id, "JohnDoe") + + with pytest.raises(sqlite3.IntegrityError): + bot.set_user_blacklist(user_id, "JaneSmith") # Попытка добавить дубликат + + +@pytest.mark.xfail +def test_set_user_blacklist_error(bot, setup_db): + """Проверяет, что функция вызывает sqlite3. Error при ошибке запроса.""" + __drop_table('blacklist') + + with pytest.raises(sqlite3.Error): + bot.set_user_blacklist(12345, "JohnDoe", "You are banned!", "2024-01-01") + + +def test_delete_user_blacklist_success(bot): + bot.delete_user_blacklist(12345) + assert bot.check_user_in_blacklist(12345) is False + + +@pytest.mark.xfail +def test_delete_user_blacklist_not_found(bot): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("INSERT INTO blacklist (user_id, user_name, date_to_unban) VALUES (?, ?, ?)", + (12345, "JohnDoe", "2023-12-26")) + conn.commit() + conn.close() + + result = bot.delete_user_blacklist(514) + assert result is False + + +@pytest.mark.xfail +def test_delete_user_blacklist_error(bot): + __drop_table('blacklist') + + with pytest.raises(sqlite3.Error): + bot.delete_user_blacklist(12345) + + +def test_add_new_message_in_db_success(bot): + result = bot.add_new_message_in_db('hello', 4232187, 5, '2024-01-01') + assert result is None + + +def test_add_new_message_in_db_error(bot): + __drop_table('user_messages') + with pytest.raises(sqlite3.Error): + bot.add_new_message_in_db('hello', 12345, 1, '2024-01-01') + + +def test_update_date_for_user_success(bot): + bot.update_date_for_user('2024-07-15', 12345) + + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("SELECT date_changed FROM our_users WHERE user_id = ?", (12345,)) + new_date = cursor.fetchone()[0] + conn.close() + assert new_date == '2024-07-15' + + +@pytest.mark.xfail +def test_update_date_for_user_error(bot): + __drop_table('our_users') + with pytest.raises(sqlite3.Error): + bot.update_date_for_user('2024-07-15', 12345) + + +def test_is_admin_success(bot): + assert bot.is_admin(12345) is True + + +def test_is_admin_not_found(bot): + assert bot.is_admin(1) is False + + +def test_is_admin_error(bot): + __drop_table('admins') + assert bot.is_admin(1) is None + + +def test_get_user_by_message_id_success(bot): + assert bot.get_user_by_message_id(1) == 12345 + + +@pytest.mark.xfail +def test_get_user_by_message_id_not_found(bot): + assert bot.get_user_by_message_id(124) == None + + +def test_get_user_by_message_id_error(bot): + __drop_table('user_messages') + with pytest.raises(sqlite3.Error): + bot.get_user_by_message_id(14) + + +def test_get_last_users_from_db_success(bot): + users = bot.get_last_users_from_db() + assert users is not None + assert len(users) == 2 + + +def test_get_last_users_from_db_empty(bot): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("DELETE FROM our_users") + conn.commit() + conn.close() + users = bot.get_last_users_from_db() + assert users == [] + assert len(users) == 0 + + +def test_get_user_by_message_id_error(bot): + __drop_table('our_users') + with pytest.raises(sqlite3.Error): + bot.get_last_users_from_db() + + +def test_get_banned_users_from_db_success(bot): + users = bot.get_banned_users_from_db() + assert users[0][0] == '@iban' + assert users[0][1] == 12345 + assert users[0][2] == 'LOL' + assert users[1][0] == '@boris' + assert users[1][1] == 14278 + assert users[1][2] == 'LOL2' + + +def test_get_banned_users_from_db_empty(bot): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("DELETE FROM blacklist") + conn.commit() + conn.close() + users = bot.get_banned_users_from_db() + assert users == [] + assert len(users) == 0 + + +def test_get_banned_users_from_db_error(bot): + __drop_table('blacklist') + with pytest.raises(sqlite3.Error): + bot.get_banned_users_from_db() + + +def test_get_banned_users_from_db_with_limits_success_limit(bot): + users = bot.get_banned_users_from_db_with_limits(0, 1) + assert users[0][0] == '@iban' + assert users[0][1] == 12345 + assert users[0][2] == 'LOL' + assert len(users) == 1 + + +def test_get_banned_users_from_db_with_limits_success_offset(bot): + users = bot.get_banned_users_from_db_with_limits(1, 2) + assert users[0][0] == '@boris' + assert users[0][1] == 14278 + assert users[0][2] == 'LOL2' + assert len(users) == 1 + + +def test_get_banned_users_from_db_with_limits_empty(bot): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute("DELETE FROM blacklist") + conn.commit() + conn.close() + users = bot.get_banned_users_from_db_with_limits(0, 2) + assert users == [] + assert len(users) == 0 + + +def test_get_banned_users_from_db_with_limits_error(bot): + __drop_table('blacklist') + with pytest.raises(sqlite3.Error): + bot.get_banned_users_from_db_with_limits(0, 2) + + +def __drop_table(table_name: str): + conn = sqlite3.connect('test.db') + cursor = conn.cursor() + cursor.execute(f"DROP TABLE {table_name}") + conn.commit() + conn.close() + + +if __name__ == "__main__": + pytest.main() diff --git a/voice_bot.py b/voice_bot.py index d7fd3eb..c0325f7 100644 --- a/voice_bot.py +++ b/voice_bot.py @@ -4,8 +4,7 @@ import sys from pathlib import Path from time import sleep -import db -from db import BotDB +from database.db import BotDB import telebot import random from datetime import datetime