407 lines
20 KiB
Python
407 lines
20 KiB
Python
import sqlite3
|
||
import configparser
|
||
import os
|
||
from datetime import datetime
|
||
from loguru import logger
|
||
from custom_logger import Logger
|
||
|
||
db_logger = Logger(name='db')
|
||
script_dir = os.path.dirname(os.path.abspath(__file__))
|
||
config_path = os.path.join(script_dir, 'settings.ini')
|
||
config = configparser.ConfigParser()
|
||
config.read(config_path)
|
||
LOGS = config.getboolean('Settings', 'logs')
|
||
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
|
||
|
||
|
||
class BotDB:
|
||
|
||
def __init__(self):
|
||
db_file_path = os.path.dirname(os.path.abspath(__file__))
|
||
db_file = os.path.join(db_file_path, 'tg-bot-database')
|
||
self.conn = sqlite3.connect(db_file, check_same_thread=False)
|
||
self.cursor = self.conn.cursor()
|
||
logger.info(f'Подключен к базе данных: {db_file_path}')
|
||
|
||
def create_table(self, sql_script):
|
||
"""Создает таблицу в базе."""
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(sql_script)
|
||
except Exception as e:
|
||
print(f"Ошибка при создании таблицы: {e}")
|
||
|
||
def get_current_version(self):
|
||
"""Получает текущую версию миграций из таблицы migrations."""
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute("SELECT version FROM migrations ORDER BY version DESC LIMIT 1")
|
||
version = cursor.fetchone()[0]
|
||
return version
|
||
except Exception as e:
|
||
print(f"Ошибка при получении версии: {e}")
|
||
return 0
|
||
|
||
def update_version(self, new_version, script_name):
|
||
"""Обновляет версию миграций в таблице migrations."""
|
||
logger.info(f'Попытка обновления версии: {new_version}, скрипт: {script_name}')
|
||
try:
|
||
current_date = datetime.now()
|
||
today = current_date.strftime("%d-%m-%Y %H:%M:%S")
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(
|
||
"INSERT INTO migrations (version, script_name, created_at) VALUES(?, ?, ?)",
|
||
(new_version, script_name, today),
|
||
)
|
||
self.conn.commit()
|
||
logger.info(f"Версия обновлена: {new_version}, скрипт: {script_name}")
|
||
except sqlite3.IntegrityError as e:
|
||
logger.error(f"Ошибка при обновлении версии: {e}")
|
||
except Exception as e:
|
||
logger.error(f"Ошибка при обновлении версии: {e}")
|
||
|
||
# TODO: Deprecated, удалить
|
||
def get_message_from_db(self, type: str, username):
|
||
"""Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются:
|
||
type - тип получаемой обратной связи, строковое значение, сохраненное в БД
|
||
username - имя пользователя
|
||
"""
|
||
# Подключаемся к базе
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"SELECT * FROM messages WHERE type=?", (type,))
|
||
# Забираем данные из таблицы, преобразуем в строку, заменяем поле username на имя пользователя,
|
||
# и вместо амберсанда подставляем перенос строки
|
||
if type == 'connect_with_admin' or type == 'del_message' or type == 'suggest_news' or type == 'start_message':
|
||
response_from_database = str(cursor.fetchone()[1]).replace('username', username).replace('&', '\n')
|
||
else:
|
||
response_from_database = str(cursor.fetchone()[1]).replace('&', '\n')
|
||
return response_from_database
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда
|
||
def get_error_message_from_db(self, id: int):
|
||
"""Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
|
||
id - идентификатор ошибки
|
||
"""
|
||
# Подключаемся к базе
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
|
||
response_from_database = str(cursor.fetchone()[1])
|
||
return response_from_database
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added,
|
||
date_changed):
|
||
"""Добавляем юзера в базу"""
|
||
try:
|
||
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
|
||
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||
(user_id, first_name, full_name,
|
||
username, is_bot, language_code, date_added, date_changed))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def user_exists(self, user_id):
|
||
"""Проверяем, есть ли юзер в базе"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return bool(len(result.fetchall()))
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_user_id(self, user_id):
|
||
"""Достаем id юзера в базе по его user_id"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_username(self, user_id):
|
||
"""Достаем id юзера в базе по его user_id"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `username` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_all_user_id(self):
|
||
"""Достаем все айдишники юзеров из БД и преобразуем их в список"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `user_id` FROM `our_users`", )
|
||
fetch_all = result.fetchall()
|
||
list_of_users = []
|
||
for i in fetch_all:
|
||
list_of_users.append(i[0])
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_user_first_name(self, user_id):
|
||
try:
|
||
result = self.cursor.execute("SELECT `first_name` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def change_name(self, user_id):
|
||
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed
|
||
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
|
||
pass
|
||
|
||
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
|
||
"""Добавляет информацию о войсе юзера в БД"""
|
||
try:
|
||
result = self.cursor.execute(
|
||
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)",
|
||
(file_name, author_id, date_added, listen_count, file_id))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def last_date_audio(self):
|
||
"""Получаем дату последнего войса"""
|
||
try:
|
||
result = self.cursor.execute(
|
||
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_last_user_audio_record(self, user_id):
|
||
"""Получает данные о количестве записей пользователя"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
|
||
(user_id,))
|
||
return bool(len(result.fetchall()))
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_id_for_audio_record(self, user_id):
|
||
"""Получает ID аудио сообщения пользователя"""
|
||
try:
|
||
result = self.cursor.execute(
|
||
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
|
||
(user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_path_for_audio_record(self, user_id):
|
||
"""Получает данные о названии файла"""
|
||
try:
|
||
result = self.cursor.execute(
|
||
"SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
|
||
(user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def check_listen_audio(self, user_id):
|
||
"""Проверяет прослушано ли аудио пользователем"""
|
||
try:
|
||
query_listen_audio = self.cursor.execute(
|
||
"""SELECT l.file_name
|
||
FROM audio_message_reference a
|
||
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
|
||
WHERE l.user_id = ?
|
||
AND l.file_name IS NOT NULL""", (user_id,))
|
||
check_sign = query_listen_audio.fetchall()
|
||
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
|
||
(user_id,))
|
||
sign_all_audio = query_all_audio.fetchall()
|
||
new_sign1 = list(set(sign_all_audio) - set(check_sign))
|
||
new_sign = []
|
||
for i in new_sign1:
|
||
new_sign.append(i[0])
|
||
return new_sign
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def mark_listened_audio(self, file_name, user_id):
|
||
"""Отмечает аудио прослушанным для конкретного пользователя."""
|
||
try:
|
||
result = self.cursor.execute(
|
||
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
|
||
(file_name, user_id, 1))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_info_about_stickers(self, user_id):
|
||
"""Получает данные о получении стикеров пользователем"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0] == 1
|
||
#return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def update_info_about_stickers(self, user_id):
|
||
"""Обновляет данные о получении стикеров пользователем"""
|
||
try:
|
||
result = self.cursor.execute("UPDATE `our_users` SET `has_stickers` = 1 WHERE `user_id` = ?", (user_id,))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_users_blacklist(self):
|
||
"""Возвращает список пользователей в черном списке"""
|
||
try:
|
||
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist`")
|
||
fetch_all = result.fetchall()
|
||
list_of_users = {}
|
||
for i in fetch_all:
|
||
list_of_users[i[0]] = i[1]
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_users_for_unblock_today(self, date_to_unban):
|
||
"""Возвращает пользователей у которых истекает срок блокировки сегодня"""
|
||
try:
|
||
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist` WHERE date_to_unban = ?", (date_to_unban,))
|
||
fetch_all = result.fetchall()
|
||
list_of_users = {}
|
||
for i in fetch_all:
|
||
list_of_users[i[0]] = i[1]
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_blacklist_users_by_id(self, user_id):
|
||
"""Возвращает список пользователей в черном списке по user_id"""
|
||
try:
|
||
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban FROM `blacklist` WHERE user_id = ?", (user_id, ))
|
||
return self.cursor.fetchone()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
|
||
def check_user_in_blacklist(self, user_id):
|
||
"""Проверяет, существует ли запись с данным user_id в blacklist."""
|
||
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
return bool(result)
|
||
|
||
def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None):
|
||
"""Добавляет пользователя в черный список"""
|
||
try:
|
||
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
|
||
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
|
||
(user_id, user_name, message_for_user, date_to_unban,))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
return error
|
||
|
||
def delete_user_blacklist(self, user_id):
|
||
"""Удаляет пользователя из черного списка"""
|
||
try:
|
||
#TODO: Функция всегда возвращает true, даже если такого id нет в таблице
|
||
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
|
||
self.conn.commit()
|
||
logger.info(f"Пользователь с идентификатором {user_id} успешно удален.")
|
||
return True
|
||
except sqlite3.Error as error:
|
||
logger.error(f"Ошибка удаления пользователя с идентификатором {user_id} из таблицы blacklist. Ошибка: {str(error)}")
|
||
return False
|
||
|
||
def add_new_message_in_db(self, message_text, user_id, message_id, date, has_answer):
|
||
"""Добавляем сообщение юзера в базу"""
|
||
try:
|
||
|
||
self.cursor.execute(
|
||
"INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) "
|
||
"VALUES (?, ?, ?, ?, ?)",
|
||
(message_text, message_id, user_id, date, has_answer))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def update_date_for_user(self, date, user_id: int):
|
||
try:
|
||
result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?",
|
||
(date, user_id,))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def is_admin(self, user_id):
|
||
"""
|
||
Проверяет, является ли пользователь администратором.
|
||
Args:
|
||
user_id: ID пользователя Telegram.
|
||
Returns:
|
||
True, если пользователь администратор, иначе False.
|
||
"""
|
||
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
|
||
result = self.cursor.fetchone()
|
||
return bool(result)
|
||
|
||
def add_admin(self, user_id, role):
|
||
"""
|
||
Добавляет пользователя в список администраторов.
|
||
Args:
|
||
user_id: ID пользователя Telegram.
|
||
role: Роль пользователя.
|
||
Доступные варианты:
|
||
1. creator - создатель
|
||
2. admin - обычная роль
|
||
"""
|
||
self.cursor.execute("INSERT INTO admins (user_id, role) VALUES (?, ?)", (user_id, role))
|
||
return self.conn.commit()
|
||
|
||
def remove_admin(self, user_id):
|
||
"""
|
||
Удаляет пользователя из списка администраторов.
|
||
|
||
Args:
|
||
user_id: ID пользователя Telegram.
|
||
"""
|
||
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
|
||
return self.conn.commit()
|
||
|
||
def get_user_by_message_id(self, message_id):
|
||
"""Возвращает идентификатор пользователя по идентификатору сообщения"""
|
||
try:
|
||
result = self.cursor.execute("SELECT user_id FROM `user_messages` WHERE message_id = ?", (message_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_last_users_from_db(self):
|
||
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
|
||
try:
|
||
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC")
|
||
users = result.fetchall()
|
||
return users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_banned_users_from_db(self):
|
||
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
|
||
try:
|
||
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist", )
|
||
users = result.fetchall()
|
||
return users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
|
||
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
|
||
try:
|
||
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist LIMIT ?, ?", (offset, limit,) )
|
||
users = result.fetchall()
|
||
return users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def close(self):
|
||
"""Закрываем соединение с БД"""
|
||
self.conn.close()
|