1170 lines
58 KiB
Python
1170 lines
58 KiB
Python
import os
|
||
import sqlite3
|
||
from datetime import datetime
|
||
|
||
from logs.custom_logger import logger
|
||
|
||
|
||
class BotDB:
|
||
def __init__(self, current_dir, name):
|
||
self.db_file = os.path.join(current_dir, name)
|
||
self.conn = None
|
||
self.cursor = None
|
||
self.logger = logger
|
||
self.logger.info(f'Инициация базы данных: {self.db_file}')
|
||
|
||
def connect(self):
|
||
"""Создание соединения и курсора."""
|
||
self.conn = sqlite3.connect(self.db_file)
|
||
self.cursor = self.conn.cursor()
|
||
|
||
def create_table(self, sql_script):
|
||
"""
|
||
Создает таблицу в базе. Используется в миграциях
|
||
|
||
Args:
|
||
sql_script: DDL скрипт таблицы
|
||
|
||
Returns:
|
||
None
|
||
"""
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute(sql_script)
|
||
self.conn.commit()
|
||
self.logger.info(f'Таблица создана: {sql_script}')
|
||
except Exception as e:
|
||
self.logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}')
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_current_version(self):
|
||
"""
|
||
Возвращает текущую последнюю версию миграции
|
||
|
||
Args:
|
||
None
|
||
|
||
Returns:
|
||
int: Версия последней миграции.
|
||
"""
|
||
self.logger.info(f'Попытка получения версии миграции')
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
|
||
version = self.cursor.fetchone()[0]
|
||
self.logger.info(f'Получена текущая версия миграции: {version}')
|
||
return version
|
||
except Exception as e:
|
||
self.logger.error(f'Ошибка при получении текущей версии миграции: {str(e)}')
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def update_version(self, new_version: int, script_name: str):
|
||
"""
|
||
Обновляет версию миграций в таблице migrations.
|
||
|
||
Добавляет новую запись в таблицу migrations с указанной версией,
|
||
именем скрипта и текущей датой и временем.
|
||
|
||
Args:
|
||
new_version (int): Новая версия миграции
|
||
script_name (str): Имя скрипта миграции
|
||
|
||
Returns:
|
||
None
|
||
|
||
Raises:
|
||
sqlite3. IntegrityError: Если возникает ошибка целостности при вставке
|
||
данных в таблицу migrations.
|
||
Exception: Если возникает любая другая ошибка при обновлении версии.
|
||
"""
|
||
self.logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}')
|
||
try:
|
||
self.connect()
|
||
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
|
||
self.cursor.execute(
|
||
"INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
|
||
(new_version, script_name, today),
|
||
)
|
||
self.conn.commit()
|
||
self.logger.info(f"Версия обновлена: {new_version}, название скрипта: {script_name}")
|
||
except sqlite3.IntegrityError as e:
|
||
self.logger.error(f"Ошибка при обновлении версии: {e}")
|
||
raise
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка при обновлении версии: {e}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда
|
||
def get_error_message_from_db(self, id: int):
|
||
"""
|
||
@deprecated
|
||
Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
|
||
id - идентификатор ошибки
|
||
"""
|
||
# Подключаемся к базе
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
|
||
response_from_database = str(self.cursor.fetchone()[1])
|
||
return response_from_database
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении сообщения об ошибка voice_bot: {error}")
|
||
finally:
|
||
self.close()
|
||
|
||
def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool,
|
||
language_code: str, date_added: str,
|
||
date_changed: str):
|
||
"""
|
||
Добавляет нового пользователя в базу данных.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
first_name (str): Имя пользователя.
|
||
full_name (str): Полное имя пользователя.
|
||
username (str): Username пользователя в Telegram.
|
||
is_bot (bool): Флаг, указывающий, является ли пользователь ботом.
|
||
language_code (str): Код языка пользователя.
|
||
date_added (str): Дата добавления пользователя в базу.
|
||
date_changed (str): Дата последнего изменения данных пользователя.
|
||
|
||
Returns:
|
||
None: Если запись успешно добавлена в базу.
|
||
Exception: Если произошла ошибка при добавлении записи.
|
||
"""
|
||
self.logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
|
||
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||
(user_id, first_name, full_name,
|
||
username, is_bot, language_code, date_added, date_changed))
|
||
self.conn.commit()
|
||
self.logger.info(f"Новый пользователь добавлен в базу: user_id={user_id}, first_name={first_name}")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при добавлении пользователя в базу: {error}. "
|
||
f"Данные пользователя: user_id={user_id}, first_name={first_name}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def user_exists(self, user_id: int):
|
||
"""
|
||
Проверяет, существует ли пользователь в базе данных.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя.
|
||
|
||
Returns:
|
||
bool: True, если пользователь найден, False - иначе.
|
||
"""
|
||
self.logger.info(f"Попытка проверки существования пользователя: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchall()
|
||
self.logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}")
|
||
return bool(len(result))
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при проверке существования пользователя: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_user_id(self, user_id: int):
|
||
"""
|
||
@deprecated
|
||
Возвращает ID пользователя в базе данных по его user_id.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
int: ID пользователя в базе данных.
|
||
None: Если пользователь не найден.
|
||
"""
|
||
self.logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
if result:
|
||
user_id_db = result[0]
|
||
self.logger.info(f"ID пользователя в базе найден: user_id={user_id}, id_db={user_id_db}")
|
||
return user_id_db
|
||
else:
|
||
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_username(self, user_id: int):
|
||
"""
|
||
Возвращает username пользователя из базы данных по его user_id в Telegram.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
str: Username пользователя.
|
||
None: Если пользователь не найден.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
if result:
|
||
username = result[0]
|
||
self.logger.info(f"Username пользователя найден: user_id={user_id}, username={username}")
|
||
return username
|
||
else:
|
||
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_user_id_by_username(self, username: str):
|
||
"""
|
||
Возвращает user_id пользователя из базы данных по его user_name в Telegram.
|
||
|
||
Args:
|
||
username (str): Username пользователя.
|
||
|
||
Returns:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
None: Если пользователь не найден.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT user_id FROM our_users WHERE username = ?", (username,))
|
||
result = self.cursor.fetchone()
|
||
if result:
|
||
user_id = result[0]
|
||
self.logger.info(f"User_id пользователя найден: username={username}, user_id={user_id}")
|
||
return user_id
|
||
else:
|
||
self.logger.info(f"Пользователь с username={username} не найден в базе данных.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_full_name_by_id(self, user_id: str):
|
||
"""
|
||
Возвращает full_name пользователя из базы данных по его username в Telegram.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
full_name (str): Username пользователя.
|
||
None: Если пользователь не найден.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
if result:
|
||
full_name = result[0]
|
||
self.logger.info(f"Username пользователя найден: user_id={user_id}, full_name={full_name}")
|
||
return full_name
|
||
else:
|
||
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_all_user_id(self):
|
||
"""
|
||
Возвращает список всех user_id из базы данных.
|
||
|
||
Returns:
|
||
list: Список user_id.
|
||
[]: Если в базе данных нет пользователей.
|
||
|
||
Raises:
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Попытка получения всех user_id")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT user_id FROM our_users")
|
||
fetch_all = self.cursor.fetchall()
|
||
list_of_users = [user_id[0] for user_id in fetch_all]
|
||
self.logger.info(f"Получен список всех user_id: {list_of_users}")
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении списка user_id из базы данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_user_first_name(self, user_id: int):
|
||
"""
|
||
Возвращает имя пользователя из базы данных по его user_id в Telegram.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
str: Имя пользователя.
|
||
None: Если пользователь не найден.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Попытка получения имени пользователя по user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
if result:
|
||
first_name = result[0]
|
||
self.logger.info(f"Имя пользователя найдено: user_id={user_id}, first_name={first_name}")
|
||
return first_name
|
||
else:
|
||
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_info_about_stickers(self, user_id: int):
|
||
"""
|
||
Проверяет, получил ли пользователь стикеры.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
bool: True, если пользователь получил стикеры, False - иначе.
|
||
None: Если пользователь не найден.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
if result:
|
||
has_stickers = result[0] == 1
|
||
self.logger.info(
|
||
f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}")
|
||
return has_stickers
|
||
else:
|
||
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении информации о получении стикеров: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def update_info_about_stickers(self, user_id):
|
||
"""
|
||
Обновляет информацию о получении стикеров пользователем.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
None: Если обновление успешно выполнено.
|
||
|
||
Raises:
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,))
|
||
self.conn.commit()
|
||
self.logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_users_blacklist(self):
|
||
"""
|
||
Возвращает список пользователей в черном списке.
|
||
|
||
Returns:
|
||
dict: Словарь, где ключ - user_id, значение - username.
|
||
{}: Если в черном списке нет пользователей.
|
||
|
||
Raises:
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции get_users_blacklist")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT user_id, user_name FROM blacklist")
|
||
fetch_all = self.cursor.fetchall()
|
||
list_of_users = {user_id: username for user_id, username in fetch_all}
|
||
self.logger.info(f"Получен список пользователей в черном списке")
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_users_for_unblock_today(self, date_to_unban: str):
|
||
"""
|
||
Возвращает список пользователей, у которых истекает срок блокировки сегодня.
|
||
|
||
Args:
|
||
date_to_unban (str): Дата разблокировки.
|
||
|
||
Returns:
|
||
dict: Словарь, где ключ - user_id, значение - username.
|
||
{}: Если сегодня нет пользователей, у которых истекает срок блокировки.
|
||
|
||
Raises:
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT user_id, user_name "
|
||
"FROM blacklist WHERE date_to_unban = ?", (date_to_unban,))
|
||
fetch_all = result.fetchall()
|
||
list_of_users = {user_id: username for user_id, username in fetch_all}
|
||
self.logger.info(f"Получен список пользователей для разблокировки сегодня: {list_of_users}")
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_blacklist_users_by_id(self, user_id: int):
|
||
"""
|
||
Возвращает информацию о пользователе в черном списке по user_id.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
tuple: Кортеж (user_id, user_name, message_for_user, date_to_unban).
|
||
None: Если пользователь не найден в черном списке.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban "
|
||
"FROM blacklist WHERE user_id = ?", (user_id,))
|
||
return self.cursor.fetchone()
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def check_user_in_blacklist(self, user_id: int):
|
||
"""
|
||
Проверяет, существует ли запись с данным user_id в blacklist.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
bool: True, если пользователь найден в черном списке, False - иначе.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
self.logger.info(f"Существует ли пользователь: user_id={user_id} Итог: {result}")
|
||
return bool(result)
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None):
|
||
"""
|
||
Добавляет пользователя в черный список.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
user_name (str, optional): Username пользователя. Defaults to None.
|
||
message_for_user (str, optional): Сообщение для пользователя. Defaults to None.
|
||
date_to_unban (datetime, optional): Дата разблокировки. Defaults to None.
|
||
|
||
Returns:
|
||
None: Если добавление в черный список успешно выполнено.
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name},"
|
||
f" message_for_user={message_for_user}, date_to_unban={date_to_unban}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
|
||
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
|
||
(user_id, user_name, message_for_user, date_to_unban,))
|
||
self.conn.commit()
|
||
self.logger.info(f"Пользователь добавлен в черный список: user_id={user_id}")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка при добавлении пользователя в черный список: {error}")
|
||
return error
|
||
finally:
|
||
self.close()
|
||
|
||
def delete_user_blacklist(self, user_id: int):
|
||
"""
|
||
Удаляет пользователя из черного списка.
|
||
|
||
Args:
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
bool: True, если удаление прошло успешно, False - в случае ошибки.
|
||
|
||
Raises:
|
||
None: Ошибки обрабатываются в блоке except, возвращая False.
|
||
"""
|
||
self.logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
|
||
self.conn.commit()
|
||
self.logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.")
|
||
return True
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} "
|
||
f"из таблицы blacklist. Ошибка: {str(error)}")
|
||
return False
|
||
finally:
|
||
self.close()
|
||
|
||
def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str):
|
||
"""
|
||
Добавляет новое сообщение пользователя в базу данных.
|
||
|
||
Args:
|
||
message_text (str): Текст сообщения.
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
message_id (int): Идентификатор сообщения в Telegram.
|
||
date (str): Дата отправки сообщения.
|
||
|
||
Returns:
|
||
None: Если добавление прошло успешно.
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(
|
||
f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute(
|
||
"INSERT INTO user_messages (message_text, user_id, message_id, date) "
|
||
"VALUES (?, ?, ?, ?)",
|
||
(message_text, user_id, message_id, date))
|
||
self.conn.commit()
|
||
self.logger.info(f"Новое сообщение добавлено в базу данных: message_id={message_id}")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_username_and_full_name(self, user_id: int):
|
||
"""
|
||
Получает full_name и username пользователя по ID из базы
|
||
|
||
Args:
|
||
date (str): Новая дата изменения.
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
username (str): username пользователя
|
||
full_name (str): full_name пользователя
|
||
"""
|
||
self.logger.info(
|
||
f"Запуск функции check_username_and_first_name: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
|
||
username = self.cursor.fetchone()[0]
|
||
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
|
||
full_name = self.cursor.fetchone()[0]
|
||
self.logger.info(
|
||
f"Функция check_username_and_first_name успешно отработала: user_id={user_id}, username={username}, full_name={full_name}")
|
||
return username, full_name
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка в функции get_username_and_first_name: {error}")
|
||
return None
|
||
finally:
|
||
self.close()
|
||
|
||
def update_username_and_full_name(self, user_id: int, username: str, full_name: str):
|
||
"""
|
||
Обновляет full_name и username пользователя
|
||
|
||
Args:
|
||
username (str): username пользователя
|
||
full_name (str): full_name пользователя
|
||
user_id (int): Идентификатор пользователя в Telegram
|
||
|
||
Returns:
|
||
True (bool): Если обновления прошли успешно
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(
|
||
f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
|
||
(username, full_name, user_id,))
|
||
self.conn.commit()
|
||
self.logger.info(
|
||
f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены")
|
||
return True
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def update_date_for_user(self, date: str, user_id: int):
|
||
"""
|
||
#TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users
|
||
Обновляет дату последнего изменения данных пользователя в базе.
|
||
|
||
Args:
|
||
date (str): Новая дата изменения.
|
||
user_id (int): Идентификатор пользователя в Telegram.
|
||
|
||
Returns:
|
||
None: Если обновление прошло успешно.
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?",
|
||
(date, user_id,))
|
||
self.conn.commit()
|
||
self.logger.info(f"Дата изменения обновлена для пользователя: user_id={user_id}")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка обновления даты изменения для пользователя: {error}")
|
||
return error
|
||
finally:
|
||
self.close()
|
||
|
||
def is_admin(self, user_id: int):
|
||
"""
|
||
Проверяет, является ли пользователь администратором.
|
||
|
||
Args:
|
||
user_id: ID пользователя Telegram.
|
||
|
||
Returns:
|
||
True, если пользователь администратор, иначе False.
|
||
|
||
Raises:
|
||
None: В случае ошибки возвращается None
|
||
"""
|
||
self.logger.info(f"Запуск функции is_admin: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
return bool(result)
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
|
||
return None
|
||
finally:
|
||
self.close()
|
||
|
||
def add_admin(self, user_id: int, role: str):
|
||
"""
|
||
Добавляет пользователя в список администраторов.
|
||
|
||
Args:
|
||
user_id (int): ID пользователя Telegram.
|
||
role (str): Роль пользователя. Доступные варианты:
|
||
1. creator - создатель
|
||
2. admin - обычная роль
|
||
|
||
Returns:
|
||
None: Если добавление прошло успешно.
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
|
||
self.conn.commit()
|
||
self.logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка добавления пользователя в список администраторов: {error}")
|
||
return error
|
||
finally:
|
||
self.close()
|
||
|
||
def remove_admin(self, user_id: int):
|
||
"""
|
||
Удаляет пользователя из списка администраторов.
|
||
|
||
Args:
|
||
user_id (int): ID пользователя Telegram.
|
||
|
||
Returns:
|
||
None: Если удаление прошло успешно.
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции remove_admin: user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
|
||
self.conn.commit()
|
||
self.logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}")
|
||
return error
|
||
finally:
|
||
self.connect()
|
||
|
||
def get_user_by_message_id(self, message_id: int):
|
||
"""
|
||
#TODO: Возвращается TypeError вместо None
|
||
Возвращает идентификатор пользователя по идентификатору сообщения.
|
||
|
||
Args:
|
||
message_id (int): Идентификатор сообщения в Telegram.
|
||
|
||
Returns:
|
||
int: Идентификатор пользователя.
|
||
None: Если пользователь не найден.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,))
|
||
user = result.fetchone()[0]
|
||
self.logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}")
|
||
return user
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения user_id по message_id: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_last_users_from_db(self):
|
||
"""
|
||
Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот.
|
||
|
||
Returns:
|
||
list: Список кортежей (full_name, user_id) последних 30 пользователей.
|
||
[]: Если в базе данных нет пользователей.
|
||
|
||
Raises:
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info("Запуск функции get_last_users_from_db")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30")
|
||
users = result.fetchall()
|
||
self.logger.info(f"Получен список последних 30 пользователей: {users}")
|
||
return users
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения списка последних пользователей: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_banned_users_from_db(self):
|
||
"""
|
||
Возвращает список идентификаторов пользователей в черном списке бота.
|
||
|
||
Returns:
|
||
list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
|
||
[]: Если в черном списке нет пользователей.
|
||
|
||
Raises:
|
||
sqlite3.Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info("Запуск функции get_banned_users_from_db")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist")
|
||
users = result.fetchall()
|
||
self.logger.info(f"Получен список пользователей в черном списке: {users}")
|
||
return users
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
|
||
"""
|
||
Возвращает список идентификаторов пользователей в черном списке бота с учетом смещения и ограничения.
|
||
|
||
Args:
|
||
offset (int): Смещение для выборки
|
||
limit (int): Ограничение количества возвращаемых записей.
|
||
|
||
Returns:
|
||
list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
|
||
[]: Если в черном списке нет пользователей или количество записей с учетом смещения и ограничения равно 0.
|
||
|
||
Raises:
|
||
sqlite3. Error: Если произошла ошибка при выполнении запроса.
|
||
"""
|
||
self.logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban "
|
||
"FROM blacklist LIMIT ?, ?", (offset, limit,))
|
||
users = result.fetchall()
|
||
self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {users}")
|
||
return users
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_post_content_from_telegram_by_last_id(self, last_post_id: int):
|
||
self.logger.info(
|
||
f"Запуск функции get_post_content_from_telegram_by_last_id, идентификатор поста {last_post_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("""
|
||
SELECT cpft.content_name, cpft.content_type
|
||
FROM post_from_telegram_suggest pft
|
||
JOIN message_link_to_content mltc
|
||
ON pft.message_id = mltc.post_id
|
||
JOIN content_post_from_telegram cpft
|
||
ON cpft.message_id = mltc.message_id
|
||
WHERE pft.helper_text_message_id = ?
|
||
""", (last_post_id,))
|
||
post_content = result.fetchall()
|
||
self.logger.info(
|
||
f"Функция get_post_content_from_telegram_by_last_id получила список контента: {post_content}")
|
||
return post_content
|
||
finally:
|
||
self.close()
|
||
|
||
def get_post_ids_from_telegram_by_last_id(self, last_post_id: int):
|
||
self.logger.info(
|
||
f"Запуск функции get_post_ids_from_telegram_by_last_id, идентификатор поста {last_post_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("""
|
||
SELECT mltc.message_id
|
||
FROM post_from_telegram_suggest pft
|
||
JOIN message_link_to_content mltc
|
||
ON pft.message_id = mltc.post_id
|
||
WHERE pft.helper_text_message_id = ?
|
||
""", (last_post_id,))
|
||
post_ids = result.fetchall()
|
||
self.logger.info(f"Функция get_post_ids_from_telegram_by_last_id "
|
||
f"получила идентификаторы сообщений: {post_ids}")
|
||
return post_ids
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции get_post_ids_from_telegram_by_last_id {str(e)}")
|
||
return False
|
||
finally:
|
||
self.close()
|
||
|
||
def get_post_text_from_telegram_by_last_id(self, last_post_id: int):
|
||
self.logger.info(f"Запуск функции get_post_text_from_telegram_by_last_id, идентификатор поста {last_post_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT text "
|
||
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
|
||
(last_post_id,))
|
||
text = result.fetchone()[0]
|
||
self.logger.info(f"Функция get_post_text_from_telegram_by_last_id получила text")
|
||
return text
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции get_post_text_from_telegram_by_last_id {str(e)}")
|
||
|
||
def get_author_id_by_message_id(self, message_id: int):
|
||
self.logger.info(f"Запуск функции get_author_id_by_message_id, идентификатор поста {message_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT author_id "
|
||
"FROM post_from_telegram_suggest WHERE message_id = ?",
|
||
(message_id,))
|
||
author_id = result.fetchone()[0]
|
||
self.logger.info(f"Функция get_author_id_by_message_id получила author_id {author_id}")
|
||
return author_id
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции get_author_id_by_message_id {str(e)}")
|
||
|
||
def get_author_id_by_helper_message_id(self, helper_text_message_id: int):
|
||
self.logger.info(f"Запуск функции get_author_id_by_helper_message_id, идентификатор поста "
|
||
f"{helper_text_message_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute("SELECT author_id "
|
||
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
|
||
(helper_text_message_id,))
|
||
author_id = result.fetchone()[0]
|
||
self.logger.info(f"Функция get_author_id_by_helper_message_id получила author_id {author_id}")
|
||
return author_id
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции get_author_id_by_helper_message_id {str(e)}")
|
||
|
||
def add_post_content_in_db(self, post_id: int, message_id: int, content_name: str, type_content: str):
|
||
self.logger.info(
|
||
f"Запуск функции add_post_content_in_db: post_id={post_id}, message_id={message_id}, "
|
||
f"content_name={content_name}, content_type={type_content}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute(
|
||
"INSERT INTO message_link_to_content (post_id, message_id)"
|
||
"VALUES (?, ?)", (post_id, message_id))
|
||
self.conn.commit()
|
||
self.cursor.execute(
|
||
"INSERT INTO content_post_from_telegram (message_id, content_name, content_type)"
|
||
"VALUES (?, ?, ?)", (message_id, content_name, type_content))
|
||
self.conn.commit()
|
||
self.logger.info(f"Функция add_post_content_in_db отработала успешно")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции add_post_content_in_db при добавлении поста в базу данных: {e}")
|
||
return False
|
||
|
||
def add_post_in_db(self, message_id: int, text: str, author_id: int):
|
||
self.logger.info(
|
||
f"Запуск функции add_post_in_db: message_id={message_id}, "
|
||
f"author_id={author_id}")
|
||
try:
|
||
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
|
||
self.connect()
|
||
self.cursor.execute(
|
||
"INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)"
|
||
"VALUES (?, ?, ?, ?)", (message_id, text, author_id, today))
|
||
self.conn.commit()
|
||
self.logger.info(f"Функция add_post_in_db отработала успешно")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции add_post_in_db при добавлении поста в базу данных: {e}")
|
||
return False
|
||
|
||
def update_helper_message_in_db(self, message_id: int, helper_message_id: int):
|
||
self.logger.info(
|
||
f"Запуск функции update_helper_message_in_db: message_id={message_id}, "
|
||
f"helper_message_id={helper_message_id}")
|
||
try:
|
||
self.connect()
|
||
self.cursor.execute(
|
||
"UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
|
||
(helper_message_id, message_id,))
|
||
self.conn.commit()
|
||
self.logger.info(f"Функция update_helper_message_in_db отработала успешно")
|
||
return True
|
||
except Exception as e:
|
||
self.logger.error(f"Ошибка в функции update_helper_message_in_db при добавлении поста в базу данных: {e}")
|
||
return False
|
||
|
||
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
|
||
"""Добавляет информацию о войсе юзера в БД"""
|
||
self.logger.info(
|
||
f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id},"
|
||
f" date_added = {date_added}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute(
|
||
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) "
|
||
"VALUES (?, ?, ?, ?, ?)",
|
||
(file_name, author_id, date_added, listen_count, file_id))
|
||
self.conn.commit()
|
||
self.logger.info(
|
||
f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, "
|
||
f"date_added = {date_added}")
|
||
return None
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def last_date_audio(self):
|
||
"""Получаем дату последнего войса"""
|
||
self.logger.info(
|
||
f"Запуск функции last_date_audio")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute(
|
||
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
|
||
last_date = result.fetchone()[0]
|
||
self.logger.info(f"Последняя дата сообщения {last_date}")
|
||
return last_date
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения последней даты войса: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_last_user_audio_record(self, user_id):
|
||
"""Получает данные о количестве записей пользователя"""
|
||
self.logger.info(
|
||
f"Запуск функции get_last_user_audio_record. user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
r = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
|
||
(user_id,))
|
||
result = bool(len(r.fetchall()))
|
||
self.logger.info(
|
||
f"Результат функции get_last_user_audio_record: {result}")
|
||
return result
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения последней даты войса: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_id_for_audio_record(self, user_id):
|
||
"""Получает ID аудио сообщения пользователя"""
|
||
self.logger.info(
|
||
f"Запуск функции get_id_for_audio_record. user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
r = self.cursor.execute(
|
||
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? "
|
||
"ORDER BY date_added DESC LIMIT 1",
|
||
(user_id,))
|
||
result = r.fetchone()[0]
|
||
self.logger.info(
|
||
f"Результат функции get_id_for_audio_record: {result}")
|
||
return result
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения последней даты войса: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def get_path_for_audio_record(self, user_id):
|
||
"""Получает данные о названии файла"""
|
||
self.logger.info(
|
||
f"Запуск функции get_path_for_audio_record. user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
r = self.cursor.execute(
|
||
"SELECT `file_name` "
|
||
"FROM `audio_message_reference` "
|
||
"WHERE `author_id` = ? "
|
||
"ORDER BY date_added "
|
||
"DESC LIMIT 1",
|
||
(user_id,))
|
||
result = r.fetchone()[0]
|
||
self.logger.info(
|
||
f"Результат функции get_path_for_audio_record: {result}")
|
||
return result
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения последней даты войса: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def check_listen_audio(self, user_id):
|
||
"""Проверяет прослушано ли аудио пользователем"""
|
||
self.logger.info(
|
||
f"Запуск функции check_listen_audio. user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
query_listen_audio = self.cursor.execute(
|
||
"""SELECT l.file_name
|
||
FROM audio_message_reference a
|
||
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
|
||
WHERE l.user_id = ?
|
||
AND l.file_name IS NOT NULL""", (user_id,))
|
||
check_sign = query_listen_audio.fetchall()
|
||
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
|
||
(user_id,))
|
||
sign_all_audio = query_all_audio.fetchall()
|
||
new_sign1 = list(set(sign_all_audio) - set(check_sign))
|
||
new_sign = []
|
||
for i in new_sign1:
|
||
new_sign.append(i[0])
|
||
self.logger.info(
|
||
f"Функция check_listen_audio успешно отработала.")
|
||
return new_sign
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения последней даты войса: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def mark_listened_audio(self, file_name, user_id):
|
||
"""Отмечает аудио прослушанным для конкретного пользователя."""
|
||
self.logger.info(
|
||
f"Запуск функции mark_listened_audio. file_name={file_name}, user_id={user_id}")
|
||
try:
|
||
self.connect()
|
||
result = self.cursor.execute(
|
||
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
|
||
(file_name, user_id, 1))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
self.logger.error(f"Ошибка получения последней даты войса: {error}")
|
||
raise
|
||
finally:
|
||
self.close()
|
||
|
||
def close(self):
|
||
"""Закрытие соединения и курсора."""
|
||
if self.cursor:
|
||
self.cursor.close()
|
||
if self.conn:
|
||
self.conn.close()
|