import configparser import os import sys from pathlib import Path from time import sleep from enum import Enum from typing import Any from apscheduler.schedulers.background import BackgroundScheduler import db from db import BotDB import telebot import random from datetime import datetime, timedelta import time from telebot import types from telebot.apihelper import ApiTelegramException import messages import traceback # Настройки config_path = os.path.join(sys.path[0], 'settings.ini') config = configparser.ConfigParser() config.read(config_path) # TELEGRAM BOT_TOKEN = config.get('Telegram', 'BOT_TOKEN') GROUP_FOR_POST = config.get('Telegram', 'group_for_posts') GROUP_FOR_MESSAGE = config.get('Telegram', 'group_for_message') MAIN_PUBLIC = config.get('Telegram', 'main_public') GROUP_FOR_LOGS = config.get('Telegram', 'group_for_logs') IMPORTANT_LOGS = config.get('Telegram', 'important_logs') PREVIEW_LINK = config.getboolean('Telegram', 'PREVIEW_LINK') # SETTINGS LOGS = config.getboolean('Settings', 'logs') TEST = config.getboolean('Settings', 'test') # Инициализируем бота и базку BotDB = BotDB() class State(Enum): START = "START" SUGGEST = "SUGGEST" ADMIN = "ADMIN" CHAT = "CHAT" PRE_CHAT = "PRE_CHAT" class TelegramHelperBot: def __init__(self, token): self.bot = telebot.TeleBot(token) self.state = State.START # Router for user @self.bot.message_handler(func=lambda message: True, chat_types=['private']) def handle_message(message): if BotDB.check_user_in_blacklist(message.from_user.id): attribute = BotDB.get_blacklist_users_by_id(message.from_user.id) self.bot.send_message(message.chat.id, f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}', parse_mode='HTML') return if self.state == State.START: if message.text == '/start': self.start_message(message) elif message.text == '📢Предложить свой пост': self.suggest_post(message) self.state = State.SUGGEST elif message.text == '🤪Хочу стикеры': self.stickers(message) self.state = State.START elif message.text == '📩Связаться с админами': self.connect_with_admin(message) self.state = State.PRE_CHAT elif message.text == '👋🏼Сказать пока!': self.end_message(message) self.state = State.START elif message.text == 'Выйти из чата': self.end_message(message) elif message.text == '/admin': access = self.check_access(message.from_user.id) if access: self.admin_panel(message) self.state = State.ADMIN else: self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') elif message.text == '/state': self.bot.send_message(message.chat.id, f'Твой state == {self.state.value}') else: self.bot.send_message(message.chat.id, "Не понимаю где ты находишься. Нажми /state, и я расскажу что ты можешь " "сделать") if self.state == State.SUGGEST: self.bot.register_next_step_handler(message, self.send_to_suggest) self.state = State.START if message.text == '/start': self.state = State.START self.start_message(message) if self.state == State.PRE_CHAT: self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message) self.state = State.START if message.text == '/start': self.state = State.START self.start_message(message) if self.state == State.CHAT: if message.text == 'Выйти из чата': self.state = State.START self.end_message(message) elif message.text == '/start': self.state = State.START self.start_message(message) else: self.resend_message_in_group_for_message(message) if self.state == State.ADMIN: if message == '/admin' or message == '/restart' or message == 'Вернуться в админку': access = self.check_access(message.from_user.id) if access: self.admin_panel(message) else: self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') if message.text == '/start': self.state = State.START self.start_message(message) @self.bot.message_handler(func=lambda message: True, chat_types=['group']) def handle_message(message): """Функция ответа админа пользователю через закрытый чат""" self.state = State.CHAT markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Выйти из чата") markup.add(item1) message_id = message.reply_to_message.id message_from_admin = message.text chat_id = BotDB.get_user_by_message_id(message_id) self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) # Админка @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) def post_for_group(call): if call.data == 'publish' and call.message.content_type == 'text': try: self.bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text) self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e: if LOGS: self.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") elif call.data == 'publish' and call.message.content_type == 'photo': try: self.bot.send_photo( chat_id=MAIN_PUBLIC, caption=call.message.caption, photo=call.message.photo[-1].file_id, ) self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e: if LOGS: self.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") elif call.data == 'decline': try: self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except Exception as e: if LOGS: self.bot.send_message(IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") @self.bot.callback_query_handler(func=lambda call: True) def pagination(call): if call.data[:3] == 'ban': user_id = call.data[4:] self.ban_user(call.message, user_id) if call.data == 'return': self.bot.delete_message(call.message.chat.id, call.message.message_id) self.admin_panel(call.message) if call.data[:5] == 'unban': self.delete_user_blacklist(call.data[6:]) msg = f'Успешно удалено.' self.bot.send_message(chat_id=call.message.chat.id, text=msg) elif call.data[:4] == 'page': if call.message.text == 'Список пользователей которые последними обращались к боту': list_users = BotDB.get_last_users_from_db() keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(list_users), list_users, 'ban') self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=keyboard) if "Список заблокированных пользователей".lower() in call.message.text.lower(): #Готовим сообщения message_user = self.get_banned_users_list(int(call.data[5:]) * 7 - 7) self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, text=message_user) #Готовим клавиатуру buttons = self.get_banned_users_buttons() keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban') self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, reply_markup=keyboard) def start(self): while True: try: self.bot.polling(none_stop=True) except (ConnectionError, Exception): print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}") def unban_notifier(self): # Получение сегодняшней даты в формате DD-MM-YYYY current_date = datetime.now() today = current_date.strftime("%d-%m-%Y") # Получение списка разблокированных пользователей unblocked_users = BotDB.get_users_for_unblock_today(today) message = "Разблокированные пользователи:\n" for user_id, user_name in unblocked_users.items(): message += f"ID: {user_id}, Имя: {user_name}\n" # Отправка сообщения в канал self.bot.send_message(GROUP_FOR_MESSAGE, message) # Черный список def admin_panel(self, message): try: markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Бан (Список)") #item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю #item3 = types.KeyboardButton("Удалить админа") item4 = types.KeyboardButton("Разбан (список)") markup.add(item1, item4) self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", reply_markup=markup) self.bot.register_next_step_handler(message, self.handle_admin_message) except Exception as e: self.bot.register_next_step_handler(message, self.admin_panel) def handle_admin_message(self, message): try: if message.text == "Бан (Список)": self.get_last_users(message) elif message.text == "Разбан (список)": self.get_banned_users(message) except Exception as e: self.bot.reply_to(message, f"Ошибка\n\n {e}") self.admin_panel(message) def ban_user(self, message, user_id: int): user_name = BotDB.get_username(user_id=user_id) ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None} markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Спам") item2 = types.KeyboardButton("Заебал стикерами") markup.add(item1, item2) self.bot.send_message(message.chat.id, f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат", reply_markup=markup) self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object) def ban_user_step_2(self, message, ban_object: dict): ban_object['message_for_user'] = message.text markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("1") item2 = types.KeyboardButton("7") item3 = types.KeyboardButton("30") item4 = types.KeyboardButton("Навсегда") markup.add(item1, item2, item3, item4) self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " f"его в чат", reply_markup=markup) self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object) def ban_user_step_3(self, message, ban_object: dict): date_to_unban = None if message.text != 'Навсегда': date_to_unban = self.add_days_to_date(message.text) else: pass ban_object['date_to_unban'] = date_to_unban markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Подтвердить") item2 = types.KeyboardButton("Отменить") markup.add(item1, item2) self.bot.send_message(message.chat.id, f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}", parse_mode='html', reply_markup=markup) self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object) def ban_user_final_step(self, message, ban_object: dict): if message.text == 'Подтвердить': exists = BotDB.check_user_in_blacklist(ban_object['user_id']) if exists: self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.") self.admin_panel(message) else: BotDB.set_user_blacklist(ban_object['user_id'], ban_object['user_name'], ban_object['message_for_user'], ban_object['date_to_unban']) self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.") self.admin_panel(message) def get_last_users(self, message): list_users = BotDB.get_last_users_from_db() keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту", reply_markup=keyboard) def get_banned_users(self, message): message_text = self.get_banned_users_list(0) buttons_list = self.get_banned_users_buttons() if buttons_list: k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban') self.bot.send_message(message.chat.id, message_text, reply_markup=k) else: self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет") self.admin_panel(message) def start_message(self, message): try: name_stick_hello = list(Path('Stick').rglob('Hello_*')) random_stick_hello = open(random.choice(name_stick_hello), 'rb') # logging if LOGS: self.bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) self.bot.send_sticker(message.chat.id, random_stick_hello) sleep(0.3) except Exception as e: print(f'{str(e)}') if LOGS: self.bot.send_message(IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") try: user_id = message.from_user.id first_name = message.from_user.first_name full_name = message.from_user.full_name is_bot = message.from_user.is_bot username = message.from_user.username language_code = message.from_user.language_code current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") if not BotDB.user_exists(user_id): BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, date) BotDB.update_date_for_user(date, user_id) markup = self.get_reply_keyboard(message) hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE') self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK) except Exception as e: print(f'{str(e)}') if LOGS: self.bot.send_message(IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") def resend_message_in_group_for_message(self, message): self.bot.forward_message(chat_id=GROUP_FOR_MESSAGE, from_chat_id=message.chat.id, message_id=message.message_id ) current_date = datetime.now() date = current_date.strftime("%Y-%m-%d %H:%M:%S") BotDB.add_new_message_in_db(message.text, message.message_id + 1, message.from_user.id, date, 0) question = messages.get_message(self.__get_first_name(message), 'QUESTION') markup = self.get_reply_keyboard(message) self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK, reply_markup=markup) def suggest_post(self, message): try: markup = types.ReplyKeyboardRemove() suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS') self.bot.send_message(message.chat.id, suggest_news, parse_mode='html') sleep(0.3) suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2') self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup) except Exception as e: self.bot.send_message(IMPORTANT_LOGS, f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") # logging if LOGS: self.bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) def stickers(self, message): BotDB.update_info_about_stickers(user_id=message.from_user.id) markup = self.get_reply_keyboard(message) try: self.bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) self.bot.send_message(message.chat.id, text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', reply_markup=markup) except ApiTelegramException as e: self.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") def connect_with_admin(self, message): connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN') self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html") # logging if LOGS: self.bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) def end_message(self, message): try: name_stick_bye = list(Path('Stick').rglob('Universal_*')) random_stick_bye = open(random.choice(name_stick_bye), 'rb') self.bot.send_sticker(message.chat.id, random_stick_bye) except ApiTelegramException as e: if LOGS: self.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") markup = types.ReplyKeyboardRemove() try: bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE') self.bot.send_message(message.chat.id, bye_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK) except Exception as e: if LOGS: self.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") if LOGS: # logging self.bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) def send_to_suggest(self, message): markup = types.InlineKeyboardMarkup(row_width=1) item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish') item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline') markup.add(item1, item2) try: if message.content_type == 'text': post_text = message.text.lower() if post_text.find('неанон') != -1 or post_text.find('не анон') != -1: self.bot.send_message( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', reply_markup=markup ) elif post_text.find('анон') != -1: self.bot.send_message( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно', reply_markup=markup ) else: self.bot.send_message( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', reply_markup=markup ) elif message.content_type == 'photo' and message.media_group_id is None: post_text_for_photo = message.caption.lower() if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1: self.bot.send_photo( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', photo=message.photo[-1].file_id, reply_markup=markup ) elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1: self.bot.send_photo( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно', photo=message.photo[-1].file_id, reply_markup=markup ) else: self.bot.send_photo( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', photo=message.photo[-1].file_id, reply_markup=markup ) # TODO: Не понятна реализация с альбомами от слова совсем # elif message.content_type == 'photo' and message.media_group_id != None: # bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id ) else: pass except Exception as e: if LOGS: self.bot.send_message(chat_id=IMPORTANT_LOGS, text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") markup_for_user = self.get_reply_keyboard(message) success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK, reply_markup=markup_for_user) @staticmethod def get_reply_keyboard(message): markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("📢Предложить свой пост") item2 = types.KeyboardButton("📩Связаться с админами") item3 = types.KeyboardButton("👋🏼Сказать пока!") #TODO: Есть ощущение что не совсем так работает как надо item4 = types.KeyboardButton("🤪Хочу стикеры") if not BotDB.get_info_about_stickers( user_id=message.from_user.id) else None if item4: markup.add(item1, item2, item3, item4) else: markup.add(item1, item2, item3) return markup @staticmethod def __get_first_name(message): return message.from_user.first_name @staticmethod def check_access(user_id: int): """Проверка прав на совершение действий""" return BotDB.is_admin(user_id) @staticmethod def add_days_to_date(days): """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" current_date = datetime.now() future_date = current_date + timedelta(days=int(days)) formatted_date = future_date.strftime("%d-%m-%Y") return formatted_date @staticmethod def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[Any, Any]], callback: str): """ Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback Args: page: Номер текущей страницы. total_items: Общее количество элементов. array_items: Лист кортежей. Содержит в себе user_name: user_id callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) Returns: InlineKeyboardMarkup: Клавиатура с кнопками пагинации. """ # Определяем общее количество страниц total_pages = (total_items + 7 - 1) // 7 page = page # Создаем список кнопок buttons = [] # Вычисляем стартовый номер для текущей страницы start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы # Кнопки с номерами страниц for i in range(start_index, min(start_index + 7, len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы buttons.append( types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}")) # Добавляем кнопки "Предыдущая" и "Следующая" if int(page) > 1: buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}")) if page < total_pages: buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}")) #Добавляем кнопку назад buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return")) # Формируем клавиатуру с 3 кнопками в ряд keyboard = [] for i in range(0, len(buttons), 3): keyboard.append(buttons[i:i + 3]) return types.InlineKeyboardMarkup(keyboard) @staticmethod def get_banned_users_list(offset: int): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: offset: отступ для запроса в базу данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset) message = "Список заблокированных пользователей:\n" for user in users: message += f"Пользователь: {user[0]}\n" message += f"Причина бана: {user[2]}\n" message += f"Дата разбана: {user[3]}\n\n" return message @staticmethod def get_banned_users_buttons(): """ Возвращает сообщение со списком пользователей и словарь с ником + идентификатором Args: offset: отступ для запроса в базу данных Returns: message - текст сообщения user_ids - лист кортежей [(user_name: user_id)] """ users = BotDB.get_banned_users_from_db() user_ids = [] for user in users: user_ids.append((user[0], user[1])) return user_ids @staticmethod def delete_user_blacklist(user_id): return BotDB.delete_user_blacklist(user_id=user_id) bot = TelegramHelperBot(BOT_TOKEN) if __name__ == "__main__": # Запускаем бота bot.start() scheduler = BackgroundScheduler() scheduler.add_job(bot.unban_notifier(), 'cron', hour=0, minute=0) scheduler.start()