from datetime import datetime, timedelta import random import traceback from enum import Enum from pathlib import Path from time import sleep import telebot from telebot import types from telebot.apihelper import ApiTelegramException import messages class State(Enum): START = "START" SUGGEST = "SUGGEST" ADMIN = "ADMIN" CHAT = "CHAT" PRE_CHAT = "PRE_CHAT" class TelegramHelperBot: def __init__(self, dependency_factory): self.state = State.START self.BotDB = dependency_factory.get_database() self.settings = dependency_factory.get_settings() token = self.settings['Telegram']['bot_token'] self.GROUP_FOR_POST = self.settings['Telegram']['group_for_posts'] self.GROUP_FOR_MESSAGE = self.settings['Telegram']['group_for_message'] self.MAIN_PUBLIC = self.settings['Telegram']['main_public'] self.GROUP_FOR_LOGS = self.settings['Telegram']['group_for_logs'] self.IMPORTANT_LOGS = self.settings['Telegram']['important_logs'] self.PREVIEW_LINK = self.settings['Telegram']['preview_link'] self.LOGS = self.settings['Settings']['logs'] self.TEST = self.settings['Settings']['test'] self.bot = telebot.TeleBot(token) # Router for user @self.bot.message_handler(func=lambda message: True, chat_types=['private']) def handle_message(message): if self.BotDB.check_user_in_blacklist(message.from_user.id): attribute = self.BotDB.get_blacklist_users_by_id(message.from_user.id) self.bot.send_message(message.chat.id, f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}', parse_mode='HTML') return if self.state == State.START: if message.text == '/start': self.start_message(message) elif message.text == '📢Предложить свой пост': self.suggest_post(message) self.state = State.SUGGEST elif message.text == '🤪Хочу стикеры': self.stickers(message) self.state = State.START elif message.text == '📩Связаться с админами': self.connect_with_admin(message) self.state = State.PRE_CHAT elif message.text == '👋🏼Сказать пока!': self.end_message(message) self.state = State.START elif message.text == 'Выйти из чата': self.end_message(message) elif message.text == '/admin': access = self.check_access(message.from_user.id) if access: self.admin_panel(message) self.state = State.ADMIN else: self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') elif message.text == '/state': self.bot.send_message(message.chat.id, f'Твой state == {self.state.value}') else: self.bot.send_message(message.chat.id, #TODO: Здесь раньше был /state "Не понимаю где ты находишься. Нажми /start, и я перезапущусь") if self.state == State.SUGGEST: self.bot.register_next_step_handler(message, self.send_to_suggest) self.state = State.START if message.text == '/start': self.state = State.START self.start_message(message) if self.state == State.PRE_CHAT: self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message) self.state = State.START if message.text == '/start': self.state = State.START self.start_message(message) if self.state == State.CHAT: if message.text == 'Выйти из чата': self.state = State.START self.end_message(message) elif message.text == '/start': self.state = State.START self.start_message(message) else: self.resend_message_in_group_for_message(message) if self.state == State.ADMIN: if message == '/admin' or message == '/restart' or message == 'Вернуться в админку': access = self.check_access(message.from_user.id) if access: self.admin_panel(message) else: self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') if message.text == '/start': self.state = State.START self.start_message(message) @self.bot.message_handler(func=lambda message: True, chat_types=['group']) def handle_message(message): """Функция ответа админа пользователю через закрытый чат""" self.state = State.CHAT markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Выйти из чата") markup.add(item1) message_id = 0 try: message_id = message.reply_to_message.id except AttributeError: self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!') message_from_admin = message.text try: chat_id = self.BotDB.get_user_by_message_id(message_id) self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) except TypeError: self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.') # Админка @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) def post_for_group(call): if call.data == 'publish' and call.message.content_type == 'text': try: self.bot.send_message(chat_id=self.MAIN_PUBLIC, text=call.message.text) self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e: if self.LOGS: self.bot.send_message(chat_id=self.IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") elif call.data == 'publish' and call.message.content_type == 'photo': try: self.bot.send_photo( chat_id=self.MAIN_PUBLIC, caption=call.message.caption, photo=call.message.photo[-1].file_id, ) self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e: if self.LOGS: self.bot.send_message(chat_id=self.IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") elif call.data == 'decline': try: self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e: if self.LOGS: self.bot.send_message(self.IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @self.bot.callback_query_handler(func=lambda call: True) def pagination(call): if call.data[:3] == 'ban': user_id = call.data[4:] self.ban_user(call.message, user_id) if call.data == 'return': self.bot.delete_message(call.message.chat.id, call.message.message_id) self.admin_panel(call.message) if call.data[:5] == 'unban': self.delete_user_blacklist(call.data[6:]) msg = f'Успешно удалено.' self.bot.send_message(chat_id=call.message.chat.id, text=msg) elif call.data[:4] == 'page': if call.message.text == 'Список пользователей которые последними обращались к боту': list_users = self.BotDB.get_last_users_from_db() keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(list_users), list_users, 'ban') self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=keyboard) if "Список заблокированных пользователей".lower() in call.message.text.lower(): #Готовим сообщения message_user = self.get_banned_users_list(int(call.data[5:]) * 7 - 7) self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, text=message_user) #Готовим клавиатуру buttons = self.get_banned_users_buttons() keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban') self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=keyboard) def start(self): while True: try: self.bot.polling(none_stop=True) except (ConnectionError, Exception): print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}") def unban_notifier(self): # Получение сегодняшней даты в формате DD-MM-YYYY current_date = datetime.now() print('Мы в функции unban_notifier') today = current_date.strftime("%d-%m-%Y") # Получение списка разблокированных пользователей unblocked_users = self.BotDB.get_users_for_unblock_today(today) message = "Разблокированные пользователи:\n" for user_id, user_name in unblocked_users.items(): message += f"ID: {user_id}, Имя: {user_name}\n" # Отправка сообщения в канал self.bot.send_message(self.GROUP_FOR_MESSAGE, message) # Черный список def admin_panel(self, message): try: markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Бан (Список)") #item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю #item3 = types.KeyboardButton("Удалить админа") item4 = types.KeyboardButton("Разбан (список)") item5 = types.KeyboardButton("Вернуться в бота") markup.add(item1, item4, item5) self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", reply_markup=markup) self.bot.register_next_step_handler(message, self.handle_admin_message) except Exception as e: self.bot.register_next_step_handler(message, self.admin_panel) def handle_admin_message(self, message): try: if message.text == "Бан (Список)": self.get_last_users(message) elif message.text == "Разбан (список)": self.get_banned_users(message) elif message.text == "Вернуться в бота": self.start_message(message) except Exception as e: self.bot.reply_to(message, f"Ошибка\n\n {e}") self.admin_panel(message) def ban_user(self, message, user_id: int): user_name = self.BotDB.get_username(user_id=user_id) ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None} markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Спам") item2 = types.KeyboardButton("Заебал стикерами") markup.add(item1, item2) self.bot.send_message(message.chat.id, f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат", reply_markup=markup) self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object) def ban_user_step_2(self, message, ban_object: dict): ban_object['message_for_user'] = message.text markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("1") item2 = types.KeyboardButton("7") item3 = types.KeyboardButton("30") item4 = types.KeyboardButton("Навсегда") markup.add(item1, item2, item3, item4) self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " f"его в чат", reply_markup=markup) self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object) def ban_user_step_3(self, message, ban_object: dict): date_to_unban = None if message.text != 'Навсегда': date_to_unban = self.add_days_to_date(message.text) else: pass ban_object['date_to_unban'] = date_to_unban markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Подтвердить") item2 = types.KeyboardButton("Отменить") markup.add(item1, item2) self.bot.send_message(message.chat.id, f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}", parse_mode='html', reply_markup=markup) self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object) def ban_user_final_step(self, message, ban_object: dict): if message.text == 'Подтвердить': exists = self.BotDB.check_user_in_blacklist(ban_object['user_id']) if exists: self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.") self.admin_panel(message) else: self.BotDB.set_user_blacklist(ban_object['user_id'], ban_object['user_name'], ban_object['message_for_user'], ban_object['date_to_unban']) self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.") self.admin_panel(message) def get_last_users(self, message): list_users = self.BotDB.get_last_users_from_db() keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту", reply_markup=keyboard) def get_banned_users(self, message): message_text = self.get_banned_users_list(0) buttons_list = self.get_banned_users_buttons() if buttons_list: k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban') self.bot.send_message(message.chat.id, message_text, reply_markup=k) else: self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет") self.admin_panel(message) def start_message(self, message): try: name_stick_hello = list(Path('Stick').rglob('Hello_*')) random_stick_hello = open(random.choice(name_stick_hello), 'rb') # logging if self.LOGS: self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) self.bot.send_sticker(message.chat.id, random_stick_hello) sleep(0.3) except Exception as e: print(f'{str(e)}') if self.LOGS: self.bot.send_message(self.IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") try: user_id = message.from_user.id first_name = message.from_user.first_name full_name = message.from_user.full_name is_bot = message.from_user.is_bot username = message.from_user.username language_code = message.from_user.language_code current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") if not self.BotDB.user_exists(user_id): self.BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, date) self.BotDB.update_date_for_user(date, user_id) markup = self.get_reply_keyboard(message) hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE') self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not self.PREVIEW_LINK) except Exception as e: print(f'{str(e)}') if self.LOGS: self.bot.send_message(self.IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") def resend_message_in_group_for_message(self, message): self.bot.forward_message(chat_id=self.GROUP_FOR_MESSAGE, from_chat_id=message.chat.id, message_id=message.message_id ) current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") self.BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) question = messages.get_message(self.__get_first_name(message), 'QUESTION') markup = self.get_reply_keyboard(message) self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup) def suggest_post(self, message): try: markup = types.ReplyKeyboardRemove() suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS') self.bot.send_message(message.chat.id, suggest_news, parse_mode='html') sleep(0.3) suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2') self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup) except Exception as e: self.bot.send_message(self.IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") # logging if self.LOGS: self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) def stickers(self, message): self.BotDB.update_info_about_stickers(user_id=message.from_user.id) markup = self.get_reply_keyboard(message) try: self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) self.bot.send_message(message.chat.id, text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', reply_markup=markup) except ApiTelegramException as e: self.bot.send_message(chat_id=self.IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") def connect_with_admin(self, message): connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN') self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html") # logging if self.LOGS: self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) def end_message(self, message): try: name_stick_bye = list(Path('Stick').rglob('Universal_*')) random_stick_bye = open(random.choice(name_stick_bye), 'rb') self.bot.send_sticker(message.chat.id, random_stick_bye) except ApiTelegramException as e: if self.LOGS: self.bot.send_message(chat_id=self.IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") markup = types.ReplyKeyboardRemove() try: bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE') self.bot.send_message(message.chat.id, bye_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not self.PREVIEW_LINK) except Exception as e: if self.LOGS: self.bot.send_message(chat_id=self.IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") if self.LOGS: # logging self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) def send_to_suggest(self, message): markup = types.InlineKeyboardMarkup(row_width=1) item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish') item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline') markup.add(item1, item2) try: if message.content_type == 'text': post_text = message.text.lower() if post_text.find('неанон') != -1 or post_text.find('не анон') != -1: self.bot.send_message( # TODO: GROUP_FOR_POST chat_id=self.GROUP_FOR_POST, text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', reply_markup=markup ) elif post_text.find('анон') != -1: self.bot.send_message( # TODO: GROUP_FOR_POST chat_id=self.GROUP_FOR_POST, text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно', reply_markup=markup ) else: self.bot.send_message( # TODO: GROUP_FOR_POST chat_id=self.GROUP_FOR_POST, text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', reply_markup=markup ) elif message.content_type == 'photo' and message.media_group_id is None: post_text_for_photo = message.caption.lower() if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1: self.bot.send_photo( # TODO: GROUP_FOR_POST chat_id=self.GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', photo=message.photo[-1].file_id, reply_markup=markup ) elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1: self.bot.send_photo( # TODO: GROUP_FOR_POST chat_id=self.GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно', photo=message.photo[-1].file_id, reply_markup=markup ) else: self.bot.send_photo( # TODO: GROUP_FOR_POST chat_id=self.GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', photo=message.photo[-1].file_id, reply_markup=markup ) # TODO: Не понятна реализация с альбомами от слова совсем # elif message.content_type == 'photo' and message.media_group_id != None: # bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id ) else: pass except Exception as e: if self.LOGS: self.bot.send_message(chat_id=self.IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") markup_for_user = self.get_reply_keyboard(message) success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user) def get_reply_keyboard(self, message): markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("📢Предложить свой пост") item2 = types.KeyboardButton("📩Связаться с админами") item3 = types.KeyboardButton("👋🏼Сказать пока!") #TODO: Есть ощущение что не совсем так работает как надо item4 = types.KeyboardButton("🤪Хочу стикеры") if not self.BotDB.get_info_about_stickers( user_id=message.from_user.id) else None if item4: markup.add(item1, item2, item3, item4) else: markup.add(item1, item2, item3) return markup @staticmethod def __get_first_name(message): return message.from_user.first_name def check_access(self, user_id: int): """Проверка прав на совершение действий""" return self.BotDB.is_admin(user_id) @staticmethod def add_days_to_date(days): """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" current_date = datetime.now() future_date = current_date + timedelta(days=int(days)) formatted_date = future_date.strftime("%d-%m-%Y") return formatted_date @staticmethod def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str): """ Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback Args: page: Номер текущей страницы. total_items: Общее количество элементов. array_items: Лист кортежей. Содержит в себе user_name: user_id callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) Returns: InlineKeyboardMarkup: Клавиатура с кнопками пагинации. """ # Определяем общее количество страниц total_pages = (total_items + 7 - 1) // 7 page = page # Создаем список кнопок buttons = [] # Вычисляем стартовый номер для текущей страницы start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы # Кнопки с номерами страниц for i in range(start_index, min(start_index + 7, len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы buttons.append( types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}")) # Добавляем кнопки "Предыдущая" и "Следующая" if int(page) > 1: buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}")) if page < total_pages: buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}")) #Добавляем кнопку назад buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return")) # Формируем клавиатуру с 3 кнопками в ряд keyboard = [] for i in range(0, len(buttons), 3): keyboard.append(buttons[i:i + 3]) return types.InlineKeyboardMarkup(keyboard) def get_banned_users_list(self, offset: int): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: offset: отступ для запроса в базу данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = self.BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset) message = "Список заблокированных пользователей:\n" for user in users: message += f"Пользователь: {user[0]}\n" message += f"Причина бана: {user[2]}\n" message += f"Дата разбана: {user[3]}\n\n" return message def get_banned_users_buttons(self): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: offset: отступ для запроса в базу данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = self.BotDB.get_banned_users_from_db() user_ids = [] for user in users: user_ids.append((user[0], user[1])) return user_ids def delete_user_blacklist(self, user_id): return self.BotDB.delete_user_blacklist(user_id=user_id)