Добавил новые функции #1

Merged
KerradKerridi merged 10 commits from dev into master 2024-07-06 10:07:34 +00:00
2 changed files with 175 additions and 76 deletions
Showing only changes of commit 1aed0c0c4f - Show all commits

93
db.py
View File

@@ -1,10 +1,8 @@
import datetime
import sqlite3
import configparser
import os
import sys
import logging
from logging.handlers import RotatingFileHandler
import custom_logger
config_path = os.path.join(sys.path[0], 'settings.ini')
config = configparser.ConfigParser()
@@ -12,34 +10,7 @@ config.read(config_path)
LOGS = config.getboolean('Settings', 'logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
# Инициализация логгера
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Формат записи логов
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(filename)s - %(lineno)d - %(message)s')
# Получение сегодняшней даты для имени файла
today = datetime.date.today().strftime('%Y-%m-%d')
filename = f'helper_bot_{today}.log'
# Создание обработчика для файла логов
file_handler = RotatingFileHandler(
filename,
mode='a',
maxBytes=10 * 1024 * 1024, # Максимальный размер файла (10 МБ)
backupCount=3 # Количество резервных файлов
)
file_handler.setFormatter(formatter)
# Добавление обработчика к логгеру
logger.addHandler(file_handler)
# Добавление стандартного обработчика
# чтобы сообщения также отображались на консоли
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
db_logger = custom_logger.Logger('database')
class BotDB:
@@ -254,6 +225,12 @@ class BotDB:
"""Возвращает список пользователей в черном списке по фильтру"""
return None
def check_user_in_blacklist(self, user_id):
"""Проверяет, существует ли запись с данным user_id в blacklist."""
self.cursor.execute("SELECT 1 FROM blacklist WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def set_user_blacklist(self, user_id, user_name=None, message_for_user=None, date_to_unban=None):
"""Добавляет пользователя в черный список"""
try:
@@ -262,7 +239,7 @@ class BotDB:
(user_id, user_name, message_for_user, date_to_unban,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
return error
def delete_user_blacklist(self, user_id):
"""Удаляет пользователя из черного списка"""
@@ -270,10 +247,10 @@ class BotDB:
#TODO: Функция всегда возвращает true, даже если такого id нет в таблице
self.cursor.execute("DELETE FROM blacklist WHERE user_id = ?", (user_id,))
self.conn.commit()
logger.info(f"User with ID {user_id} successfull delete.")
db_logger.info(f"User with ID {user_id} successfull delete.")
return True
except sqlite3.Error as error:
logger.error(f"Error delete user with ID {user_id} from blacklist table: {error}")
db_logger.error(f"Error delete user with ID {user_id} from blacklist table: {str(error)}")
return False
def add_new_message_in_db(self, message_text, user_id, message_id, date, has_answer):
@@ -281,7 +258,8 @@ class BotDB:
try:
self.cursor.execute(
"INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) VALUES (?, ?, ?, ?, ?)",
"INSERT INTO `user_messages` (message_text, user_id, message_id, date, has_answer) "
"VALUES (?, ?, ?, ?, ?)",
(message_text, message_id, user_id, date, has_answer))
return self.conn.commit()
except sqlite3.Error as error:
@@ -289,11 +267,44 @@ class BotDB:
def update_date_for_user(self, date, user_id: int):
try:
result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?", (date, user_id,))
result = self.cursor.execute("UPDATE `our_users` SET `date_changed` = ? WHERE `user_id` = ?",
(date, user_id,))
return self.conn.commit()
except sqlite3.Error as error:
print(error)
def is_admin(self, user_id):
"""
Проверяет, является ли пользователь администратором.
Args:
user_id: ID пользователя Telegram.
Returns:
True, если пользователь администратор, иначе False.
"""
self.cursor.execute("SELECT 1 FROM admins WHERE user_id = ?", (user_id,))
result = self.cursor.fetchone()
return bool(result)
def add_admin(self, user_id, username=None):
"""
Добавляет пользователя в список администраторов.
Args:
user_id: ID пользователя Telegram.
username: Username пользователя (необязательно).
"""
self.cursor.execute("INSERT INTO admins (user_id, username) VALUES (?, ?)", (user_id, username))
return self.conn.commit()
def remove_admin(self, user_id):
"""
Удаляет пользователя из списка администраторов.
Args:
user_id: ID пользователя Telegram.
"""
self.cursor.execute("DELETE FROM admins WHERE user_id = ?", (user_id,))
return self.conn.commit()
def get_user_by_message_id(self, message_id):
"""Возвращает идентификатор пользователя по идентификатору сообщения"""
try:
@@ -302,6 +313,16 @@ class BotDB:
except sqlite3.Error as error:
print(error)
def get_last_users_from_bot(self):
"""Возвращает список идентификаторов последних 10 пользователей обращавшихся в бот"""
try:
result = self.cursor.execute("SELECT full_name, user_id FROM our_users ORDER BY date_changed DESC LIMIT 10")
users = result.fetchall()
user_dict = {user[1]: user[0] for user in users}
return user_dict
except sqlite3.Error as error:
print(error)
def close(self):
"""Закрываем соединение с БД"""
self.conn.close()

148
main.py
View File

@@ -8,7 +8,7 @@ import db
from db import BotDB
import telebot
import random
from datetime import datetime
from datetime import datetime, timedelta
import time
from telebot import types
from telebot.apihelper import ApiTelegramException
@@ -56,36 +56,29 @@ class TelegramHelperBot:
def handle_message(message):
if self.state == State.START:
if message.text == '/start':
print(f'Внутри функции handle_message // Команда /start // state - {self.state.value}')
self.start_message(message)
elif message.text == '📢Предложить свой пост':
print(f'Внутри функции handle_message // Команда /suggest // state - {self.state.value}')
self.suggest_post(message)
self.state = State.SUGGEST
elif message.text == '🤪Хочу стикеры':
print(f'Внутри функции handle_message // Команда /stickers // state - {self.state.value}')
self.stickers(message)
self.state = State.START
elif message.text == '📩Связаться с админами':
print(f'Внутри функции handle_message // Команда /connect // state - {self.state.value}')
self.connect_with_admin(message)
self.state = State.PRE_CHAT
print(f'В state.START - {self.state.value}')
elif message.text == '👋🏼Сказать пока!':
print(f'Внутри функции handle_message // Команда /end // state - {self.state.value}')
self.end_message(message)
self.state = State.START
elif message.text == 'Выйти из чата':
print(f'Внутри функции handle_message // Команда /end // state - {self.state.value}')
self.end_message(message)
elif message.text == '/admin':
self.state = State.ADMIN
#TODO: Админку сделать!
self.bot.send_message(message.chat.id,
"Ты в админке, Ура! Делай что хочешь")
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
self.state = State.ADMIN
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
elif message.text == '/state':
print(f'Внутри функции handle_message // Команда /state // state - {self.state.value}')
self.bot.send_message(message.chat.id,
f'Твой state == {self.state.value}')
else:
@@ -101,15 +94,20 @@ class TelegramHelperBot:
self.state = State.START
if self.state == State.CHAT:
print(f'В state.CHAT - {self.state.value}')
if message.text == 'Выйти из чата':
self.state = State.START
self.end_message(message)
print(f'Обработчик в CHAT - {self.state.value}')
else:
print(f'Обработчик чата в чате CHAT - {self.state.value}')
self.resend_message_in_group_for_message(message)
if self.state == State.ADMIN:
if message == '/admin' or message == '/restart':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
@self.bot.message_handler(func=lambda message: True, chat_types=['group'])
def handle_message(message):
"""Функция ответа админа пользователю через закрытый чат"""
@@ -152,6 +150,9 @@ class TelegramHelperBot:
if LOGS:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
elif call.data[:3] == 'ban':
user_id = call.data[4:]
self.ban_user(call.message, user_id)
def register_chat_handler(self, message):
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
@@ -166,29 +167,96 @@ class TelegramHelperBot:
# Черный список
def admin_panel(self, message):
try:
#Добавить админа: 842766148
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Забанить пользователя из списка")
item2 = types.KeyboardButton("Забанить пользователя по ID")
markup.add(item1, item2)
self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user)
item1 = types.KeyboardButton("Бан (Список)")
item2 = types.KeyboardButton("Добавить админа")
item3 = types.KeyboardButton("Удалить админа")
markup.add(item1, item2, item3)
self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.handle_admin_message)
except Exception as e:
self.bot.register_next_step_handler(message, self.admin_panel)
def ban_user(self, message):
# проверяем, что ID передан правильно
#TODO: остановился где-то тут, функция не дописана. Хотел сделать админку + бан пользователей
def handle_admin_message(self, message):
try:
if message.text == "Забанить пользователя из списка":
pass
elif message.text == "Забанить пользователя по ID":
self.bot.send_message(message.chat.id, "Пришли ID юзера")
#BotDB.set_user_blacklist(ban_user_id)
#bot.reply_to(message, f"Пользователь {ban_user_id} заблокирован.")
self.get_last_users(message)
except Exception as e:
self.bot.reply_to(message, f"Укажи ID пользователя. Ошибка\n\n {e}")
self.bot.register_next_step_handler(message, self.ban_user)
self.bot.reply_to(message, f"Ошибка\n\n {e}")
self.admin_panel(message)
def ban_user(self, message, user_id: int):
user_name = BotDB.get_username(user_id=user_id)
ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None}
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Спам")
item2 = types.KeyboardButton("Заебал стикерами")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object)
def ban_user_step_2(self, message, ban_object: dict):
ban_object['message_for_user'] = message.text
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("1")
item2 = types.KeyboardButton("7")
item3 = types.KeyboardButton("30")
item4 = types.KeyboardButton("Навсегда")
markup.add(item1, item2, item3, item4)
self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
f"его в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object)
def ban_user_step_3(self, message, ban_object: dict):
date_to_unban = None
if message.text != 'Навсегда':
date_to_unban = self.add_days_to_date(message.text)
else:
pass
ban_object['date_to_unban'] = date_to_unban
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Подтвердить")
item2 = types.KeyboardButton("Отменить")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}",
parse_mode='html',
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object)
def ban_user_final_step(self, message, ban_object: dict):
if message.text == 'Подтвердить':
exists = BotDB.check_user_in_blacklist(ban_object['user_id'])
if exists:
self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.")
self.admin_panel(message)
else:
result = BotDB.set_user_blacklist(ban_object['user_id'],
ban_object['user_name'],
ban_object['message_for_user'],
ban_object['date_to_unban'])
if result:
self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.")
self.admin_panel(message)
else:
self.bot.reply_to(message, f"Произошла ошибка при блокировке пользователя.")
self.admin_panel(message)
def get_last_users(self, message):
list_users = BotDB.get_last_users_from_bot()
markup = types.InlineKeyboardMarkup(row_width=1)
for user_id, full_name in list_users.items():
button = types.InlineKeyboardButton(
f"{full_name}", callback_data=f'ban_{user_id}'
)
# Добавляем кнопки на клавиатуру
markup.add(button)
self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту",
reply_markup=markup)
def start_message(self, message):
try:
@@ -230,7 +298,6 @@ class TelegramHelperBot:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
#TODO: При отправке более одного сообщения, не пересылает сообщение в чат, БАГ!!!!
def resend_message_in_group_for_message(self, message):
self.bot.forward_message(chat_id=GROUP_FOR_MESSAGE,
from_chat_id=message.chat.id,
@@ -401,11 +468,22 @@ class TelegramHelperBot:
def __get_first_name(message):
return message.from_user.first_name
@staticmethod
def check_access(user_id: int):
"""Проверка прав на совершение действий"""
return BotDB.is_admin(user_id)
@staticmethod
def add_days_to_date(days):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=int(days))
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
bot = TelegramHelperBot(BOT_TOKEN)
if __name__ == "__main__":
# Запускаем бота
bot.start()