Files
telegram-helper-bot/db.py
2024-07-03 23:06:10 +03:00

301 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import sqlite3
import configparser
import os
import sys
import logging
from logging.handlers import RotatingFileHandler
config_path = os.path.join(sys.path[0], 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
LOGS = config.getboolean('Settings', 'logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
# Инициализация логгера
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Формат записи логов
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(message)s')
# Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d')
filename = f'helper_bot_{today}.log'
# Создание обработчика для файла логов
file_handler = RotatingFileHandler(
filename,
mode='a',
maxBytes=10 * 1024 * 1024, # Максимальный размер файла (10 МБ)
backupCount=3 # Количество резервных файлов
)
file_handler.setFormatter(formatter)
# Добавление обработчика к логгеру
logger.addHandler(file_handler)
# Добавление стандартного обработчика
# чтобы сообщения также отображались на консоли
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
class BotDB:
def __init__(self, db_file):
self.conn = sqlite3.connect(db_file, check_same_thread=False)
self.cursor = self.conn.cursor()
# TODO: Deprecated, удалить
def get_message_from_db(self, type: str, username):
"""Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются:
type - тип получаемой обратной связи, строковое значение, сохраненное в БД
username - имя пользователя
"""
# Подключаемся к базе
try:
cursor = self.conn.cursor()
cursor.execute(f"SELECT * FROM messages WHERE type=?", (type,))
# Забираем данные из таблицы, преобразуем в строку, заменяем поле username на имя пользователя,
# и вместо амберсанда подставляем перенос строки
if type == 'connect_with_admin' or type == 'del_message' or type == 'suggest_news' or type == 'start_message':
response_from_database = str(cursor.fetchone()[1]).replace('username', username).replace('&', '\n')
else:
response_from_database = str(cursor.fetchone()[1]).replace('&', '\n')
return response_from_database
except sqlite3.Error as error:
print(error)
# TODO: Deprecated. Остался только в voice боте, удалить и оттуда
def get_error_message_from_db(self, id: int):
"""Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
id - идентификатор ошибки
"""
# Подключаемся к базе
try:
cursor = self.conn.cursor()
cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
response_from_database = str(cursor.fetchone()[1])
return response_from_database
except sqlite3.Error as error:
print(error)
def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added,
date_changed):
"""Добавляем юзера в базу"""
try:
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(user_id, first_name, full_name,
username, is_bot, language_code, date_added, date_changed))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def user_exists(self, user_id):
"""Проверяем, есть ли юзер в базе"""
try:
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return bool(len(result.fetchall()))
except sqlite3.Error as error:
print(error)
def get_user_id(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_username(self, user_id):
"""Достаем id юзера в базе по его user_id"""
try:
result = self.cursor.execute("SELECT `username` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_all_user_id(self):
"""Достаем все айдишники юзеров из БД и преобразуем их в список"""
try:
result = self.cursor.execute("SELECT `user_id` FROM `our_users`", )
fetch_all = result.fetchall()
list_of_users = []
for i in fetch_all:
list_of_users.append(i[0])
return list_of_users
except sqlite3.Error as error:
print(error)
def get_user_first_name(self, user_id):
try:
result = self.cursor.execute("SELECT `first_name` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def change_name(self, user_id):
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
pass
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
"""Добавляет информацию о войсе юзера в БД"""
try:
result = self.cursor.execute(
"INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)",
(file_name, author_id, date_added, listen_count, file_id))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def last_date_audio(self):
"""Получаем дату последнего войса"""
try:
result = self.cursor.execute(
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_last_user_audio_record(self, user_id):
"""Получает данные о количестве записей пользователя"""
try:
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?",
(user_id,))
return bool(len(result.fetchall()))
except sqlite3.Error as error:
print(error)
def get_id_for_audio_record(self, user_id):
"""Получает ID аудио сообщения пользователя"""
try:
result = self.cursor.execute(
"SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def get_path_for_audio_record(self, user_id):
"""Получает данные о названии файла"""
try:
result = self.cursor.execute(
"SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
(user_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def check_listen_audio(self, user_id):
"""Проверяет прослушано ли аудио пользователем"""
try:
query_listen_audio = self.cursor.execute(
"""SELECT l.file_name
FROM audio_message_reference a
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
WHERE l.user_id = ?
AND l.file_name IS NOT NULL""", (user_id,))
check_sign = query_listen_audio.fetchall()
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?',
(user_id,))
sign_all_audio = query_all_audio.fetchall()
new_sign1 = list(set(sign_all_audio) - set(check_sign))
new_sign = []
for i in new_sign1:
new_sign.append(i[0])
return new_sign
except sqlite3.Error as error:
print(error)
def mark_listened_audio(self, file_name, user_id):
"""Отмечает аудио прослушанным для конкретного пользователя."""
try:
result = self.cursor.execute(
"INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)",
(file_name, user_id, 1))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def get_info_about_stickers(self, user_id):
"""Получает данные о получении стикеров пользователем"""
try:
result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,))
return result.fetchone()[0] == 1
#return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def update_info_about_stickers(self, user_id):
"""Обновляет данные о получении стикеров пользователем"""
try:
result = self.cursor.execute("UPDATE `our_users` SET `has_stickers` = 1 WHERE `user_id` = ?", (user_id,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def get_users_blacklist(self):
"""Возвращает список пользователей в черном списке"""
try:
result = self.cursor.execute("SELECT user_id, user_name FROM `blacklist`")
fetch_all = result.fetchall()
list_of_users = {}
for i in fetch_all:
list_of_users[i[0]] = i[1]
return list_of_users
except sqlite3.Error as error:
print(error)
def get_blacklist_users_by_filters(self):
"""Возвращает список пользователей в черном списке по фильтру"""
return None
def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None):
"""Добавляет пользователя в черный список"""
try:
result = self.cursor.execute("INSERT INTO 'blacklist' ('user_id', 'user_name',"
" 'message_for_user', 'date_to_unban') VALUES (?, ?, ?, ?)",
(user_id, user_name, message_for_user, date_to_unban,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def delete_user_blacklist(self, user_id):
"""Удаляет пользователя из черного списка"""
try:
#TODO: Функция всегда возвращает true, даже если такого id нет в таблице
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit()
logger.info(f"User with ID {user_id} successfull delete.")
return True
except sqlite3.Error as error:
logger.error(f"Error delete user with ID {user_id} from blacklist table: {error}")
return False
def add_new_message_in_db(self, message_text, user_id, message_id, date, has_answer):
"""Добавляем сообщение юзера в базу"""
try:
self.cursor.execute(
"INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) VALUES (?, ?, ?, ?, ?)",
(message_text, message_id, user_id, date, has_answer))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def get_user_by_message_id(self, message_id):
"""Возвращает идентификатор пользователя по идентификатору сообщения"""
try:
result = self.cursor.execute("SELECT user_id FROM `user_messages` WHERE message_id = ?", (message_id,))
return result.fetchone()[0]
except sqlite3.Error as error:
print(error)
def close(self):
"""Закрываем соединение с БД"""
self.conn.close()