Files
telegram-helper-bot/db.py
KatykhinAA c3a6bcbb8a some fix
2024-07-06 00:33:37 +03:00

363 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import sqlite3
import configparser
import os
import sys
import custom_logger
config_path = os.path.join(sys.path[0], 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
LOGS = config.getboolean('Settings', 'logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
db_logger = custom_logger.Logger('database')
class BotDB:
def __init__(self, db_file):
self.conn = sqlite3.connect(db_file, check_same_thread=False)
self.cursor = self.conn.cursor()
# TODO: Deprecated, удалить
def get_message_from_db(self, type: str, username):
"""Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются:
type - тип получаемой обратной связи, строковое значение, сохраненное в БД
username - имя пользователя
"""
# Подключаемся к базе
try:
cursor = self.conn.cursor()
cursor.execute(f"SELECT * FROM messages WHERE type=?", (type,))
# Забираем данные из таблицы, преобразуем в строку, заменяем поле username на имя пользователя,
# и вместо амберсанда подставляем перенос строки
if type == 'connect_with_admin' or type == 'del_message' or type == 'suggest_news' or type == 'start_message':
response_from_database = str(cursor.fetchone()[1]).replace('username', username).replace('&', '\n')
else:
response_from_database = str(cursor.fetchone()[1]).replace('&', '\n')
return response_from_database
except sqlite3.Error as error:
print(error)
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда
def get_error_message_from_db(self, id: int):
"""Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
id - идентификатор ошибки
"""
# Подключаемся к базе
try:
cursor = self.conn.cursor()
cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
response_from_database = str(cursor.fetchone()[1])
return response_from_database
except sqlite3.Error as error:
print(error)
def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added,
date_changed):
"""Добавляем юзера в базу"""
try:
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name,
username, is_bot, language_code, date_added, date_changed))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def user_exists(self, user_id):
"""Проверяем, есть ли юзер в базе"""
try:
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return bool(len(result.fetchall()))
except sqlite3.Error as error:
print(error)
def get_user_id(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_username(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
result = self.cursor.execute("SELECT `username` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_all_user_id(self):
"""Достаем все айдишники юзеров из БД и преобразуем их в список"""
try:
result = self.cursor.execute("SELECT `user_id` FROM `our_users`", )
fetch_all = result.fetchall()
list_of_users = []
for i in fetch_all:
list_of_users.append(i[0])
return list_of_users
except sqlite3.Error as error:
print(error)
def get_user_first_name(self, user_id):
try:
result = self.cursor.execute("SELECT `first_name` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def change_name(self, user_id):
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
pass
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД"""
try:
result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def last_date_audio(self):
"""Получаем дату последнего войса"""
try:
result = self.cursor.execute(
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя"""
try:
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,))
return bool(len(result.fetchall()))
except sqlite3.Error as error:
print(error)
def get_id_for_audio_record(self, user_id):
"""Получает ID аудио сообщения пользователя"""
try:
result = self.cursor.execute(
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла"""
try:
result = self.cursor.execute(
"SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def check_listen_audio(self, user_id):
"""Проверяет прослушано ли аудио пользователем"""
try:
query_listen_audio = self.cursor.execute(
"""SELECT l.file_name
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ?
AND l.file_name IS NOT NULL""", (user_id,))
check_sign = query_listen_audio.fetchall()
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
(user_id,))
sign_all_audio = query_all_audio.fetchall()
new_sign1 = list(set(sign_all_audio) - set(check_sign))
new_sign = []
for i in new_sign1:
new_sign.append(i[0])
return new_sign
except sqlite3.Error as error:
print(error)
def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя."""
try:
result = self.cursor.execute(
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def get_info_about_stickers(self, user_id):
"""Получает данные о получении стикеров пользователем"""
try:
result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0] == 1
#return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def update_info_about_stickers(self, user_id):
"""Обновляет данные о получении стикеров пользователем"""
try:
result = self.cursor.execute("UPDATE `our_users` SET `has_stickers` = 1 WHERE `user_id` = ?", (user_id,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def get_users_blacklist(self):
"""Возвращает список пользователей в черном списке"""
try:
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist`")
fetch_all = result.fetchall()
list_of_users = {}
for i in fetch_all:
list_of_users[i[0]] = i[1]
return list_of_users
except sqlite3.Error as error:
print(error)
def get_users_for_unblock_today(self, date_to_unban):
"""Возвращает пользователей у которых истекает срок блокировки сегодня"""
try:
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist` WHERE date_to_unban = ?", (date_to_unban,))
fetch_all = result.fetchall()
list_of_users = {}
for i in fetch_all:
list_of_users[i[0]] = i[1]
return list_of_users
except sqlite3.Error as error:
print(error)
def get_blacklist_users_by_id(self, user_id):
"""Возвращает список пользователей в черном списке по user_id"""
try:
result = self.cursor.execute("SELECT user_id, user_name, message_for_user, date_to_unban FROM `blacklist` WHERE user_id = ?", (user_id, ))
return self.cursor.fetchone()
except sqlite3.Error as error:
print(error)
def check_user_in_blacklist(self, user_id):
"""Проверяет, существует ли запись с данным user_id в blacklist."""
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None):
"""Добавляет пользователя в черный список"""
try:
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban,))
return self.conn.commit()
except sqlite3.Error as error:
return error
def delete_user_blacklist(self, user_id):
"""Удаляет пользователя из черного списка"""
try:
#TODO: Функция всегда возвращает true, даже если такого id нет в таблице
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit()
db_logger.info(f"User with ID {user_id} successfull delete.")
return True
except sqlite3.Error as error:
db_logger.error(f"Error delete user with ID {user_id} from blacklist table: {str(error)}")
return False
def add_new_message_in_db(self, message_text, user_id, message_id, date, has_answer):
"""Добавляем сообщение юзера в базу"""
try:
self.cursor.execute(
"INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) "
"VALUES (?, ?, ?, ?, ?)",
(message_text, message_id, user_id, date, has_answer))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def update_date_for_user(self, date, user_id: int):
try:
result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?",
(date, user_id,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def is_admin(self, user_id):
"""
Проверяет, является ли пользователь администратором.
Args:
user_id: ID пользователя Telegram.
Returns:
True, если пользователь администратор, иначе False.
"""
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def add_admin(self, user_id, username=None):
"""
Добавляет пользователя в список администраторов.
Args:
user_id: ID пользователя Telegram.
username: Username пользователя (необязательно).
"""
self.cursor.execute("INSERT INTO admins (user_id, username) VALUES (?, ?)", (user_id, username))
return self.conn.commit()
def remove_admin(self, user_id):
"""
Удаляет пользователя из списка администраторов.
Args:
user_id: ID пользователя Telegram.
"""
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
return self.conn.commit()
def get_user_by_message_id(self, message_id):
"""Возвращает идентификатор пользователя по идентификатору сообщения"""
try:
result = self.cursor.execute("SELECT user_id FROM `user_messages` WHERE message_id = ?", (message_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_last_users_from_db(self):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC")
users = result.fetchall()
return users
except sqlite3.Error as error:
print(error)
def get_banned_users_from_db(self):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist", )
users = result.fetchall()
return users
except sqlite3.Error as error:
print(error)
def get_banned_users_from_db_with_limits(self, offset: int, limit: int):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT user_name, user_id, message_for_user, date_to_unban FROM blacklist LIMIT ?, ?", (offset, limit,) )
users = result.fetchall()
return users
except sqlite3.Error as error:
print(error)
def close(self):
"""Закрываем соединение с БД"""
self.conn.close()