Files
telegram-helper-bot/database/db.py
2024-11-14 00:05:37 +03:00

1170 lines
58 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sqlite3
from datetime import datetime
from logs.custom_logger import logger
class BotDB:
def __init__(self, current_dir, name):
self.db_file = os.path.join(current_dir, name)
self.conn = None
self.cursor = None
self.logger = logger
self.logger.info(f'Инициация базы данных: {self.db_file}')
def connect(self):
"""Создание соединения и курсора."""
self.conn = sqlite3.connect(self.db_file)
self.cursor = self.conn.cursor()
def create_table(self, sql_script):
"""
Создает таблицу в базе. Используется в миграциях
Args:
sql_script: DDL скрипт таблицы
Returns:
None
"""
try:
self.connect()
self.cursor.execute(sql_script)
self.conn.commit()
self.logger.info(f'Таблица создана: {sql_script}')
except Exception as e:
self.logger.error(f'Ошибка при создании таблицы. Данные: {sql_script} Ошибка: {e}')
raise
finally:
self.close()
def get_current_version(self):
"""
Возвращает текущую последнюю версию миграции
Args:
None
Returns:
int: Версия последней миграции.
"""
self.logger.info(f'Попытка получения версии миграции')
try:
self.connect()
self.cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
version = self.cursor.fetchone()[0]
self.logger.info(f'Получена текущая версия миграции: {version}')
return version
except Exception as e:
self.logger.error(f'Ошибка при получении текущей версии миграции: {str(e)}')
raise
finally:
self.close()
def update_version(self, new_version: int, script_name: str):
"""
Обновляет версию миграций в таблице migrations.
Добавляет новую запись в таблицу migrations с указанной версией,
именем скрипта и текущей датой и временем.
Args:
new_version (int): Новая версия миграции
script_name (str): Имя скрипта миграции
Returns:
None
Raises:
sqlite3. IntegrityError: Если возникает ошибка целостности при вставке
данных в таблицу migrations.
Exception: Если возникает любая другая ошибка при обновлении версии.
"""
self.logger.info(f'Попытка обновления версии: {new_version}, название скрипта: {script_name}')
try:
self.connect()
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
self.cursor.execute(
"INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
(new_version, script_name, today),
)
self.conn.commit()
self.logger.info(f"Версия обновлена: {new_version}, название скрипта: {script_name}")
except sqlite3.IntegrityError as e:
self.logger.error(f"Ошибка при обновлении версии: {e}")
raise
except Exception as e:
self.logger.error(f"Ошибка при обновлении версии: {e}")
raise
finally:
self.close()
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда
def get_error_message_from_db(self, id: int):
"""
@deprecated
Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
id - идентификатор ошибки
"""
# Подключаемся к базе
try:
self.connect()
self.cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
response_from_database = str(self.cursor.fetchone()[1])
return response_from_database
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении сообщения об ошибка voice_bot: {error}")
finally:
self.close()
def add_new_user_in_db(self, user_id: int, first_name: str, full_name: str, username: str, is_bot: bool,
language_code: str, date_added: str,
date_changed: str):
"""
Добавляет нового пользователя в базу данных.
Args:
user_id (int): Идентификатор пользователя в Telegram.
first_name (str): Имя пользователя.
full_name (str): Полное имя пользователя.
username (str): Username пользователя в Telegram.
is_bot (bool): Флаг, указывающий, является ли пользователь ботом.
language_code (str): Код языка пользователя.
date_added (str): Дата добавления пользователя в базу.
date_changed (str): Дата последнего изменения данных пользователя.
Returns:
None: Если запись успешно добавлена в базу.
Exception: Если произошла ошибка при добавлении записи.
"""
self.logger.info(f"Попытка добавить пользователя в базу данных: user_id={user_id}, first_name={first_name}")
try:
self.connect()
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name,
username, is_bot, language_code, date_added, date_changed))
self.conn.commit()
self.logger.info(f"Новый пользователь добавлен в базу: user_id={user_id}, first_name={first_name}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при добавлении пользователя в базу: {error}. "
f"Данные пользователя: user_id={user_id}, first_name={first_name}")
raise
finally:
self.close()
def user_exists(self, user_id: int):
"""
Проверяет, существует ли пользователь в базе данных.
Args:
user_id (int): Идентификатор пользователя.
Returns:
bool: True, если пользователь найден, False - иначе.
"""
self.logger.info(f"Попытка проверки существования пользователя: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchall()
self.logger.info(f"Проверка существования пользователя: user_id={user_id}, результат={result}")
return bool(len(result))
except sqlite3.Error as error:
self.logger.error(f"Ошибка при проверке существования пользователя: {error}")
raise
finally:
self.close()
def get_user_id(self, user_id: int):
"""
@deprecated
Возвращает ID пользователя в базе данных по его user_id.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
int: ID пользователя в базе данных.
None: Если пользователь не найден.
"""
self.logger.info(f"Попытка получения ID пользователя в базе данных для user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT id FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
user_id_db = result[0]
self.logger.info(f"ID пользователя в базе найден: user_id={user_id}, id_db={user_id_db}")
return user_id_db
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении ID пользователя из базы данных: {error}")
raise
finally:
self.close()
def get_username(self, user_id: int):
"""
Возвращает username пользователя из базы данных по его user_id в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
str: Username пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
username = result[0]
self.logger.info(f"Username пользователя найден: user_id={user_id}, username={username}")
return username
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_user_id_by_username(self, username: str):
"""
Возвращает user_id пользователя из базы данных по его user_name в Telegram.
Args:
username (str): Username пользователя.
Returns:
user_id (int): Идентификатор пользователя в Telegram.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT user_id FROM our_users WHERE username = ?", (username,))
result = self.cursor.fetchone()
if result:
user_id = result[0]
self.logger.info(f"User_id пользователя найден: username={username}, user_id={user_id}")
return user_id
else:
self.logger.info(f"Пользователь с username={username} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_full_name_by_id(self, user_id: str):
"""
Возвращает full_name пользователя из базы данных по его username в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
full_name (str): Username пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
try:
self.connect()
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
full_name = result[0]
self.logger.info(f"Username пользователя найден: user_id={user_id}, full_name={full_name}")
return full_name
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении username из базы данных: {error}")
raise
finally:
self.close()
def get_all_user_id(self):
"""
Возвращает список всех user_id из базы данных.
Returns:
list: Список user_id.
[]: Если в базе данных нет пользователей.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Попытка получения всех user_id")
try:
self.connect()
self.cursor.execute("SELECT user_id FROM our_users")
fetch_all = self.cursor.fetchall()
list_of_users = [user_id[0] for user_id in fetch_all]
self.logger.info(f"Получен список всех user_id: {list_of_users}")
return list_of_users
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении списка user_id из базы данных: {error}")
raise
finally:
self.close()
def get_user_first_name(self, user_id: int):
"""
Возвращает имя пользователя из базы данных по его user_id в Telegram.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
str: Имя пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Попытка получения имени пользователя по user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT first_name FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
first_name = result[0]
self.logger.info(f"Имя пользователя найдено: user_id={user_id}, first_name={first_name}")
return first_name
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении имени пользователя из базы данных: {error}")
raise
finally:
self.close()
def get_info_about_stickers(self, user_id: int):
"""
Проверяет, получил ли пользователь стикеры.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
bool: True, если пользователь получил стикеры, False - иначе.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Попытка проверки получил ли пользователь с user_id={user_id} стикеры.")
try:
self.connect()
self.cursor.execute("SELECT has_stickers FROM our_users WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
if result:
has_stickers = result[0] == 1
self.logger.info(
f"Проверено получение стикеров пользователем: user_id={user_id}, has_stickers={has_stickers}")
return has_stickers
else:
self.logger.info(f"Пользователь с user_id={user_id} не найден в базе данных.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении информации о получении стикеров: {error}")
raise
finally:
self.close()
def update_info_about_stickers(self, user_id):
"""
Обновляет информацию о получении стикеров пользователем.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
None: Если обновление успешно выполнено.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции update_info_about_stickers. Параметры: user_id={user_id}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET has_stickers = 1 WHERE user_id = ?", (user_id,))
self.conn.commit()
self.logger.info(f"Информация о получении стикеров обновлена: user_id={user_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при обновлении информации о получении стикеров: {error}")
raise
finally:
self.close()
def get_users_blacklist(self):
"""
Возвращает список пользователей в черном списке.
Returns:
dict: Словарь, где ключ - user_id, значение - username.
{}: Если в черном списке нет пользователей.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_users_blacklist")
try:
self.connect()
self.cursor.execute("SELECT user_id, user_name FROM blacklist")
fetch_all = self.cursor.fetchall()
list_of_users = {user_id: username for user_id, username in fetch_all}
self.logger.info(f"Получен список пользователей в черном списке")
return list_of_users
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении списка пользователей в черном списке: {error}")
raise
finally:
self.close()
def get_users_for_unblock_today(self, date_to_unban: str):
"""
Возвращает список пользователей, у которых истекает срок блокировки сегодня.
Args:
date_to_unban (str): Дата разблокировки.
Returns:
dict: Словарь, где ключ - user_id, значение - username.
{}: Если сегодня нет пользователей, у которых истекает срок блокировки.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_users_for_unblock_today: date_to_unban={date_to_unban}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id, user_name "
"FROM blacklist WHERE date_to_unban = ?", (date_to_unban,))
fetch_all = result.fetchall()
list_of_users = {user_id: username for user_id, username in fetch_all}
self.logger.info(f"Получен список пользователей для разблокировки сегодня: {list_of_users}")
return list_of_users
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении списка пользователей для разблокировки: {error}")
raise
finally:
self.close()
def get_blacklist_users_by_id(self, user_id: int):
"""
Возвращает информацию о пользователе в черном списке по user_id.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
tuple: Кортеж (user_id, user_name, message_for_user, date_to_unban).
None: Если пользователь не найден в черном списке.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_blacklist_users_by_id: user_id={user_id}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban "
"FROM blacklist WHERE user_id = ?", (user_id,))
return self.cursor.fetchone()
except sqlite3.Error as error:
self.logger.error(f"Ошибка при получении информации о пользователе в черном списке: {error}")
raise
finally:
self.close()
def check_user_in_blacklist(self, user_id: int):
"""
Проверяет, существует ли запись с данным user_id в blacklist.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
bool: True, если пользователь найден в черном списке, False - иначе.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции check_user_in_blacklist: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
self.logger.info(f"Существует ли пользователь: user_id={user_id} Итог: {result}")
return bool(result)
except sqlite3.Error as error:
self.logger.error(f"Ошибка при проверке пользователя в черном списке. user_id: {user_id} : {error}")
raise
finally:
self.close()
def set_user_blacklist(self, user_id: int, user_name=None, message_for_user=None, date_to_unban=None):
"""
Добавляет пользователя в черный список.
Args:
user_id (int): Идентификатор пользователя в Telegram.
user_name (str, optional): Username пользователя. Defaults to None.
message_for_user (str, optional): Сообщение для пользователя. Defaults to None.
date_to_unban (datetime, optional): Дата разблокировки. Defaults to None.
Returns:
None: Если добавление в черный список успешно выполнено.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции set_user_blacklist: user_id={user_id}, user_name={user_name},"
f" message_for_user={message_for_user}, date_to_unban={date_to_unban}")
try:
self.connect()
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban,))
self.conn.commit()
self.logger.info(f"Пользователь добавлен в черный список: user_id={user_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка при добавлении пользователя в черный список: {error}")
return error
finally:
self.close()
def delete_user_blacklist(self, user_id: int):
"""
Удаляет пользователя из черного списка.
Args:
user_id (int): Идентификатор пользователя в Telegram.
Returns:
bool: True, если удаление прошло успешно, False - в случае ошибки.
Raises:
None: Ошибки обрабатываются в блоке except, возвращая False.
"""
self.logger.info(f"Запуск функции delete_user_blacklist: user_id={user_id}")
try:
self.connect()
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit()
self.logger.info(f"Пользователь с идентификатором {user_id} успешно удален из черного списка.")
return True
except sqlite3.Error as error:
self.logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} "
f"из таблицы blacklist. Ошибка: {str(error)}")
return False
finally:
self.close()
def add_new_message_in_db(self, message_text: str, user_id: int, message_id: int, date: str):
"""
Добавляет новое сообщение пользователя в базу данных.
Args:
message_text (str): Текст сообщения.
user_id (int): Идентификатор пользователя в Telegram.
message_id (int): Идентификатор сообщения в Telegram.
date (str): Дата отправки сообщения.
Returns:
None: Если добавление прошло успешно.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(
f"Запуск функции add_new_message_in_db: user_id={user_id}, message_id={message_id}, date={date}")
try:
self.connect()
self.cursor.execute(
"INSERT INTO user_messages (message_text, user_id, message_id, date) "
"VALUES (?, ?, ?, ?)",
(message_text, user_id, message_id, date))
self.conn.commit()
self.logger.info(f"Новое сообщение добавлено в базу данных: message_id={message_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
raise
finally:
self.close()
def get_username_and_full_name(self, user_id: int):
"""
Получает full_name и username пользователя по ID из базы
Args:
date (str): Новая дата изменения.
user_id (int): Идентификатор пользователя в Telegram.
Returns:
username (str): username пользователя
full_name (str): full_name пользователя
"""
self.logger.info(
f"Запуск функции check_username_and_first_name: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT username FROM our_users WHERE user_id = ?", (user_id,))
username = self.cursor.fetchone()[0]
self.cursor.execute("SELECT full_name FROM our_users WHERE user_id = ?", (user_id,))
full_name = self.cursor.fetchone()[0]
self.logger.info(
f"Функция check_username_and_first_name успешно отработала: user_id={user_id}, username={username}, full_name={full_name}")
return username, full_name
except sqlite3.Error as error:
self.logger.error(f"Ошибка в функции get_username_and_first_name: {error}")
return None
finally:
self.close()
def update_username_and_full_name(self, user_id: int, username: str, full_name: str):
"""
Обновляет full_name и username пользователя
Args:
username (str): username пользователя
full_name (str): full_name пользователя
user_id (int): Идентификатор пользователя в Telegram
Returns:
True (bool): Если обновления прошли успешно
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(
f"Запуск функции update_username_and_full_name: user_id={user_id}, username={username}, full_name={full_name}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET username = ?, full_name = ? WHERE user_id = ?",
(username, full_name, user_id,))
self.conn.commit()
self.logger.info(
f"Функция update_username_and_full_name. Данные пользователя: user_id={user_id} успешно обновлены")
return True
except sqlite3.Error as error:
self.logger.error(f"Ошибка в функции update_username_and_full_name: {error}")
raise
finally:
self.close()
def update_date_for_user(self, date: str, user_id: int):
"""
#TODO: Не возвращается ошибка sqlite3. Error. Тест не перехватывает. Возвращается no such table: our_users
Обновляет дату последнего изменения данных пользователя в базе.
Args:
date (str): Новая дата изменения.
user_id (int): Идентификатор пользователя в Telegram.
Returns:
None: Если обновление прошло успешно.
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции update_date_for_user: user_id={user_id}, date={date}")
try:
self.connect()
self.cursor.execute("UPDATE our_users SET date_changed = ? WHERE user_id = ?",
(date, user_id,))
self.conn.commit()
self.logger.info(f"Дата изменения обновлена для пользователя: user_id={user_id}")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка обновления даты изменения для пользователя: {error}")
return error
finally:
self.close()
def is_admin(self, user_id: int):
"""
Проверяет, является ли пользователь администратором.
Args:
user_id: ID пользователя Telegram.
Returns:
True, если пользователь администратор, иначе False.
Raises:
None: В случае ошибки возвращается None
"""
self.logger.info(f"Запуск функции is_admin: user_id={user_id}")
try:
self.connect()
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
except sqlite3.Error as error:
self.logger.error(f"Ошибка добавления сообщения в базу данных: {error}")
return None
finally:
self.close()
def add_admin(self, user_id: int, role: str):
"""
Добавляет пользователя в список администраторов.
Args:
user_id (int): ID пользователя Telegram.
role (str): Роль пользователя. Доступные варианты:
1. creator - создатель
2. admin - обычная роль
Returns:
None: Если добавление прошло успешно.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции add_admin: user_id={user_id}, role={role}")
try:
self.connect()
self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
self.conn.commit()
self.logger.info(f"Пользователь с user_id={user_id} добавлен в список администраторов с ролью {role}.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка добавления пользователя в список администраторов: {error}")
return error
finally:
self.close()
def remove_admin(self, user_id: int):
"""
Удаляет пользователя из списка администраторов.
Args:
user_id (int): ID пользователя Telegram.
Returns:
None: Если удаление прошло успешно.
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции remove_admin: user_id={user_id}")
try:
self.connect()
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
self.conn.commit()
self.logger.info(f"Пользователь с user_id={user_id} удален из списка администраторов.")
return None
except sqlite3.Error as error:
self.logger.error(f"Ошибка удаления пользователя из списка администраторов: {error}")
return error
finally:
self.connect()
def get_user_by_message_id(self, message_id: int):
"""
#TODO: Возвращается TypeError вместо None
Возвращает идентификатор пользователя по идентификатору сообщения.
Args:
message_id (int): Идентификатор сообщения в Telegram.
Returns:
int: Идентификатор пользователя.
None: Если пользователь не найден.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_user_by_message_id: message_id={message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT user_id FROM user_messages WHERE message_id = ?", (message_id,))
user = result.fetchone()[0]
self.logger.info(f"Пользователь успешно получен user_id={user} по message_id={message_id}")
return user
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения user_id по message_id: {error}")
raise
finally:
self.close()
def get_last_users_from_db(self):
"""
Возвращает список идентификаторов последних 30 пользователей, обращавшихся в бот.
Returns:
list: Список кортежей (full_name, user_id) последних 30 пользователей.
[]: Если в базе данных нет пользователей.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info("Запуск функции get_last_users_from_db")
try:
self.connect()
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 30")
users = result.fetchall()
self.logger.info(f"Получен список последних 30 пользователей: {users}")
return users
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения списка последних пользователей: {error}")
raise
finally:
self.close()
def get_banned_users_from_db(self):
"""
Возвращает список идентификаторов пользователей в черном списке бота.
Returns:
list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
[]: Если в черном списке нет пользователей.
Raises:
sqlite3.Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info("Запуск функции get_banned_users_from_db")
try:
self.connect()
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist")
users = result.fetchall()
self.logger.info(f"Получен список пользователей в черном списке: {users}")
return users
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
raise
finally:
self.close()
def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
"""
Возвращает список идентификаторов пользователей в черном списке бота с учетом смещения и ограничения.
Args:
offset (int): Смещение для выборки
limit (int): Ограничение количества возвращаемых записей.
Returns:
list: Список кортежей (user_name, user_id, message_for_user, date_to_unban) пользователей в черном списке.
[]: Если в черном списке нет пользователей или количество записей с учетом смещения и ограничения равно 0.
Raises:
sqlite3. Error: Если произошла ошибка при выполнении запроса.
"""
self.logger.info(f"Запуск функции get_banned_users_from_db_with_limits: offset={offset}, limit={limit}")
try:
self.connect()
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban "
"FROM blacklist LIMIT ?, ?", (offset, limit,))
users = result.fetchall()
self.logger.info(f"Получен список пользователей в черном списке (offset={offset}, limit={limit}): {users}")
return users
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения списка пользователей в черном списке: {error}")
raise
finally:
self.close()
def get_post_content_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(
f"Запуск функции get_post_content_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("""
SELECT cpft.content_name, cpft.content_type
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc
ON pft.message_id = mltc.post_id
JOIN content_post_from_telegram cpft
ON cpft.message_id = mltc.message_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,))
post_content = result.fetchall()
self.logger.info(
f"Функция get_post_content_from_telegram_by_last_id получила список контента: {post_content}")
return post_content
finally:
self.close()
def get_post_ids_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(
f"Запуск функции get_post_ids_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("""
SELECT mltc.message_id
FROM post_from_telegram_suggest pft
JOIN message_link_to_content mltc
ON pft.message_id = mltc.post_id
WHERE pft.helper_text_message_id = ?
""", (last_post_id,))
post_ids = result.fetchall()
self.logger.info(f"Функция get_post_ids_from_telegram_by_last_id "
f"получила идентификаторы сообщений: {post_ids}")
return post_ids
except Exception as e:
self.logger.error(f"Ошибка в функции get_post_ids_from_telegram_by_last_id {str(e)}")
return False
finally:
self.close()
def get_post_text_from_telegram_by_last_id(self, last_post_id: int):
self.logger.info(f"Запуск функции get_post_text_from_telegram_by_last_id, идентификатор поста {last_post_id}")
try:
self.connect()
result = self.cursor.execute("SELECT text "
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(last_post_id,))
text = result.fetchone()[0]
self.logger.info(f"Функция get_post_text_from_telegram_by_last_id получила text")
return text
except Exception as e:
self.logger.error(f"Ошибка в функции get_post_text_from_telegram_by_last_id {str(e)}")
def get_author_id_by_message_id(self, message_id: int):
self.logger.info(f"Запуск функции get_author_id_by_message_id, идентификатор поста {message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM post_from_telegram_suggest WHERE message_id = ?",
(message_id,))
author_id = result.fetchone()[0]
self.logger.info(f"Функция get_author_id_by_message_id получила author_id {author_id}")
return author_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_author_id_by_message_id {str(e)}")
def get_author_id_by_helper_message_id(self, helper_text_message_id: int):
self.logger.info(f"Запуск функции get_author_id_by_helper_message_id, идентификатор поста "
f"{helper_text_message_id}")
try:
self.connect()
result = self.cursor.execute("SELECT author_id "
"FROM post_from_telegram_suggest WHERE helper_text_message_id = ?",
(helper_text_message_id,))
author_id = result.fetchone()[0]
self.logger.info(f"Функция get_author_id_by_helper_message_id получила author_id {author_id}")
return author_id
except Exception as e:
self.logger.error(f"Ошибка в функции get_author_id_by_helper_message_id {str(e)}")
def add_post_content_in_db(self, post_id: int, message_id: int, content_name: str, type_content: str):
self.logger.info(
f"Запуск функции add_post_content_in_db: post_id={post_id}, message_id={message_id}, "
f"content_name={content_name}, content_type={type_content}")
try:
self.connect()
self.cursor.execute(
"INSERT INTO message_link_to_content (post_id, message_id)"
"VALUES (?, ?)", (post_id, message_id))
self.conn.commit()
self.cursor.execute(
"INSERT INTO content_post_from_telegram (message_id, content_name, content_type)"
"VALUES (?, ?, ?)", (message_id, content_name, type_content))
self.conn.commit()
self.logger.info(f"Функция add_post_content_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции add_post_content_in_db при добавлении поста в базу данных: {e}")
return False
def add_post_in_db(self, message_id: int, text: str, author_id: int):
self.logger.info(
f"Запуск функции add_post_in_db: message_id={message_id}, "
f"author_id={author_id}")
try:
today = datetime.now().strftime("%d-%m-%Y %H:%M:%S")
self.connect()
self.cursor.execute(
"INSERT INTO post_from_telegram_suggest (message_id, text, author_id, created_at)"
"VALUES (?, ?, ?, ?)", (message_id, text, author_id, today))
self.conn.commit()
self.logger.info(f"Функция add_post_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции add_post_in_db при добавлении поста в базу данных: {e}")
return False
def update_helper_message_in_db(self, message_id: int, helper_message_id: int):
self.logger.info(
f"Запуск функции update_helper_message_in_db: message_id={message_id}, "
f"helper_message_id={helper_message_id}")
try:
self.connect()
self.cursor.execute(
"UPDATE post_from_telegram_suggest SET helper_text_message_id = ? WHERE message_id = ?",
(helper_message_id, message_id,))
self.conn.commit()
self.logger.info(f"Функция update_helper_message_in_db отработала успешно")
return True
except Exception as e:
self.logger.error(f"Ошибка в функции update_helper_message_in_db при добавлении поста в базу данных: {e}")
return False
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД"""
self.logger.info(
f"Запуск функции add_audio_record (file_name = {file_name}, author_id = {author_id},"
f" date_added = {date_added}")
try:
self.connect()
result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) "
"VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id))
self.conn.commit()
self.logger.info(
f"Аудио успешно добавлено в БД (file_name = {file_name}, author_id = {author_id}, "
f"date_added = {date_added}")
return None
except sqlite3.Error as error:
print(error)
raise
finally:
self.close()
def last_date_audio(self):
"""Получаем дату последнего войса"""
self.logger.info(
f"Запуск функции last_date_audio")
try:
self.connect()
result = self.cursor.execute(
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
last_date = result.fetchone()[0]
self.logger.info(f"Последняя дата сообщения {last_date}")
return last_date
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя"""
self.logger.info(
f"Запуск функции get_last_user_audio_record. user_id={user_id}")
try:
self.connect()
r = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,))
result = bool(len(r.fetchall()))
self.logger.info(
f"Результат функции get_last_user_audio_record: {result}")
return result
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def get_id_for_audio_record(self, user_id):
"""Получает ID аудио сообщения пользователя"""
self.logger.info(
f"Запуск функции get_id_for_audio_record. user_id={user_id}")
try:
self.connect()
r = self.cursor.execute(
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? "
"ORDER BY date_added DESC LIMIT 1",
(user_id,))
result = r.fetchone()[0]
self.logger.info(
f"Результат функции get_id_for_audio_record: {result}")
return result
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла"""
self.logger.info(
f"Запуск функции get_path_for_audio_record. user_id={user_id}")
try:
self.connect()
r = self.cursor.execute(
"SELECT `file_name` "
"FROM `audio_message_reference` "
"WHERE `author_id` = ? "
"ORDER BY date_added "
"DESC LIMIT 1",
(user_id,))
result = r.fetchone()[0]
self.logger.info(
f"Результат функции get_path_for_audio_record: {result}")
return result
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def check_listen_audio(self, user_id):
"""Проверяет прослушано ли аудио пользователем"""
self.logger.info(
f"Запуск функции check_listen_audio. user_id={user_id}")
try:
self.connect()
query_listen_audio = self.cursor.execute(
"""SELECT l.file_name
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ?
AND l.file_name IS NOT NULL""", (user_id,))
check_sign = query_listen_audio.fetchall()
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
(user_id,))
sign_all_audio = query_all_audio.fetchall()
new_sign1 = list(set(sign_all_audio) - set(check_sign))
new_sign = []
for i in new_sign1:
new_sign.append(i[0])
self.logger.info(
f"Функция check_listen_audio успешно отработала.")
return new_sign
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя."""
self.logger.info(
f"Запуск функции mark_listened_audio. file_name={file_name}, user_id={user_id}")
try:
self.connect()
result = self.cursor.execute(
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1))
return self.conn.commit()
except sqlite3.Error as error:
self.logger.error(f"Ошибка получения последней даты войса: {error}")
raise
finally:
self.close()
def close(self):
"""Закрытие соединения и курсора."""
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()