205 lines
9.5 KiB
Python
205 lines
9.5 KiB
Python
import sqlite3
|
||
import configparser
|
||
import os
|
||
import sys
|
||
|
||
config_path = os.path.join(sys.path[0], 'settings.ini')
|
||
config = configparser.ConfigParser()
|
||
config.read(config_path)
|
||
LOGS = config.getboolean('Settings', 'logs')
|
||
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
|
||
|
||
|
||
class BotDB:
|
||
|
||
def __init__(self, db_file):
|
||
self.conn = sqlite3.connect(db_file, check_same_thread=False)
|
||
self.cursor = self.conn.cursor()
|
||
|
||
def get_message_from_db(self, type: str, username):
|
||
"""Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются:
|
||
type - тип получаемой обратной связи, строковое значение, сохраненное в БД
|
||
username - имя пользователя
|
||
"""
|
||
# Подключаемся к базе
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"SELECT * FROM messages WHERE type=?", (type,))
|
||
# Забираем данные из таблицы, преобразуем в строку, заменяем поле username на имя пользователя,
|
||
# и вместо амберсанда подставляем перенос строки
|
||
if type == 'connect_with_admin' or type == 'del_message' or type == 'suggest_news' or type == 'start_message':
|
||
response_from_database = str(cursor.fetchone()[1]).replace('username', username).replace('&', '\n')
|
||
else:
|
||
response_from_database = str(cursor.fetchone()[1]).replace('&', '\n')
|
||
return response_from_database
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_error_message_from_db(self, id: int):
|
||
"""Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются:
|
||
id - идентификатор ошибки
|
||
"""
|
||
# Подключаемся к базе
|
||
try:
|
||
cursor = self.conn.cursor()
|
||
cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,))
|
||
response_from_database = str(cursor.fetchone()[1])
|
||
return response_from_database
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def add_new_user_in_db(self, user_id, first_name, full_name, username, is_bot, language_code, date_added, date_changed):
|
||
"""Добавляем юзера в базу"""
|
||
try:
|
||
self.cursor.execute("INSERT INTO 'our_users' ('user_id', 'first_name', 'full_name', 'username', 'is_bot', "
|
||
"'language_code', 'date_added', 'date_changed') VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||
(user_id, first_name, full_name,
|
||
username,is_bot,language_code,date_added,date_changed))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
|
||
def user_exists(self, user_id):
|
||
"""Проверяем, есть ли юзер в базе"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return bool(len(result.fetchall()))
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
|
||
def get_user_id(self, user_id):
|
||
"""Достаем id юзера в базе по его user_id"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `id` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
|
||
def get_username(self, user_id):
|
||
"""Достаем id юзера в базе по его user_id"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `username` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_all_user_id(self):
|
||
"""Достаем все айдишники юзеров из БД и преобразуем их в список"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `user_id` FROM `our_users`",)
|
||
fetch_all = result.fetchall()
|
||
list_of_users = []
|
||
for i in fetch_all:
|
||
list_of_users.append(i[0])
|
||
return list_of_users
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_user_first_name(self, user_id):
|
||
try:
|
||
result = self.cursor.execute("SELECT `first_name` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
|
||
def change_name(self, user_id):
|
||
#TODO: реализовать функцию изменения имени пользователя по которому к нему обращается бот. ОБновляем поля first_name, date_changed
|
||
#result = self.cursor.execute("UPDATE 'our_users' SET (?) WHERE user_id = (?)", (new_user_name), )
|
||
pass
|
||
|
||
def add_audio_record(self, file_name, author_id, date_added, listen_count, file_id):
|
||
"""Добавляет информацию о войсе юзера в БД"""
|
||
try:
|
||
result = self.cursor.execute("INSERT INTO `audio_message_reference` (file_name, author_id, date_added, listen_count, file_id) VALUES (?, ?, ?, ?, ?)", (file_name, author_id, date_added, listen_count, file_id))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def last_date_audio(self):
|
||
"""Получаем дату последнего войса"""
|
||
try:
|
||
result = self.cursor.execute(
|
||
"SELECT `date_added` FROM `audio_message_reference` ORDER BY date_added DESC LIMIT 1")
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
|
||
def get_last_user_audio_record(self, user_id):
|
||
"""Получает данные о количестве записей пользователя"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ?", (user_id,))
|
||
return bool(len(result.fetchall()))
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_id_for_audio_record(self, user_id):
|
||
"""Получает ID аудио сообщения пользователя"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `file_id` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1",
|
||
(user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_path_for_audio_record(self, user_id):
|
||
"""Получает данные о названии файла"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `file_name` FROM `audio_message_reference` WHERE `author_id` = ? ORDER BY date_added DESC LIMIT 1", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def check_listen_audio(self, user_id):
|
||
"""Проверяет прослушано ли аудио пользователем"""
|
||
try:
|
||
query_listen_audio = self.cursor.execute(
|
||
"""SELECT l.file_name
|
||
FROM audio_message_reference a
|
||
LEFT JOIN listen_audio_users l ON l.file_name = a.file_name
|
||
WHERE l.user_id = ?
|
||
AND l.file_name IS NOT NULL""" , (user_id,))
|
||
check_sign = query_listen_audio.fetchall()
|
||
query_all_audio = self.cursor.execute('SELECT file_name FROM audio_message_reference WHERE author_id <> ?', (user_id,))
|
||
sign_all_audio = query_all_audio.fetchall()
|
||
new_sign1 = list(set(sign_all_audio) - set(check_sign))
|
||
new_sign = []
|
||
for i in new_sign1:
|
||
new_sign.append(i[0])
|
||
return new_sign
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def mark_listened_audio(self, file_name, user_id):
|
||
"""Отмечает аудио прослушанным для конкретного пользователя."""
|
||
try:
|
||
result = self.cursor.execute("INSERT INTO `listen_audio_users` (file_name, user_id, is_listen) VALUES (?, ?, ?)", (file_name, user_id, 1))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def get_info_about_stickers(self, user_id):
|
||
"""Получает данные о получении стикеров пользователем"""
|
||
try:
|
||
result = self.cursor.execute("SELECT `has_stickers` FROM `our_users` WHERE `user_id` = ?", (user_id,))
|
||
return result.fetchone()[0]
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def update_info_about_stickers(self, user_id):
|
||
"""Обновляет данные о получении стикеров пользователем"""
|
||
try:
|
||
result = self.cursor.execute("UPDATE `our_users` SET `has_stickers` = 1 WHERE `user_id` = ?", (user_id,))
|
||
return self.conn.commit()
|
||
except sqlite3.Error as error:
|
||
print(error)
|
||
|
||
def close(self):
|
||
"""Закрываем соединение с БД"""
|
||
self.conn.close()
|
||
|