add tests, some fixes

This commit is contained in:
KatykhinAA
2024-07-10 23:18:36 +03:00
parent 7860d4f5c0
commit f2e44ddb29
9 changed files with 1034 additions and 63 deletions

View File

@@ -1,27 +1,27 @@
import sqlite3 import sqlite3
import configparser
import os import os
from datetime import datetime from datetime import datetime
from loguru import logger from loguru import logger
from custom_logger import Logger from logs.custom_logger import Logger
db_logger = Logger(name='db') db_logger = Logger(name='db')
script_dir = os.path.dirname(os.path.abspath(__file__))
config_path = os.path.join(script_dir, 'settings.ini') # Получение абсолютного пути к текущей директории
config = configparser.ConfigParser() current_dir = os.getcwd()
config.read(config_path)
LOGS = config.getboolean('Settings', 'logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
class BotDB: class BotDB:
def __init__(self): def __init__(self, name):
db_file_path = os.path.dirname(os.path.abspath(__file__)) self.db_file = os.path.join(current_dir, name)
db_file = os.path.join(db_file_path, 'tg-bot-database') self.conn = None
self.conn = sqlite3.connect(db_file, check_same_thread=False) self.cursor = None
logger.info(f'Подключен к базе данных: {self.db_file}')
def connect(self):
"""Создание соединения и курсора."""
self.conn = sqlite3.connect(self.db_file)
self.cursor = self.conn.cursor() self.cursor = self.conn.cursor()
logger.info(f'Подключен к базе данных: {db_file_path}')
def create_table(self, sql_script): def create_table(self, sql_script):
""" """
@@ -34,12 +34,15 @@ class BotDB:
None None
""" """
try: try:
cursor = self.conn.cursor() self.connect()
cursor.execute(sql_script) self.cursor.execute(sql_script)
self.conn.commit()
logger.info(f'Таблица создана: {sql_script}') logger.info(f'Таблица создана: {sql_script}')
except Exception as e: except Exception as e:
logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}') logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}')
raise raise
finally:
self.close()
def get_current_version(self): def get_current_version(self):
""" """
@@ -53,16 +56,18 @@ class BotDB:
""" """
logger.info(f'Попытка получения версии миграции') logger.info(f'Попытка получения версии миграции')
try: try:
cursor = self.conn.cursor() self.connect()
cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1") self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
version = cursor.fetchone()[0] version = self.cursor.fetchone()[0]
logger.info(f'Получена текущая версия миграции: {version}') logger.info(f'Получена текущая версия миграции: {version}')
return version return version
except Exception as e: except Exception as e:
logger.error(f'Ошибка при получении текущей версии миграции: {e}') logger.error(f'Ошибка при получении текущей версии миграции: {e}')
raise raise
finally:
self.close()
def update_version(self, new_version: str, script_name: str): def update_version(self, new_version: int, script_name: str):
""" """
Обновляет версию миграций в таблице migrations. Обновляет версию миграций в таблице migrations.
@@ -83,9 +88,9 @@ class BotDB:
""" """
logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}') logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}')
try: try:
self.connect()
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S") today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
cursor = self.conn.cursor() self.cursor.execute(
cursor.execute(
"INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)", "INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
(new_version, script_name, today), (new_version, script_name, today),
) )
@@ -97,6 +102,8 @@ class BotDB:
except Exception as e: except Exception as e:
logger.error(f"Ошибка при обновлении версии: {e}") logger.error(f"Ошибка при обновлении версии: {e}")
raise raise
finally:
self.close()
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда # TODO: Deprecated. Остался только в voice боте, удалить и оттуда
def get_error_message_from_db(self, id: int): def get_error_message_from_db(self, id: int):
@@ -107,14 +114,17 @@ class BotDB:
""" """
# Подключаемся к базе # Подключаемся к базе
try: try:
cursor = self.conn.cursor() self.connect()
cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,)) self.cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
response_from_database = str(cursor.fetchone()[1]) response_from_database = str(self.cursor.fetchone()[1])
return response_from_database return response_from_database
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) logger.error(f"Ошибка при получении сообщения об ошибка voice_bot: {error}")
finally:
self.close()
def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool, language_code: str, date_added: str, def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool,
language_code: str, date_added: str,
date_changed: str): date_changed: str):
""" """
Добавляет нового пользователя в базу данных. Добавляет нового пользователя в базу данных.
@@ -135,6 +145,7 @@ class BotDB:
""" """
logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}") logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}")
try: try:
self.connect()
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', " self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)", "'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name, (user_id, first_name, full_name,
@@ -146,6 +157,8 @@ class BotDB:
logger.error(f"Ошибка при добавлении пользователя в базу: {error}. " logger.error(f"Ошибка при добавлении пользователя в базу: {error}. "
f"Данные пользователя: user_id={user_id}, first_name={first_name}") f"Данные пользователя: user_id={user_id}, first_name={first_name}")
raise raise
finally:
self.close()
def user_exists(self, user_id: int): def user_exists(self, user_id: int):
""" """
@@ -159,6 +172,7 @@ class BotDB:
""" """
logger.info(f"Попытка проверки существования пользователя: user_id={user_id}") logger.info(f"Попытка проверки существования пользователя: user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchall() result = self.cursor.fetchall()
logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}") logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}")
@@ -166,6 +180,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при проверке существования пользователя: {error}") logger.error(f"Ошибка при проверке существования пользователя: {error}")
raise raise
finally:
self.close()
def get_user_id(self, user_id: int): def get_user_id(self, user_id: int):
""" """
@@ -181,6 +197,7 @@ class BotDB:
""" """
logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}") logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone() result = self.cursor.fetchone()
if result: if result:
@@ -189,10 +206,12 @@ class BotDB:
return user_id_db return user_id_db
else: else:
logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return 0 return None
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}") logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}")
raise raise
finally:
self.close()
def get_username(self, user_id: int): def get_username(self, user_id: int):
""" """
@@ -209,6 +228,7 @@ class BotDB:
sqlite3.Error: Если произошла ошибка при выполнении запроса. sqlite3.Error: Если произошла ошибка при выполнении запроса.
""" """
try: try:
self.connect()
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone() result = self.cursor.fetchone()
if result: if result:
@@ -217,10 +237,12 @@ class BotDB:
return username return username
else: else:
logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return "" return None
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении username из базы данных: {error}") logger.error(f"Ошибка при получении username из базы данных: {error}")
raise raise
finally:
self.close()
def get_all_user_id(self): def get_all_user_id(self):
""" """
@@ -235,6 +257,7 @@ class BotDB:
""" """
logger.info(f"Попытка получения всех user_id") logger.info(f"Попытка получения всех user_id")
try: try:
self.connect()
self.cursor.execute("SELECT user_id FROM our_users") self.cursor.execute("SELECT user_id FROM our_users")
fetch_all = self.cursor.fetchall() fetch_all = self.cursor.fetchall()
list_of_users = [user_id[0] for user_id in fetch_all] list_of_users = [user_id[0] for user_id in fetch_all]
@@ -243,6 +266,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении списка user_id из базы данных: {error}") logger.error(f"Ошибка при получении списка user_id из базы данных: {error}")
raise raise
finally:
self.close()
def get_user_first_name(self, user_id: int): def get_user_first_name(self, user_id: int):
""" """
@@ -260,6 +285,7 @@ class BotDB:
""" """
logger.info(f"Попытка получения имени пользователя по user_id={user_id}") logger.info(f"Попытка получения имени пользователя по user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone() result = self.cursor.fetchone()
if result: if result:
@@ -272,6 +298,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}") logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}")
raise raise
finally:
self.close()
def change_name(self, user_id): def change_name(self, user_id):
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. Обновляем поля first_name, date_changed #TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. Обновляем поля first_name, date_changed
@@ -294,11 +322,13 @@ class BotDB:
""" """
logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.") logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.")
try: try:
self.connect()
self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone() result = self.cursor.fetchone()
if result: if result:
has_stickers = result[0] == 1 has_stickers = result[0] == 1
logger.info(f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}") logger.info(
f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}")
return has_stickers return has_stickers
else: else:
logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.") logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
@@ -306,6 +336,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении информации о получении стикеров: {error}") logger.error(f"Ошибка при получении информации о получении стикеров: {error}")
raise raise
finally:
self.close()
def update_info_about_stickers(self, user_id): def update_info_about_stickers(self, user_id):
""" """
@@ -322,6 +354,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}") logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,)) self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,))
self.conn.commit() self.conn.commit()
logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}") logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}")
@@ -329,6 +362,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}") logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}")
raise raise
finally:
self.close()
def get_users_blacklist(self): def get_users_blacklist(self):
""" """
@@ -343,6 +378,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции get_users_blacklist") logger.info(f"Запуск функции get_users_blacklist")
try: try:
self.connect()
self.cursor.execute("SELECT user_id, user_name FROM blacklist") self.cursor.execute("SELECT user_id, user_name FROM blacklist")
fetch_all = self.cursor.fetchall() fetch_all = self.cursor.fetchall()
list_of_users = {user_id: username for user_id, username in fetch_all} list_of_users = {user_id: username for user_id, username in fetch_all}
@@ -351,6 +387,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}") logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}")
raise raise
finally:
self.close()
def get_users_for_unblock_today(self, date_to_unban: str): def get_users_for_unblock_today(self, date_to_unban: str):
""" """
@@ -368,6 +406,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}") logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}")
try: try:
self.connect()
result = self.cursor.execute("SELECT user_id, user_name " result = self.cursor.execute("SELECT user_id, user_name "
"FROM blacklist WHERE date_to_unban = ?", (date_to_unban,)) "FROM blacklist WHERE date_to_unban = ?", (date_to_unban,))
fetch_all = result.fetchall() fetch_all = result.fetchall()
@@ -377,6 +416,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}") logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}")
raise raise
finally:
self.close()
def get_blacklist_users_by_id(self, user_id: int): def get_blacklist_users_by_id(self, user_id: int):
""" """
@@ -394,13 +435,15 @@ class BotDB:
""" """
logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}") logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}")
try: try:
self.connect()
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban " result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban "
"FROM blacklist WHERE user_id = ?", (user_id,)) "FROM blacklist WHERE user_id = ?", (user_id,))
return self.cursor.fetchone() return self.cursor.fetchone()
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}") logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}")
raise raise
finally:
self.close()
def check_user_in_blacklist(self, user_id: int): def check_user_in_blacklist(self, user_id: int):
""" """
@@ -417,12 +460,15 @@ class BotDB:
""" """
logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}") logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone() result = self.cursor.fetchone()
return bool(result) return bool(result)
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}") logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}")
raise raise
finally:
self.close()
def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None): def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None):
""" """
@@ -441,6 +487,7 @@ class BotDB:
logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name}," logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name},"
f" message_for_user={message_for_user}, date_to_unban={date_to_unban}") f" message_for_user={message_for_user}, date_to_unban={date_to_unban}")
try: try:
self.connect()
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name'," result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)", " 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban,)) (user_id, user_name, message_for_user, date_to_unban,))
@@ -450,6 +497,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка при добавлении пользователя в черный список: {error}") logger.error(f"Ошибка при добавлении пользователя в черный список: {error}")
return error return error
finally:
self.close()
def delete_user_blacklist(self, user_id: int): def delete_user_blacklist(self, user_id: int):
""" """
@@ -466,6 +515,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}") logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,)) self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit() self.conn.commit()
logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.") logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.")
@@ -474,6 +524,8 @@ class BotDB:
logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} " logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} "
f"из таблицы blacklist. Ошибка: {str(error)}") f"из таблицы blacklist. Ошибка: {str(error)}")
return False return False
finally:
self.close()
def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str): def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str):
""" """
@@ -494,6 +546,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}") logger.info(f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}")
try: try:
self.connect()
self.cursor.execute( self.cursor.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) " "INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)", "VALUES (?, ?, ?, ?)",
@@ -504,9 +557,12 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка добавления сообщения в базу данных: {error}") logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
raise raise
finally:
self.close()
def update_date_for_user(self, date: str, user_id: int): def update_date_for_user(self, date: str, user_id: int):
""" """
#TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users
Обновляет дату последнего изменения данных пользователя в базе. Обновляет дату последнего изменения данных пользователя в базе.
Args: Args:
@@ -519,6 +575,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}") logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}")
try: try:
self.connect()
self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?", self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?",
(date, user_id,)) (date, user_id,))
self.conn.commit() self.conn.commit()
@@ -527,6 +584,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка обновления даты изменения для пользователя: {error}") logger.error(f"Ошибка обновления даты изменения для пользователя: {error}")
return error return error
finally:
self.close()
def is_admin(self, user_id: int): def is_admin(self, user_id: int):
""" """
@@ -541,13 +600,17 @@ class BotDB:
Raises: Raises:
None: В случае ошибки возвращается None None: В случае ошибки возвращается None
""" """
logger.info(f"Запуск функции is_admin: user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,)) self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone() result = self.cursor.fetchone()
return bool(result) return bool(result)
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка добавления сообщения в базу данных: {error}") logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
return None return None
finally:
self.close()
def add_admin(self, user_id: int, role: str): def add_admin(self, user_id: int, role: str):
""" """
@@ -565,6 +628,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}") logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}")
try: try:
self.connect()
self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role)) self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
self.conn.commit() self.conn.commit()
logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.") logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.")
@@ -572,6 +636,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка добавления пользователя в список администраторов: {error}") logger.error(f"Ошибка добавления пользователя в список администраторов: {error}")
return error return error
finally:
self.close()
def remove_admin(self, user_id: int): def remove_admin(self, user_id: int):
""" """
@@ -586,6 +652,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции remove_admin: user_id={user_id}") logger.info(f"Запуск функции remove_admin: user_id={user_id}")
try: try:
self.connect()
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,)) self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
self.conn.commit() self.conn.commit()
logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.") logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.")
@@ -593,9 +660,12 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}") logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}")
return error return error
finally:
self.connect()
def get_user_by_message_id(self, message_id: int): def get_user_by_message_id(self, message_id: int):
""" """
#TODO: Возвращается TypeError вместо None
Возвращает идентификатор пользователя по идентификатору сообщения. Возвращает идентификатор пользователя по идентификатору сообщения.
Args: Args:
@@ -610,6 +680,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}") logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}")
try: try:
self.connect()
result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,)) result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,))
user = result.fetchone()[0] user = result.fetchone()[0]
logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}") logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}")
@@ -617,13 +688,15 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка получения user_id по message_id: {error}") logger.error(f"Ошибка получения user_id по message_id: {error}")
raise raise
finally:
self.close()
def get_last_users_from_db(self): def get_last_users_from_db(self):
""" """
Возвращает список идентификаторов последних 100 пользователей, обращавшихся в бот. Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот.
Returns: Returns:
list: Список кортежей (full_name, user_id) последних 100 пользователей. list: Список кортежей (full_name, user_id) последних 30 пользователей.
[]: Если в базе данных нет пользователей. []: Если в базе данных нет пользователей.
Raises: Raises:
@@ -631,13 +704,16 @@ class BotDB:
""" """
logger.info("Запуск функции get_last_users_from_db") logger.info("Запуск функции get_last_users_from_db")
try: try:
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 100") self.connect()
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30")
users = result.fetchall() users = result.fetchall()
logger.info(f"Получен список последних 100 пользователей: {users}") logger.info(f"Получен список последних 30 пользователей: {users}")
return users return users
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка получения списка последних пользователей: {error}") logger.error(f"Ошибка получения списка последних пользователей: {error}")
raise raise
finally:
self.close()
def get_banned_users_from_db(self): def get_banned_users_from_db(self):
""" """
@@ -652,6 +728,7 @@ class BotDB:
""" """
logger.info("Запуск функции get_banned_users_from_db") logger.info("Запуск функции get_banned_users_from_db")
try: try:
self.connect()
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist") result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist")
users = result.fetchall() users = result.fetchall()
logger.info(f"Получен список пользователей в черном списке: {users}") logger.info(f"Получен список пользователей в черном списке: {users}")
@@ -659,6 +736,8 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка получения списка пользователей в черном списке: {error}") logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
raise raise
finally:
self.close()
def get_banned_users_from_db_with_limits(self, offset: int, limit: int): def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
""" """
@@ -677,6 +756,7 @@ class BotDB:
""" """
logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}") logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}")
try: try:
self.connect()
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban " result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban "
"FROM blacklist LIMIT ?, ?", (offset, limit,)) "FROM blacklist LIMIT ?, ?", (offset, limit,))
users = result.fetchall() users = result.fetchall()
@@ -685,58 +765,81 @@ class BotDB:
except sqlite3.Error as error: except sqlite3.Error as error:
logger.error(f"Ошибка получения списка пользователей в черном списке: {error}") logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
raise raise
finally:
self.close()
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id): def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД""" """Добавляет информацию о войсе юзера в БД"""
logger.info(
f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id}, date_added = {date_added}")
try: try:
self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", "INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id)) (file_name, author_id, date_added, listen_count, file_id))
return self.conn.commit() self.conn.commit()
logger.info(
f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, date_added = {date_added}")
return None
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def last_date_audio(self): def last_date_audio(self):
"""Получаем дату последнего войса""" """Получаем дату последнего войса"""
try: try:
self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1") "SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
return result.fetchone()[0] return result.fetchone()[0]
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def get_last_user_audio_record(self, user_id): def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя""" """Получает данные о количестве записей пользователя"""
try: try:
self.connect()
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,)) (user_id,))
return bool(len(result.fetchall())) return bool(len(result.fetchall()))
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def get_id_for_audio_record(self, user_id): def get_id_for_audio_record(self, user_id):
"""Получает ID аудио сообщения пользователя""" """Получает ID аудио сообщения пользователя"""
try: try:
self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", "SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,)) (user_id,))
return result.fetchone()[0] return result.fetchone()[0]
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def get_path_for_audio_record(self, user_id): def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла""" """Получает данные о названии файла"""
try: try:
self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", "SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,)) (user_id,))
return result.fetchone()[0] return result.fetchone()[0]
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def check_listen_audio(self, user_id): def check_listen_audio(self, user_id):
"""Проверяет прослушано ли аудио пользователем""" """Проверяет прослушано ли аудио пользователем"""
try: try:
self.connect()
query_listen_audio = self.cursor.execute( query_listen_audio = self.cursor.execute(
"""SELECT l.file_name """SELECT l.file_name
FROM audio_message_reference a FROM audio_message_reference a
@@ -754,17 +857,25 @@ class BotDB:
return new_sign return new_sign
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def mark_listened_audio(self, file_name, user_id): def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя.""" """Отмечает аудио прослушанным для конкретного пользователя."""
try: try:
self.connect()
result = self.cursor.execute( result = self.cursor.execute(
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", "INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1)) (file_name, user_id, 1))
return self.conn.commit() return self.conn.commit()
except sqlite3.Error as error: except sqlite3.Error as error:
print(error) print(error)
finally:
self.close()
def close(self): def close(self):
"""Закрываем соединение с БД""" """Закрытие соединения и курсора."""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close() self.conn.close()

View File

@@ -1,22 +1,21 @@
import datetime import datetime
import os import os
from loguru import logger import loguru
class Logger: class Logger:
def __init__(self, name): def __init__(self, name):
self.logger = logger.bind(name=name) self.logger = loguru.logger.bind(name=name)
# Получение сегодняшней даты для имени файла # Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d') today = datetime.date.today().strftime('%Y-%m-%d')
# Создание папки для логов # Создание папки для логов
current_dir = os.path.dirname(os.path.abspath(__file__)) current_dir = os.path.dirname(os.path.abspath(__file__))
logs_dir = os.path.join(current_dir, 'logs') if not os.path.exists(current_dir):
if not os.path.exists(logs_dir):
# Если не существует, создаем ее # Если не существует, создаем ее
os.makedirs(logs_dir) os.makedirs(current_dir)
filename = f'{logs_dir}/helper_bot_{today}.log' filename = f'{current_dir}/helper_bot_{today}.log'
# Настройка формата логов # Настройка формата логов
self.logger.add( self.logger.add(

20
main.py
View File

@@ -4,9 +4,7 @@ import sys
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
from enum import Enum from enum import Enum
from typing import Any from database.db import BotDB
from apscheduler.schedulers.background import BackgroundScheduler
from db import BotDB
import telebot import telebot
import random import random
from datetime import datetime, timedelta from datetime import datetime, timedelta
@@ -37,7 +35,7 @@ LOGS = config.getboolean('Settings', 'logs')
TEST = config.getboolean('Settings', 'test') TEST = config.getboolean('Settings', 'test')
# Инициализируем бота и базку # Инициализируем бота и базку
BotDB = BotDB() BotDB = BotDB(name='tg-bot-database')
class State(Enum): class State(Enum):
@@ -134,10 +132,17 @@ class TelegramHelperBot:
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Выйти из чата") item1 = types.KeyboardButton("Выйти из чата")
markup.add(item1) markup.add(item1)
message_id = 0
try:
message_id = message.reply_to_message.id message_id = message.reply_to_message.id
except AttributeError:
self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!')
message_from_admin = message.text message_from_admin = message.text
try:
chat_id = BotDB.get_user_by_message_id(message_id) chat_id = BotDB.get_user_by_message_id(message_id)
self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) self.bot.send_message(chat_id, message_from_admin, reply_markup=markup)
except TypeError:
self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.')
# Админка # Админка
@self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline'])
@@ -212,6 +217,7 @@ class TelegramHelperBot:
def unban_notifier(self): def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY # Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now() current_date = datetime.now()
print('Мы в функции unban_notifier')
today = current_date.strftime("%d-%m-%Y") today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей # Получение списка разблокированных пользователей
unblocked_users = BotDB.get_users_for_unblock_today(today) unblocked_users = BotDB.get_users_for_unblock_today(today)
@@ -369,7 +375,7 @@ class TelegramHelperBot:
) )
current_date = datetime.now() current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S") date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_message_in_db(message.text, message.message_id + 1, message.from_user.id, date) BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(self.__get_first_name(message), 'QUESTION') question = messages.get_message(self.__get_first_name(message), 'QUESTION')
markup = self.get_reply_keyboard(message) markup = self.get_reply_keyboard(message)
self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK, self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK,
@@ -543,7 +549,7 @@ class TelegramHelperBot:
return formatted_date return formatted_date
@staticmethod @staticmethod
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[Any, Any]], callback: str): def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str):
""" """
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
@@ -636,6 +642,6 @@ if __name__ == "__main__":
bot.start() bot.start()
#scheduler = BackgroundScheduler() #scheduler = BackgroundScheduler()
#scheduler.add_job(bot.unban_notifier(), 'cron', hour=0, minute=0) #scheduler.add_job(bot.unban_notifier(), 'cron', hour=22, minute=9)
#scheduler.start() #scheduler.start()

View File

@@ -1,8 +1,16 @@
import os import os
from db import BotDB from database.db import BotDB
# Получаем текущий рабочий каталог
current_dir = os.path.dirname(os.path.abspath(__file__))
BotDB = BotDB() # Переходим на уровень выше, чтобы выйти из папки migrations/
parent_dir = os.path.dirname(current_dir)
# Строим путь до файла
tg_bot_database_path = os.path.join(parent_dir, "tg-bot-database")
BotDB = BotDB(f'{tg_bot_database_path}')
def get_filename(): def get_filename():

View File

@@ -1,7 +1,7 @@
import os import os
from db import BotDB from database.db import BotDB
BotDB = BotDB() BotDB = BotDB('tg-bot-database')
def get_filename(): def get_filename():

8
pytest.ini Normal file
View File

@@ -0,0 +1,8 @@
[pytest]
pythonpath = .
python_files = test_*.py *_test.py
python_functions = test_*
testpaths = tests
[report]
omit = *myenv/*, custom_logger.py, *venv/*, tests/*

View File

@@ -1,3 +1,18 @@
pyTelegramBotAPI APScheduler==3.10.4
APScheduler~=3.10.4 certifi==2024.7.4
loguru~=0.7.2 charset-normalizer==3.3.2
coverage==7.5.4
exceptiongroup==1.2.1
idna==3.7
iniconfig==2.0.0
loguru==0.7.2
packaging==24.1
pluggy==1.5.0
pyTelegramBotAPI==4.20.0
pytest==8.2.2
pytz==2024.1
requests==2.32.3
six==1.16.0
tomli==2.0.1
tzlocal==5.2
urllib3==2.2.2

825
tests/test_db.py Normal file
View File

@@ -0,0 +1,825 @@
import os
from datetime import datetime
import pytest
import sqlite3
from database.db import BotDB
@pytest.fixture
def bot():
"""Фикстура для создания объекта BotDB."""
return BotDB("test.db")
@pytest.fixture(autouse=True, )
def setup_db():
"""Фикстура для создания всей базы перед каждым тестом."""
# Mock data 1st user
user_id = 12345
first_name = "Иван"
full_name = "Иван Иванович"
username = "@iban"
message_text = 'Hello, planet'
message_id = 1
message_for_user = "LOL"
has_stickers = 0
# Mock data 2nd user
user_id_2 = 14278
first_name_2 = "Борис"
full_name_2 = "Борис Петрович"
username_2 = "@boris"
message_text_2 = 'Hello, world'
message_id_2 = 2
message_for_user_2 = "LOL2"
has_stickers_2 = 1
# Other data
date = "2024-07-10"
next_date = "2024-07-11"
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS "admins" (
user_id INTEGER NOT NULL,
"role" TEXT
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS "audio_message_reference"
(
"id" INTEGER NOT NULL UNIQUE,
"file_name" TEXT NOT NULL UNIQUE,
"author_id" INTEGER NOT NULL,
"date_added" DATE NOT NULL,
"listen_count" INTEGER NOT NULL,
"file_id" INTEGER NOT NULL,
PRIMARY KEY ("id")
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS "blacklist"
(
"user_id" INTEGER NOT NULL UNIQUE,
"user_name" INTEGER,
"message_for_user" INTEGER,
"date_to_unban" INTEGER
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS "messages" (
"ID" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
"Message" TEXT NOT NULL,
"type" INTEGER
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS "our_users" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT UNIQUE,
"user_id" INTEGER NOT NULL UNIQUE,
"first_name" STRING,
"full_name" STRING,
"username" STRING,
"is_bot" BOOLEAN,
"language_code" STRING,
"has_stickers" INTEGER NOT NULL DEFAULT 0,
"date_added" DATE NOT NULL,
"date_changed" DATE NOT NULL
, state_user TEXT(20));
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS user_messages (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
message_text TEXT,
user_id INTEGER,
message_id INTEGER NOT NULL,
date TEXT
);
""")
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT);
""")
cursor.execute("""
CREATE TABLE migrations (
version INTEGER PRIMARY KEY NOT NULL,
script_name TEXT NOT NULL,
created_at TEXT
);
""")
#blacklist mock data
cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
(user_id, username, message_for_user, next_date))
cursor.execute("INSERT INTO blacklist (user_id, user_name, message_for_user, date_to_unban) VALUES (?, ?, ?, ?)",
(user_id_2, username_2, message_for_user_2, date))
#our_users mock data
cursor.execute(
"INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)"
" VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id, first_name, full_name, username, date, date, has_stickers)
)
cursor.execute(
"INSERT INTO our_users (user_id, first_name, full_name, username, date_added, date_changed, has_stickers)"
" VALUES (?, ?, ?, ?, ?, ?, ?)", (user_id_2, first_name_2, full_name_2, username_2, date, date, has_stickers_2)
)
#messages mock data
cursor.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)",
(message_text, user_id, message_id, date))
cursor.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)",
(message_text_2, user_id_2, message_id_2, date))
#mock admins
cursor.execute(
"INSERT INTO admins (user_id, role) "
"VALUES (?, ?)",
(user_id, 'creator'))
conn.commit()
conn.close()
yield
os.remove('test.db')
def test_bot_init(bot):
"""Проверяет, что объект BotDB инициализируется с правильным именем файла."""
assert bot.db_file == os.path.join(os.getcwd(), "test.db")
# Проверьте, что соединения с базой данных нет, так как оно не устанавливается в init
assert bot.conn is None
assert bot.cursor is None
def test_bot_connect(bot):
"""Проверяет, что метод connect создает подключение к базе данных."""
bot.connect()
assert bot.conn is not None
assert bot.cursor is not None
bot.close()
@pytest.mark.xfail
def test_bot_close(bot):
"""Проверяет, что метод close закрывает подключение к базе данных."""
bot.connect()
assert bot.conn is not None
assert bot.cursor is not None
bot.close()
assert bot.conn is None
assert bot.cursor is None
def test_create_table_success(bot):
sql_script = 'CREATE TABLE test_table (id INTEGER PRIMARY KEY);'
bot.create_table(sql_script)
# Проверяем, что таблица создана
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='test_table'")
result = cursor.fetchone()
conn.close()
assert result is not None
def test_create_table_error(bot):
sql_script = 'CREATE TABLE test_table (id INTEGER PRIMARY KEY);'
bot.create_table(sql_script)
with pytest.raises(sqlite3.OperationalError):
bot.create_table(sql_script)
def test_get_current_version_success(bot):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO migrations (version, script_name) VALUES (123, 'test')")
conn.commit()
conn.close()
# Вызываем функцию и проверяем результат
version = bot.get_current_version()
assert version == 123
def test_get_current_version_error(bot):
__drop_table('migrations')
with pytest.raises(sqlite3.OperationalError):
bot.get_current_version()
def test_update_version_success(bot):
# Вызываем функцию update_version
new_version = 124
script_name = "migration_script.sql"
bot.update_version(new_version, script_name)
# Проверяем, что данные записаны в таблицу
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM migrations WHERE version = ?", (new_version,))
result = cursor.fetchone()
conn.close()
assert result is not None
assert result[0] == new_version
assert result[1] == script_name
assert result[2] == datetime.now().strftime("%d-%m-%Y %H:%M:%S")
def test_update_version_integrity_error(bot):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO migrations (version, script_name) VALUES (123, 'test')")
conn.commit()
conn.close()
# Пытаемся обновить версию с уже существующим значением
with pytest.raises(sqlite3.IntegrityError):
bot.update_version(123, "script_2.sql")
def test_update_version_error(bot):
__drop_table('migrations')
with pytest.raises(sqlite3.OperationalError):
bot.update_version(123, "script_2.sql")()
def test_add_new_user_in_db(bot):
"""Проверяет добавление нового пользователя в базу данных."""
user_id = 50
first_name = "Петр"
full_name = "Петр Иванов"
username = "@petr_ivanov"
is_bot = False
language_code = "ru"
date_added = "2024-07-09"
date_changed = "2024-07-09"
# Вызываем функцию add_new_user_in_db
bot.add_new_user_in_db(
user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed
)
# Проверяем наличие записи в базе данных
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM our_users WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
conn.close()
assert result is not None
assert result[1] == user_id
assert result[2] == first_name
assert result[3] == full_name
assert result[4] == username
assert result[5] == is_bot
assert result[6] == language_code
assert result[8] == date_added
assert result[9] == date_changed
def test_add_new_user_in_db_duplicate_user_id(bot, setup_db):
"""Проверяет поведение при попытке добавить пользователя с уже существующим user_id."""
user_id = 12345
# Попытка добавить пользователя с тем же user_id
with pytest.raises(sqlite3.IntegrityError):
bot.add_new_user_in_db(
user_id, "Марина", "Марина Альфредовна", "marina", False, "bg", "2024-07-09", "2024-07-09"
)
def test_add_new_user_in_db_empty_first_name(bot):
""" Проверяет добавление пользователя с пустым именем (first_name) """
user_id = 43
first_name = "" # Пустое имя
full_name = "Boris Petrov"
username = "@boris"
is_bot = False
language_code = "fr"
date_added = "2024-07-09"
date_changed = "2024-07-09"
# Вызываем функцию add_new_user_in_db
bot.add_new_user_in_db(
user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed
)
# Проверяем наличие записи в базе данных
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute(f"SELECT * FROM our_users WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
conn.close()
assert result is not None
assert result[1] == user_id
assert result[2] == first_name
assert result[3] == full_name
assert result[4] == username
assert result[5] == is_bot
assert result[6] == language_code
assert result[8] == date_added
assert result[9] == date_changed
def test_user_exists_found(bot):
"""Проверяет, что функция возвращает True, если пользователь найден."""
user_id = 12345
# Проверяем наличие записи в базе данных
assert bot.user_exists(user_id) is True
def test_user_exists_not_found(bot):
"""Проверяет, что функция возвращает False, если пользователь не найден."""
user_id = 99999
assert bot.user_exists(user_id) is False
def test_user_exists_error(bot):
"""Проверяет, что функция возвращает ошибки"""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.user_exists(12345)
def test_get_user_id_found(bot):
"""Проверяет, что функция возвращает ID пользователя, если он найден."""
user_id = 12345
# Проверяем, что возвращается правильный ID из базы
user_id_db = bot.get_user_id(user_id)
assert user_id_db == 1
def test_get_user_id_not_found(bot, setup_db):
"""Проверяет, что функция возвращает None, если пользователь не найден."""
user_id = 99999
assert bot.get_user_id(user_id) is None
def test_get_user_id_error(bot):
"""Проверяет, что функция обрабатывает некорректный user_id."""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.get_user_id(12345)
def test_get_username_found(bot):
"""Проверяет, что функция возвращает username пользователя, если он найден."""
user_id = 12345
username = "@iban"
# Проверяем, что возвращается правильный username из базы
username_db = bot.get_username(user_id)
assert username_db == username
def test_get_username_not_found(bot, setup_db):
"""Проверяет, что функция возвращает None, если пользователь не найден."""
user_id = 99999
assert bot.get_username(user_id) is None
def test_get_username_error(bot):
"""Проверяет, что функция возвращает ошибку"""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.get_username(12345)
def test_get_all_user_id_empty(bot):
"""Проверяет, что функция возвращает пустой список, если в базе нет пользователей."""
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM our_users")
conn.commit()
conn.close()
# Проверяем наличие записей в базе данных
user_ids = bot.get_all_user_id()
assert user_ids == []
def test_get_all_user_id_non_empty(bot, setup_db):
"""Проверяет, что функция возвращает список всех user_id из базы данных."""
# Проверяем наличие записи в базе данных
user_ids = bot.get_all_user_id()
assert user_ids == [12345, 14278] # Проверяем, что в списке два ожидаемых user_id
def test_get_all_user_id_error(bot):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.get_all_user_id()
def test_get_user_first_name_found(bot):
"""Проверяет, что функция возвращает имя пользователя, если он найден."""
user_id = 12345
first_name = bot.get_user_first_name(user_id)
assert first_name == "Иван"
def test_get_user_first_name_not_found(bot, setup_db):
"""Проверяет, что функция возвращает None, если пользователь не найден."""
user_id = 99999
assert bot.get_user_first_name(user_id) is None
@pytest.mark.xfail
def test_get_user_first_name_invalid_user_id(bot):
"""Проверяет, что функция обрабатывает некорректный user_id."""
with pytest.raises(sqlite3.Error):
bot.get_user_first_name("invalid_user_id") # Передача строки
def test_get_user_first_name_error(bot):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.get_user_first_name(12345)
def test_get_info_about_stickers_found_received(bot):
"""Проверяет, что функция возвращает True, если пользователь получил стикеры."""
user_id = 14278
assert bot.get_info_about_stickers(user_id) is True
def test_get_info_about_stickers_found_not_received(bot, setup_db):
"""Проверяет, что функция возвращает False, если пользователь не получил стикеры."""
user_id = 12345
assert bot.get_info_about_stickers(user_id) is False
@pytest.mark.xfail
def test_get_info_about_stickers_not_found(bot, setup_db):
"""Проверяет, что функция возвращает None, если пользователь не найден."""
user_id = 99999
assert bot.get_info_about_stickers(user_id) is None
@pytest.mark.xfail
def test_get_info_about_stickers_invalid_user_id(bot):
"""Проверяет, что функция обрабатывает некорректный user_id."""
with pytest.raises(sqlite3.Error):
bot.get_info_about_stickers("invalid_user_id")
def test_get_info_about_stickers_error(bot):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.get_info_about_stickers(12345)
def test_update_info_about_stickers_success(bot):
"""Проверяет, что функция успешно обновляет информацию о получении стикеров."""
user_id = 12345
bot.update_info_about_stickers(user_id)
# Проверяем, что информация обновлена
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
conn.close()
assert result[0] == 1
def test_update_info_about_stickers_not_found(bot):
"""Проверяет, что функция не вызывает ошибки, если пользователь не найден."""
user_id = 99999
bot.update_info_about_stickers(user_id)
# Проверяем, что база данных не изменилась
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM our_users WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
conn.close()
assert result[0] == 0
def test_update_info_about_stickers_error(bot):
"""Проверяет, что функция вызывает ошибки"""
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.update_info_about_stickers(12345)
def test_get_users_blacklist_empty(bot):
"""Проверяет, что функция возвращает пустой словарь, если в черном списке нет пользователей."""
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM blacklist")
conn.commit()
conn.close()
blacklist = bot.get_users_blacklist()
assert blacklist == {}
def test_get_users_blacklist_non_empty(bot):
"""Проверяет, что функция возвращает словарь с пользователями из черного списка."""
blacklist = bot.get_users_blacklist()
assert blacklist == {12345: "@iban", 14278: "@boris"}
def test_get_users_blacklist_error(bot):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.get_users_blacklist()
def test_get_blacklist_users_by_id_found(bot, setup_db):
"""Проверяет, что функция возвращает информацию о пользователе, если он найден в черном списке."""
user_id = 12345
result = bot.get_blacklist_users_by_id(user_id)
assert result == (12345, "@iban", "LOL", "2024-07-11")
def test_get_blacklist_users_by_id_not_found(bot, setup_db):
"""Проверяет, что функция возвращает None, если пользователь не найден в черном списке."""
user_id = 99999
assert bot.get_blacklist_users_by_id(user_id) is None
@pytest.mark.xfail
def test_get_blacklist_users_by_id_invalid_user_id(bot):
"""Проверяет, что функция обрабатывает некорректный user_id."""
with pytest.raises(sqlite3.Error):
bot.get_blacklist_users_by_id("invalid_user_id") # Передача строки
def test_get_blacklist_users_by_id_error(bot):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.get_blacklist_users_by_id(12345)
def test_get_users_for_unblock_today_found(bot):
"""Проверяет, что функция возвращает словарь с пользователями, у которых истекает блокировка сегодня."""
date_to_unban = "2024-07-11"
result = bot.get_users_for_unblock_today(date_to_unban)
assert result == {12345: "@iban"}
def test_get_users_for_unblock_today_not_found(bot, setup_db):
"""Проверяет, что функция возвращает пустой словарь, если сегодня нет пользователей, у которых истекает блокировка."""
date_to_unban = "2024-07-12"
result = bot.get_users_for_unblock_today(date_to_unban)
assert result == {}
def test_get_users_for_unblock_today_error(bot):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.get_users_for_unblock_today("2023-12-26")
def test_check_user_in_blacklist_found(bot, setup_db):
"""Проверяет, что функция возвращает True, если пользователь найден в черном списке."""
user_id = 12345
bot.set_user_blacklist(user_id, "JohnDoe") # Добавляем пользователя в черный список
assert bot.check_user_in_blacklist(user_id) is True
def test_check_user_in_blacklist_not_found(bot, setup_db):
"""Проверяет, что функция возвращает False, если пользователь не найден в черном списке."""
user_id = 99999
assert bot.check_user_in_blacklist(user_id) is False
def test_check_user_in_blacklist_error(bot, setup_db):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.check_user_in_blacklist(12345)
def test_set_user_blacklist_success(bot):
"""Проверяет, что функция успешно добавляет пользователя в черный список."""
user_id = 11
user_name = "Гриша"
message_for_user = "Лови бан!"
date_to_unban = datetime.now().strftime("%Y-%m-%d") # Текущая дата
assert bot.set_user_blacklist(user_id, user_name, message_for_user, date_to_unban) is None
# Проверяем, что запись добавлена в базу
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT * FROM blacklist WHERE user_id = ?", (user_id,))
result = cursor.fetchone()
conn.commit()
conn.close()
assert result is not None
assert result[1] == user_name
assert result[2] == message_for_user
assert result[3] == date_to_unban
@pytest.mark.xfail
def test_set_user_blacklist_duplicate_user_id(bot, setup_db):
"""Проверяет, что функция не добавляет дубликат user_id в черный список."""
user_id = 12345
bot.set_user_blacklist(user_id, "JohnDoe")
with pytest.raises(sqlite3.IntegrityError):
bot.set_user_blacklist(user_id, "JaneSmith") # Попытка добавить дубликат
@pytest.mark.xfail
def test_set_user_blacklist_error(bot, setup_db):
"""Проверяет, что функция вызывает sqlite3. Error при ошибке запроса."""
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.set_user_blacklist(12345, "JohnDoe", "You are banned!", "2024-01-01")
def test_delete_user_blacklist_success(bot):
bot.delete_user_blacklist(12345)
assert bot.check_user_in_blacklist(12345) is False
@pytest.mark.xfail
def test_delete_user_blacklist_not_found(bot):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("INSERT INTO blacklist (user_id, user_name, date_to_unban) VALUES (?, ?, ?)",
(12345, "JohnDoe", "2023-12-26"))
conn.commit()
conn.close()
result = bot.delete_user_blacklist(514)
assert result is False
@pytest.mark.xfail
def test_delete_user_blacklist_error(bot):
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.delete_user_blacklist(12345)
def test_add_new_message_in_db_success(bot):
result = bot.add_new_message_in_db('hello', 4232187, 5, '2024-01-01')
assert result is None
def test_add_new_message_in_db_error(bot):
__drop_table('user_messages')
with pytest.raises(sqlite3.Error):
bot.add_new_message_in_db('hello', 12345, 1, '2024-01-01')
def test_update_date_for_user_success(bot):
bot.update_date_for_user('2024-07-15', 12345)
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("SELECT date_changed FROM our_users WHERE user_id = ?", (12345,))
new_date = cursor.fetchone()[0]
conn.close()
assert new_date == '2024-07-15'
@pytest.mark.xfail
def test_update_date_for_user_error(bot):
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.update_date_for_user('2024-07-15', 12345)
def test_is_admin_success(bot):
assert bot.is_admin(12345) is True
def test_is_admin_not_found(bot):
assert bot.is_admin(1) is False
def test_is_admin_error(bot):
__drop_table('admins')
assert bot.is_admin(1) is None
def test_get_user_by_message_id_success(bot):
assert bot.get_user_by_message_id(1) == 12345
@pytest.mark.xfail
def test_get_user_by_message_id_not_found(bot):
assert bot.get_user_by_message_id(124) == None
def test_get_user_by_message_id_error(bot):
__drop_table('user_messages')
with pytest.raises(sqlite3.Error):
bot.get_user_by_message_id(14)
def test_get_last_users_from_db_success(bot):
users = bot.get_last_users_from_db()
assert users is not None
assert len(users) == 2
def test_get_last_users_from_db_empty(bot):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM our_users")
conn.commit()
conn.close()
users = bot.get_last_users_from_db()
assert users == []
assert len(users) == 0
def test_get_user_by_message_id_error(bot):
__drop_table('our_users')
with pytest.raises(sqlite3.Error):
bot.get_last_users_from_db()
def test_get_banned_users_from_db_success(bot):
users = bot.get_banned_users_from_db()
assert users[0][0] == '@iban'
assert users[0][1] == 12345
assert users[0][2] == 'LOL'
assert users[1][0] == '@boris'
assert users[1][1] == 14278
assert users[1][2] == 'LOL2'
def test_get_banned_users_from_db_empty(bot):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM blacklist")
conn.commit()
conn.close()
users = bot.get_banned_users_from_db()
assert users == []
assert len(users) == 0
def test_get_banned_users_from_db_error(bot):
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.get_banned_users_from_db()
def test_get_banned_users_from_db_with_limits_success_limit(bot):
users = bot.get_banned_users_from_db_with_limits(0, 1)
assert users[0][0] == '@iban'
assert users[0][1] == 12345
assert users[0][2] == 'LOL'
assert len(users) == 1
def test_get_banned_users_from_db_with_limits_success_offset(bot):
users = bot.get_banned_users_from_db_with_limits(1, 2)
assert users[0][0] == '@boris'
assert users[0][1] == 14278
assert users[0][2] == 'LOL2'
assert len(users) == 1
def test_get_banned_users_from_db_with_limits_empty(bot):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute("DELETE FROM blacklist")
conn.commit()
conn.close()
users = bot.get_banned_users_from_db_with_limits(0, 2)
assert users == []
assert len(users) == 0
def test_get_banned_users_from_db_with_limits_error(bot):
__drop_table('blacklist')
with pytest.raises(sqlite3.Error):
bot.get_banned_users_from_db_with_limits(0, 2)
def __drop_table(table_name: str):
conn = sqlite3.connect('test.db')
cursor = conn.cursor()
cursor.execute(f"DROP TABLE {table_name}")
conn.commit()
conn.close()
if __name__ == "__main__":
pytest.main()

View File

@@ -4,8 +4,7 @@ import sys
from pathlib import Path from pathlib import Path
from time import sleep from time import sleep
import db from database.db import BotDB
from db import BotDB
import telebot import telebot
import random import random
from datetime import datetime from datetime import datetime