diff --git a/helper_bot/__init__.py b/helper_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/helper_bot/helper_bot.py b/helper_bot/helper_bot.py new file mode 100644 index 0000000..8e4b8f6 --- /dev/null +++ b/helper_bot/helper_bot.py @@ -0,0 +1,615 @@ +from datetime import datetime, timedelta +import random +import traceback +from enum import Enum +from pathlib import Path +from time import sleep +import telebot +from telebot import types +from telebot.apihelper import ApiTelegramException +import messages + + +class State(Enum): + START = "START" + SUGGEST = "SUGGEST" + ADMIN = "ADMIN" + CHAT = "CHAT" + PRE_CHAT = "PRE_CHAT" + + +class TelegramHelperBot: + def __init__(self, dependency_factory): + self.state = State.START + self.BotDB = dependency_factory.get_database() + self.settings = dependency_factory.get_settings() + token = self.settings['Telegram']['bot_token'] + self.GROUP_FOR_POST = self.settings['Telegram']['group_for_posts'] + self.GROUP_FOR_MESSAGE = self.settings['Telegram']['group_for_message'] + self.MAIN_PUBLIC = self.settings['Telegram']['main_public'] + self.GROUP_FOR_LOGS = self.settings['Telegram']['group_for_logs'] + self.IMPORTANT_LOGS = self.settings['Telegram']['important_logs'] + self.PREVIEW_LINK = self.settings['Telegram']['preview_link'] + self.LOGS = self.settings['Settings']['logs'] + self.TEST = self.settings['Settings']['test'] + self.bot = telebot.TeleBot(token) + + # Router for user + @self.bot.message_handler(func=lambda message: True, chat_types=['private']) + def handle_message(message): + if self.BotDB.check_user_in_blacklist(message.from_user.id): + attribute = self.BotDB.get_blacklist_users_by_id(message.from_user.id) + self.bot.send_message(message.chat.id, + f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}', parse_mode='HTML') + return + if self.state == State.START: + if message.text == '/start': + self.start_message(message) + elif message.text == '📢Предложить свой пост': + self.suggest_post(message) + self.state = State.SUGGEST + elif message.text == '🤪Хочу стикеры': + self.stickers(message) + self.state = State.START + elif message.text == '📩Связаться с админами': + self.connect_with_admin(message) + self.state = State.PRE_CHAT + elif message.text == '👋🏼Сказать пока!': + self.end_message(message) + self.state = State.START + elif message.text == 'Выйти из чата': + self.end_message(message) + elif message.text == '/admin': + access = self.check_access(message.from_user.id) + if access: + self.admin_panel(message) + self.state = State.ADMIN + else: + self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') + elif message.text == '/state': + self.bot.send_message(message.chat.id, + f'Твой state == {self.state.value}') + else: + self.bot.send_message(message.chat.id, + #TODO: Здесь раньше был /state + "Не понимаю где ты находишься. Нажми /start, и я перезапущусь") + + if self.state == State.SUGGEST: + self.bot.register_next_step_handler(message, self.send_to_suggest) + self.state = State.START + if message.text == '/start': + self.state = State.START + self.start_message(message) + if self.state == State.PRE_CHAT: + self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message) + self.state = State.START + if message.text == '/start': + self.state = State.START + self.start_message(message) + + if self.state == State.CHAT: + if message.text == 'Выйти из чата': + self.state = State.START + self.end_message(message) + elif message.text == '/start': + self.state = State.START + self.start_message(message) + else: + self.resend_message_in_group_for_message(message) + + if self.state == State.ADMIN: + if message == '/admin' or message == '/restart' or message == 'Вернуться в админку': + access = self.check_access(message.from_user.id) + if access: + self.admin_panel(message) + else: + self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') + if message.text == '/start': + self.state = State.START + self.start_message(message) + + @self.bot.message_handler(func=lambda message: True, chat_types=['group']) + def handle_message(message): + """Функция ответа админа пользователю через закрытый чат""" + self.state = State.CHAT + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + item1 = types.KeyboardButton("Выйти из чата") + markup.add(item1) + message_id = 0 + try: + message_id = message.reply_to_message.id + except AttributeError: + self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!') + message_from_admin = message.text + try: + chat_id = self.BotDB.get_user_by_message_id(message_id) + self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) + except TypeError: + self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.') + + # Админка + @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) + def post_for_group(call): + if call.data == 'publish' and call.message.content_type == 'text': + try: + self.bot.send_message(chat_id=self.MAIN_PUBLIC, text=call.message.text) + self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) + except Exception as e: + if self.LOGS: + self.bot.send_message(chat_id=self.IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + elif call.data == 'publish' and call.message.content_type == 'photo': + try: + self.bot.send_photo( + chat_id=self.MAIN_PUBLIC, + caption=call.message.caption, + photo=call.message.photo[-1].file_id, + ) + self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) + except Exception as e: + if self.LOGS: + self.bot.send_message(chat_id=self.IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + elif call.data == 'decline': + try: + self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) + except Exception as e: + if self.LOGS: + self.bot.send_message(self.IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + @self.bot.callback_query_handler(func=lambda call: True) + def pagination(call): + if call.data[:3] == 'ban': + user_id = call.data[4:] + self.ban_user(call.message, user_id) + if call.data == 'return': + self.bot.delete_message(call.message.chat.id, call.message.message_id) + self.admin_panel(call.message) + + if call.data[:5] == 'unban': + self.delete_user_blacklist(call.data[6:]) + msg = f'Успешно удалено.' + self.bot.send_message(chat_id=call.message.chat.id, text=msg) + elif call.data[:4] == 'page': + if call.message.text == 'Список пользователей которые последними обращались к боту': + list_users = self.BotDB.get_last_users_from_db() + keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(list_users), list_users, + 'ban') + self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, + reply_markup=keyboard) + if "Список заблокированных пользователей".lower() in call.message.text.lower(): + #Готовим сообщения + message_user = self.get_banned_users_list(int(call.data[5:]) * 7 - 7) + self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, + text=message_user) + + #Готовим клавиатуру + buttons = self.get_banned_users_buttons() + keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban') + self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, + reply_markup=keyboard) + + def start(self): + while True: + try: + self.bot.polling(none_stop=True) + except (ConnectionError, Exception): + print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}") + + def unban_notifier(self): + # Получение сегодняшней даты в формате DD-MM-YYYY + current_date = datetime.now() + print('Мы в функции unban_notifier') + today = current_date.strftime("%d-%m-%Y") + # Получение списка разблокированных пользователей + unblocked_users = self.BotDB.get_users_for_unblock_today(today) + message = "Разблокированные пользователи:\n" + for user_id, user_name in unblocked_users.items(): + message += f"ID: {user_id}, Имя: {user_name}\n" + + # Отправка сообщения в канал + self.bot.send_message(self.GROUP_FOR_MESSAGE, message) + + # Черный список + def admin_panel(self, message): + try: + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + item1 = types.KeyboardButton("Бан (Список)") + #item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю + #item3 = types.KeyboardButton("Удалить админа") + item4 = types.KeyboardButton("Разбан (список)") + item5 = types.KeyboardButton("Вернуться в бота") + markup.add(item1, item4, item5) + self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", + reply_markup=markup) + self.bot.register_next_step_handler(message, self.handle_admin_message) + except Exception as e: + self.bot.register_next_step_handler(message, self.admin_panel) + + def handle_admin_message(self, message): + try: + if message.text == "Бан (Список)": + self.get_last_users(message) + elif message.text == "Разбан (список)": + self.get_banned_users(message) + elif message.text == "Вернуться в бота": + self.start_message(message) + except Exception as e: + self.bot.reply_to(message, f"Ошибка\n\n {e}") + self.admin_panel(message) + + def ban_user(self, message, user_id: int): + user_name = self.BotDB.get_username(user_id=user_id) + ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None} + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + item1 = types.KeyboardButton("Спам") + item2 = types.KeyboardButton("Заебал стикерами") + markup.add(item1, item2) + self.bot.send_message(message.chat.id, + f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат", + reply_markup=markup) + self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object) + + def ban_user_step_2(self, message, ban_object: dict): + ban_object['message_for_user'] = message.text + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + item1 = types.KeyboardButton("1") + item2 = types.KeyboardButton("7") + item3 = types.KeyboardButton("30") + item4 = types.KeyboardButton("Навсегда") + markup.add(item1, item2, item3, item4) + self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " + f"его в чат", + reply_markup=markup) + self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object) + + def ban_user_step_3(self, message, ban_object: dict): + date_to_unban = None + if message.text != 'Навсегда': + date_to_unban = self.add_days_to_date(message.text) + else: + pass + ban_object['date_to_unban'] = date_to_unban + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + item1 = types.KeyboardButton("Подтвердить") + item2 = types.KeyboardButton("Отменить") + markup.add(item1, item2) + self.bot.send_message(message.chat.id, + f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}", + parse_mode='html', + reply_markup=markup) + self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object) + + def ban_user_final_step(self, message, ban_object: dict): + if message.text == 'Подтвердить': + exists = self.BotDB.check_user_in_blacklist(ban_object['user_id']) + if exists: + self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.") + self.admin_panel(message) + else: + self.BotDB.set_user_blacklist(ban_object['user_id'], + ban_object['user_name'], + ban_object['message_for_user'], + ban_object['date_to_unban']) + self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.") + self.admin_panel(message) + + def get_last_users(self, message): + list_users = self.BotDB.get_last_users_from_db() + keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') + self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту", + reply_markup=keyboard) + + def get_banned_users(self, message): + message_text = self.get_banned_users_list(0) + buttons_list = self.get_banned_users_buttons() + if buttons_list: + k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban') + self.bot.send_message(message.chat.id, message_text, reply_markup=k) + else: + self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет") + self.admin_panel(message) + + def start_message(self, message): + try: + name_stick_hello = list(Path('Stick').rglob('Hello_*')) + random_stick_hello = open(random.choice(name_stick_hello), 'rb') + # logging + if self.LOGS: + self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, + from_chat_id=message.chat.id, + message_id=message.message_id) + self.bot.send_sticker(message.chat.id, random_stick_hello) + sleep(0.3) + except Exception as e: + print(f'{str(e)}') + if self.LOGS: + self.bot.send_message(self.IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + try: + user_id = message.from_user.id + first_name = message.from_user.first_name + full_name = message.from_user.full_name + is_bot = message.from_user.is_bot + username = message.from_user.username + language_code = message.from_user.language_code + current_date = datetime.now() + date = current_date.strftime("%Y-%m-%d %H:%M:%S") + if not self.BotDB.user_exists(user_id): + self.BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, + date) + self.BotDB.update_date_for_user(date, user_id) + markup = self.get_reply_keyboard(message) + hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE') + self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, + disable_web_page_preview=not self.PREVIEW_LINK) + except Exception as e: + print(f'{str(e)}') + if self.LOGS: + self.bot.send_message(self.IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + def resend_message_in_group_for_message(self, message): + self.bot.forward_message(chat_id=self.GROUP_FOR_MESSAGE, + from_chat_id=message.chat.id, + message_id=message.message_id + ) + current_date = datetime.now() + date = current_date.strftime("%Y-%m-%d %H:%M:%S") + self.BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) + question = messages.get_message(self.__get_first_name(message), 'QUESTION') + markup = self.get_reply_keyboard(message) + self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not self.PREVIEW_LINK, + reply_markup=markup) + + def suggest_post(self, message): + try: + markup = types.ReplyKeyboardRemove() + suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS') + self.bot.send_message(message.chat.id, suggest_news, parse_mode='html') + sleep(0.3) + suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2') + self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup) + except Exception as e: + self.bot.send_message(self.IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + # logging + if self.LOGS: + self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, + from_chat_id=message.chat.id, + message_id=message.message_id) + + def stickers(self, message): + self.BotDB.update_info_about_stickers(user_id=message.from_user.id) + markup = self.get_reply_keyboard(message) + try: + self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, + from_chat_id=message.chat.id, + message_id=message.message_id) + self.bot.send_message(message.chat.id, + text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', + reply_markup=markup) + except ApiTelegramException as e: + self.bot.send_message(chat_id=self.IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + def connect_with_admin(self, message): + connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN') + self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html") + # logging + if self.LOGS: + self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, + from_chat_id=message.chat.id, + message_id=message.message_id) + + def end_message(self, message): + try: + name_stick_bye = list(Path('Stick').rglob('Universal_*')) + random_stick_bye = open(random.choice(name_stick_bye), 'rb') + self.bot.send_sticker(message.chat.id, random_stick_bye) + except ApiTelegramException as e: + if self.LOGS: + self.bot.send_message(chat_id=self.IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + markup = types.ReplyKeyboardRemove() + try: + bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE') + self.bot.send_message(message.chat.id, bye_message, + parse_mode='html', reply_markup=markup, disable_web_page_preview=not self.PREVIEW_LINK) + except Exception as e: + if self.LOGS: + self.bot.send_message(chat_id=self.IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + if self.LOGS: + # logging + self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, + from_chat_id=message.chat.id, + message_id=message.message_id) + + def send_to_suggest(self, message): + markup = types.InlineKeyboardMarkup(row_width=1) + item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish') + item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline') + markup.add(item1, item2) + try: + if message.content_type == 'text': + post_text = message.text.lower() + if post_text.find('неанон') != -1 or post_text.find('не анон') != -1: + self.bot.send_message( + # TODO: GROUP_FOR_POST + chat_id=self.GROUP_FOR_POST, + text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', + reply_markup=markup + ) + elif post_text.find('анон') != -1: + self.bot.send_message( + # TODO: GROUP_FOR_POST + chat_id=self.GROUP_FOR_POST, + text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно', + reply_markup=markup + ) + else: + self.bot.send_message( + # TODO: GROUP_FOR_POST + chat_id=self.GROUP_FOR_POST, + text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', + reply_markup=markup + ) + elif message.content_type == 'photo' and message.media_group_id is None: + post_text_for_photo = message.caption.lower() + if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1: + self.bot.send_photo( + # TODO: GROUP_FOR_POST + chat_id=self.GROUP_FOR_POST, + caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', + photo=message.photo[-1].file_id, + reply_markup=markup + ) + elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1: + self.bot.send_photo( + # TODO: GROUP_FOR_POST + chat_id=self.GROUP_FOR_POST, + caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно', + photo=message.photo[-1].file_id, + reply_markup=markup + ) + else: + self.bot.send_photo( + # TODO: GROUP_FOR_POST + chat_id=self.GROUP_FOR_POST, + caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', + photo=message.photo[-1].file_id, + reply_markup=markup + ) + # TODO: Не понятна реализация с альбомами от слова совсем + # elif message.content_type == 'photo' and message.media_group_id != None: + # bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id ) + else: + pass + except Exception as e: + if self.LOGS: + self.bot.send_message(chat_id=self.IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + markup_for_user = self.get_reply_keyboard(message) + success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') + self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', + disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user) + + + def get_reply_keyboard(self, message): + markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) + item1 = types.KeyboardButton("📢Предложить свой пост") + item2 = types.KeyboardButton("📩Связаться с админами") + item3 = types.KeyboardButton("👋🏼Сказать пока!") + #TODO: Есть ощущение что не совсем так работает как надо + item4 = types.KeyboardButton("🤪Хочу стикеры") if not self.BotDB.get_info_about_stickers( + user_id=message.from_user.id) else None + + if item4: + markup.add(item1, item2, item3, item4) + else: + markup.add(item1, item2, item3) + + return markup + + @staticmethod + def __get_first_name(message): + return message.from_user.first_name + + def check_access(self, user_id: int): + """Проверка прав на совершение действий""" + return self.BotDB.is_admin(user_id) + + @staticmethod + def add_days_to_date(days): + """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" + current_date = datetime.now() + future_date = current_date + timedelta(days=int(days)) + formatted_date = future_date.strftime("%d-%m-%Y") + return formatted_date + + @staticmethod + def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str): + """ + Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback + + Args: + page: Номер текущей страницы. + total_items: Общее количество элементов. + array_items: Лист кортежей. Содержит в себе user_name: user_id + callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) + + Returns: + InlineKeyboardMarkup: Клавиатура с кнопками пагинации. + """ + + # Определяем общее количество страниц + total_pages = (total_items + 7 - 1) // 7 + + page = page + # Создаем список кнопок + buttons = [] + # Вычисляем стартовый номер для текущей страницы + start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы + # Кнопки с номерами страниц + for i in range(start_index, min(start_index + 7, + len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы + buttons.append( + types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}")) + + # Добавляем кнопки "Предыдущая" и "Следующая" + if int(page) > 1: + buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}")) + if page < total_pages: + buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}")) + #Добавляем кнопку назад + buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return")) + # Формируем клавиатуру с 3 кнопками в ряд + keyboard = [] + for i in range(0, len(buttons), 3): + keyboard.append(buttons[i:i + 3]) + return types.InlineKeyboardMarkup(keyboard) + + + def get_banned_users_list(self, offset: int): + """ + Возвращает сообщение со списком пользователей и словарь с ником + идентификатором + + Args: + offset: отступ для запроса в базу данных + + Returns: + message - текст сообщения + user_ids - лист кортежей [(user_name: user_id)] + """ + users = self.BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset) + message = "Список заблокированных пользователей:\n" + + for user in users: + message += f"Пользователь: {user[0]}\n" + message += f"Причина бана: {user[2]}\n" + message += f"Дата разбана: {user[3]}\n\n" + return message + + def get_banned_users_buttons(self): + """ + Возвращает сообщение со списком пользователей и словарь с ником + идентификатором + + Args: + offset: отступ для запроса в базу данных + + Returns: + message - текст сообщения + user_ids - лист кортежей [(user_name: user_id)] + """ + users = self.BotDB.get_banned_users_from_db() + user_ids = [] + + for user in users: + user_ids.append((user[0], user[1])) + return user_ids + + def delete_user_blacklist(self, user_id): + return self.BotDB.delete_user_blacklist(user_id=user_id) diff --git a/main.py b/main.py index fed9481..0c6f69b 100644 --- a/main.py +++ b/main.py @@ -1,647 +1,59 @@ import configparser import os import sys -from pathlib import Path -from time import sleep -from enum import Enum from database.db import BotDB -import telebot -import random -from datetime import datetime, timedelta -from telebot import types -from telebot.apihelper import ApiTelegramException -import messages -import traceback +from helper_bot.helper_bot import TelegramHelperBot +from loguru import logger +from logs.custom_logger import Logger + +#TODO: Дублируются логи с этого класса почему-то. Импортировал все так же +main_logger = Logger(name='main') + #TODO: Добавить проверку можно ли отвечать пользователю? Сейчас если у него скрыто лс, ему похоже не приходят сообщения #TODO Подумать над реализацией функционала с поступлениями в колледжи #TODO: Покрыть все логированием и ошибками корректными. Ерроры кидать в чат. #TODO: Покрыть все тестами -# Настройки -config_path = os.path.join(sys.path[0], 'settings.ini') -config = configparser.ConfigParser() -config.read(config_path) -# TELEGRAM -BOT_TOKEN = config.get('Telegram', 'BOT_TOKEN') -GROUP_FOR_POST = config.get('Telegram', 'group_for_posts') -GROUP_FOR_MESSAGE = config.get('Telegram', 'group_for_message') -MAIN_PUBLIC = config.get('Telegram', 'main_public') -GROUP_FOR_LOGS = config.get('Telegram', 'group_for_logs') -IMPORTANT_LOGS = config.get('Telegram', 'important_logs') -PREVIEW_LINK = config.getboolean('Telegram', 'PREVIEW_LINK') -# SETTINGS -LOGS = config.getboolean('Settings', 'logs') -TEST = config.getboolean('Settings', 'test') -# Инициализируем бота и базку -BotDB = BotDB(name='tg-bot-database') +class DependencyFactory: + def __init__(self): + # Загрузка настроек из settings.ini + logger.info("Initializing Dependency Factory") + config_path = os.path.join(sys.path[0], 'settings.ini') + self.config = configparser.ConfigParser() + self.config.read(config_path) + self.BotDB = BotDB('database/tg-bot-database') - -class State(Enum): - START = "START" - SUGGEST = "SUGGEST" - ADMIN = "ADMIN" - CHAT = "CHAT" - PRE_CHAT = "PRE_CHAT" - - -class TelegramHelperBot: - def __init__(self, token): - self.bot = telebot.TeleBot(token) - self.state = State.START - - # Router for user - @self.bot.message_handler(func=lambda message: True, chat_types=['private']) - def handle_message(message): - if BotDB.check_user_in_blacklist(message.from_user.id): - attribute = BotDB.get_blacklist_users_by_id(message.from_user.id) - self.bot.send_message(message.chat.id, - f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}', parse_mode='HTML') - return - if self.state == State.START: - if message.text == '/start': - self.start_message(message) - elif message.text == '📢Предложить свой пост': - self.suggest_post(message) - self.state = State.SUGGEST - elif message.text == '🤪Хочу стикеры': - self.stickers(message) - self.state = State.START - elif message.text == '📩Связаться с админами': - self.connect_with_admin(message) - self.state = State.PRE_CHAT - elif message.text == '👋🏼Сказать пока!': - self.end_message(message) - self.state = State.START - elif message.text == 'Выйти из чата': - self.end_message(message) - elif message.text == '/admin': - access = self.check_access(message.from_user.id) - if access: - self.admin_panel(message) - self.state = State.ADMIN - else: - self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') - elif message.text == '/state': - self.bot.send_message(message.chat.id, - f'Твой state == {self.state.value}') + self.settings = {} + for section in self.config.sections(): + self.settings[section] = {} + for key in self.config[section]: + # Преобразование значений в соответствующий тип + if key == 'PREVIEW_LINK': + self.settings[section][key] = self.config.getboolean(section, key) + elif key == 'LOGS' or key == 'TEST': + self.settings[section][key] = self.config.getboolean(section, key) else: - self.bot.send_message(message.chat.id, - #TODO: Здесь раньше был /state - "Не понимаю где ты находишься. Нажми /start, и я перезапущусь") + self.settings[section][key] = self.config.get(section, key) + logger.info("Initializing Dependency Factory success") - if self.state == State.SUGGEST: - self.bot.register_next_step_handler(message, self.send_to_suggest) - self.state = State.START - if message.text == '/start': - self.state = State.START - self.start_message(message) - if self.state == State.PRE_CHAT: - self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message) - self.state = State.START - if message.text == '/start': - self.state = State.START - self.start_message(message) + def get_database(self): + return self.BotDB - if self.state == State.CHAT: - if message.text == 'Выйти из чата': - self.state = State.START - self.end_message(message) - elif message.text == '/start': - self.state = State.START - self.start_message(message) - else: - self.resend_message_in_group_for_message(message) + def get_settings(self): + return self.settings - if self.state == State.ADMIN: - if message == '/admin' or message == '/restart' or message == 'Вернуться в админку': - access = self.check_access(message.from_user.id) - if access: - self.admin_panel(message) - else: - self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') - if message.text == '/start': - self.state = State.START - self.start_message(message) + def get_config(self): + return self.config - @self.bot.message_handler(func=lambda message: True, chat_types=['group']) - def handle_message(message): - """Функция ответа админа пользователю через закрытый чат""" - self.state = State.CHAT - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Выйти из чата") - markup.add(item1) - message_id = 0 - try: - message_id = message.reply_to_message.id - except AttributeError: - self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!') - message_from_admin = message.text - try: - chat_id = BotDB.get_user_by_message_id(message_id) - self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) - except TypeError: - self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.') - - # Админка - @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) - def post_for_group(call): - if call.data == 'publish' and call.message.content_type == 'text': - try: - self.bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text) - self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) - except Exception as e: - if LOGS: - self.bot.send_message(chat_id=IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - elif call.data == 'publish' and call.message.content_type == 'photo': - try: - self.bot.send_photo( - chat_id=MAIN_PUBLIC, - caption=call.message.caption, - photo=call.message.photo[-1].file_id, - ) - self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) - except Exception as e: - if LOGS: - self.bot.send_message(chat_id=IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - elif call.data == 'decline': - try: - self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) - except Exception as e: - if LOGS: - self.bot.send_message(IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - @self.bot.callback_query_handler(func=lambda call: True) - def pagination(call): - if call.data[:3] == 'ban': - user_id = call.data[4:] - self.ban_user(call.message, user_id) - if call.data == 'return': - self.bot.delete_message(call.message.chat.id, call.message.message_id) - self.admin_panel(call.message) - - if call.data[:5] == 'unban': - self.delete_user_blacklist(call.data[6:]) - msg = f'Успешно удалено.' - self.bot.send_message(chat_id=call.message.chat.id, text=msg) - elif call.data[:4] == 'page': - if call.message.text == 'Список пользователей которые последними обращались к боту': - list_users = BotDB.get_last_users_from_db() - keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(list_users), list_users, - 'ban') - self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, - reply_markup=keyboard) - if "Список заблокированных пользователей".lower() in call.message.text.lower(): - #Готовим сообщения - message_user = self.get_banned_users_list(int(call.data[5:]) * 7 - 7) - self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, - text=message_user) - - #Готовим клавиатуру - buttons = self.get_banned_users_buttons() - keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban') - self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, - reply_markup=keyboard) - - def start(self): - while True: - try: - self.bot.polling(none_stop=True) - except (ConnectionError, Exception): - print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}") - - def unban_notifier(self): - # Получение сегодняшней даты в формате DD-MM-YYYY - current_date = datetime.now() - print('Мы в функции unban_notifier') - today = current_date.strftime("%d-%m-%Y") - # Получение списка разблокированных пользователей - unblocked_users = BotDB.get_users_for_unblock_today(today) - message = "Разблокированные пользователи:\n" - for user_id, user_name in unblocked_users.items(): - message += f"ID: {user_id}, Имя: {user_name}\n" - - # Отправка сообщения в канал - self.bot.send_message(GROUP_FOR_MESSAGE, message) - - # Черный список - def admin_panel(self, message): - try: - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Бан (Список)") - #item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю - #item3 = types.KeyboardButton("Удалить админа") - item4 = types.KeyboardButton("Разбан (список)") - item5 = types.KeyboardButton("Вернуться в бота") - markup.add(item1, item4, item5) - self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", - reply_markup=markup) - self.bot.register_next_step_handler(message, self.handle_admin_message) - except Exception as e: - self.bot.register_next_step_handler(message, self.admin_panel) - - def handle_admin_message(self, message): - try: - if message.text == "Бан (Список)": - self.get_last_users(message) - elif message.text == "Разбан (список)": - self.get_banned_users(message) - elif message.text == "Вернуться в бота": - self.start_message(message) - except Exception as e: - self.bot.reply_to(message, f"Ошибка\n\n {e}") - self.admin_panel(message) - - def ban_user(self, message, user_id: int): - user_name = BotDB.get_username(user_id=user_id) - ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None} - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Спам") - item2 = types.KeyboardButton("Заебал стикерами") - markup.add(item1, item2) - self.bot.send_message(message.chat.id, - f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат", - reply_markup=markup) - self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object) - - def ban_user_step_2(self, message, ban_object: dict): - ban_object['message_for_user'] = message.text - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("1") - item2 = types.KeyboardButton("7") - item3 = types.KeyboardButton("30") - item4 = types.KeyboardButton("Навсегда") - markup.add(item1, item2, item3, item4) - self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " - f"его в чат", - reply_markup=markup) - self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object) - - def ban_user_step_3(self, message, ban_object: dict): - date_to_unban = None - if message.text != 'Навсегда': - date_to_unban = self.add_days_to_date(message.text) - else: - pass - ban_object['date_to_unban'] = date_to_unban - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Подтвердить") - item2 = types.KeyboardButton("Отменить") - markup.add(item1, item2) - self.bot.send_message(message.chat.id, - f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}", - parse_mode='html', - reply_markup=markup) - self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object) - - def ban_user_final_step(self, message, ban_object: dict): - if message.text == 'Подтвердить': - exists = BotDB.check_user_in_blacklist(ban_object['user_id']) - if exists: - self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.") - self.admin_panel(message) - else: - BotDB.set_user_blacklist(ban_object['user_id'], - ban_object['user_name'], - ban_object['message_for_user'], - ban_object['date_to_unban']) - self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.") - self.admin_panel(message) - - def get_last_users(self, message): - list_users = BotDB.get_last_users_from_db() - keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') - self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту", - reply_markup=keyboard) - - def get_banned_users(self, message): - message_text = self.get_banned_users_list(0) - buttons_list = self.get_banned_users_buttons() - if buttons_list: - k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban') - self.bot.send_message(message.chat.id, message_text, reply_markup=k) - else: - self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет") - self.admin_panel(message) - - def start_message(self, message): - try: - name_stick_hello = list(Path('Stick').rglob('Hello_*')) - random_stick_hello = open(random.choice(name_stick_hello), 'rb') - # logging - if LOGS: - self.bot.forward_message(chat_id=GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - self.bot.send_sticker(message.chat.id, random_stick_hello) - sleep(0.3) - except Exception as e: - print(f'{str(e)}') - if LOGS: - self.bot.send_message(IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - try: - user_id = message.from_user.id - first_name = message.from_user.first_name - full_name = message.from_user.full_name - is_bot = message.from_user.is_bot - username = message.from_user.username - language_code = message.from_user.language_code - current_date = datetime.now() - date = current_date.strftime("%Y-%m-%d %H:%M:%S") - if not BotDB.user_exists(user_id): - BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, - date) - BotDB.update_date_for_user(date, user_id) - markup = self.get_reply_keyboard(message) - hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE') - self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, - disable_web_page_preview=not PREVIEW_LINK) - except Exception as e: - print(f'{str(e)}') - if LOGS: - self.bot.send_message(IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - def resend_message_in_group_for_message(self, message): - self.bot.forward_message(chat_id=GROUP_FOR_MESSAGE, - from_chat_id=message.chat.id, - message_id=message.message_id - ) - current_date = datetime.now() - date = current_date.strftime("%Y-%m-%d %H:%M:%S") - BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) - question = messages.get_message(self.__get_first_name(message), 'QUESTION') - markup = self.get_reply_keyboard(message) - self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK, - reply_markup=markup) - - def suggest_post(self, message): - try: - markup = types.ReplyKeyboardRemove() - suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS') - self.bot.send_message(message.chat.id, suggest_news, parse_mode='html') - sleep(0.3) - suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2') - self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup) - except Exception as e: - self.bot.send_message(IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - # logging - if LOGS: - self.bot.forward_message(chat_id=GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - - def stickers(self, message): - BotDB.update_info_about_stickers(user_id=message.from_user.id) - markup = self.get_reply_keyboard(message) - try: - self.bot.forward_message(chat_id=GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - self.bot.send_message(message.chat.id, - text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', - reply_markup=markup) - except ApiTelegramException as e: - self.bot.send_message(chat_id=IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - def connect_with_admin(self, message): - connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN') - self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html") - # logging - if LOGS: - self.bot.forward_message(chat_id=GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - - def end_message(self, message): - try: - name_stick_bye = list(Path('Stick').rglob('Universal_*')) - random_stick_bye = open(random.choice(name_stick_bye), 'rb') - self.bot.send_sticker(message.chat.id, random_stick_bye) - except ApiTelegramException as e: - if LOGS: - self.bot.send_message(chat_id=IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - markup = types.ReplyKeyboardRemove() - try: - bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE') - self.bot.send_message(message.chat.id, bye_message, - parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK) - except Exception as e: - if LOGS: - self.bot.send_message(chat_id=IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - if LOGS: - # logging - self.bot.forward_message(chat_id=GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - - def send_to_suggest(self, message): - markup = types.InlineKeyboardMarkup(row_width=1) - item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish') - item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline') - markup.add(item1, item2) - try: - if message.content_type == 'text': - post_text = message.text.lower() - if post_text.find('неанон') != -1 or post_text.find('не анон') != -1: - self.bot.send_message( - # TODO: GROUP_FOR_POST - chat_id=GROUP_FOR_POST, - text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - reply_markup=markup - ) - elif post_text.find('анон') != -1: - self.bot.send_message( - # TODO: GROUP_FOR_POST - chat_id=GROUP_FOR_POST, - text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно', - reply_markup=markup - ) - else: - self.bot.send_message( - # TODO: GROUP_FOR_POST - chat_id=GROUP_FOR_POST, - text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - reply_markup=markup - ) - elif message.content_type == 'photo' and message.media_group_id is None: - post_text_for_photo = message.caption.lower() - if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1: - self.bot.send_photo( - # TODO: GROUP_FOR_POST - chat_id=GROUP_FOR_POST, - caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - photo=message.photo[-1].file_id, - reply_markup=markup - ) - elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1: - self.bot.send_photo( - # TODO: GROUP_FOR_POST - chat_id=GROUP_FOR_POST, - caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно', - photo=message.photo[-1].file_id, - reply_markup=markup - ) - else: - self.bot.send_photo( - # TODO: GROUP_FOR_POST - chat_id=GROUP_FOR_POST, - caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - photo=message.photo[-1].file_id, - reply_markup=markup - ) - # TODO: Не понятна реализация с альбомами от слова совсем - # elif message.content_type == 'photo' and message.media_group_id != None: - # bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id ) - else: - pass - except Exception as e: - if LOGS: - self.bot.send_message(chat_id=IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - markup_for_user = self.get_reply_keyboard(message) - success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') - self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', - disable_web_page_preview=not PREVIEW_LINK, reply_markup=markup_for_user) - - @staticmethod - def get_reply_keyboard(message): - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("📢Предложить свой пост") - item2 = types.KeyboardButton("📩Связаться с админами") - item3 = types.KeyboardButton("👋🏼Сказать пока!") - #TODO: Есть ощущение что не совсем так работает как надо - item4 = types.KeyboardButton("🤪Хочу стикеры") if not BotDB.get_info_about_stickers( - user_id=message.from_user.id) else None - - if item4: - markup.add(item1, item2, item3, item4) - else: - markup.add(item1, item2, item3) - - return markup - - @staticmethod - def __get_first_name(message): - return message.from_user.first_name - - @staticmethod - def check_access(user_id: int): - """Проверка прав на совершение действий""" - return BotDB.is_admin(user_id) - - @staticmethod - def add_days_to_date(days): - """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" - current_date = datetime.now() - future_date = current_date + timedelta(days=int(days)) - formatted_date = future_date.strftime("%d-%m-%Y") - return formatted_date - - @staticmethod - def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str): - """ - Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback - - Args: - page: Номер текущей страницы. - total_items: Общее количество элементов. - array_items: Лист кортежей. Содержит в себе user_name: user_id - callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) - - Returns: - InlineKeyboardMarkup: Клавиатура с кнопками пагинации. - """ - - # Определяем общее количество страниц - total_pages = (total_items + 7 - 1) // 7 - - page = page - # Создаем список кнопок - buttons = [] - # Вычисляем стартовый номер для текущей страницы - start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы - # Кнопки с номерами страниц - for i in range(start_index, min(start_index + 7, - len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы - buttons.append( - types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}")) - - # Добавляем кнопки "Предыдущая" и "Следующая" - if int(page) > 1: - buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}")) - if page < total_pages: - buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}")) - #Добавляем кнопку назад - buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return")) - # Формируем клавиатуру с 3 кнопками в ряд - keyboard = [] - for i in range(0, len(buttons), 3): - keyboard.append(buttons[i:i + 3]) - return types.InlineKeyboardMarkup(keyboard) - - @staticmethod - def get_banned_users_list(offset: int): - """ - Возвращает сообщение со списком пользователей и словарь с ником + идентификатором - - Args: - offset: отступ для запроса в базу данных - - Returns: - message - текст сообщения - user_ids - лист кортежей [(user_name: user_id)] - """ - users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset) - message = "Список заблокированных пользователей:\n" - - for user in users: - message += f"Пользователь: {user[0]}\n" - message += f"Причина бана: {user[2]}\n" - message += f"Дата разбана: {user[3]}\n\n" - return message - - @staticmethod - def get_banned_users_buttons(): - """ - Возвращает сообщение со списком пользователей и словарь с ником + идентификатором - - Args: - offset: отступ для запроса в базу данных - - Returns: - message - текст сообщения - user_ids - лист кортежей [(user_name: user_id)] - """ - users = BotDB.get_banned_users_from_db() - user_ids = [] - - for user in users: - user_ids.append((user[0], user[1])) - return user_ids - - @staticmethod - def delete_user_blacklist(user_id): - return BotDB.delete_user_blacklist(user_id=user_id) - - -bot = TelegramHelperBot(BOT_TOKEN) if __name__ == "__main__": - # Запускаем бота + # Запускаем тг бота + bot = TelegramHelperBot(DependencyFactory()) + logger.info("Bot Started") bot.start() #scheduler = BackgroundScheduler() #scheduler.add_job(bot.unban_notifier(), 'cron', hour=22, minute=9) #scheduler.start() -