import configparser import os import sys import sqlite3 from pathlib import Path from time import sleep import telebot import random from telebot import types from telebot.apihelper import ApiTelegramException #Настройки config_path = os.path.join(sys.path[0], 'settings.ini') config = configparser.ConfigParser() config.read(config_path) #TELEGRAM BOT_TOKEN = config.get('Telegram', 'BOT_TOKEN') GROUP_FOR_POST = config.get('Telegram', 'group_for_posts') GROUP_FOR_MESSAGE = config.get('Telegram', 'group_for_message') MAIN_PUBLIC = config.get('Telegram', 'main_public') GROUP_FOR_LOGS = config.get('Telegram', 'group_for_logs') IMPORTANT_LOGS = config.get('Telegram', 'important_logs') PREVIEW_LINK = config.getboolean('Telegram', 'PREVIEW_LINK') #SETTINGS LOGS = config.getboolean('Settings', 'logs') #Инициализируем бота bot = telebot.TeleBot(BOT_TOKEN, parse_mode=None) def select_message_from_db(type: str, username): """Функция для запроса к базе данных и получения сообщений для бота. В аргументы передаются: type - тип получаемой обратной связи, строковое значение, сохраненное в БД username - имя пользователя """ # Подключаемся к базе conn = sqlite3.connect('tg-bot-database', check_same_thread=False) try: cursor = conn.cursor() cursor.execute(f"SELECT * FROM messages WHERE type=?", (type,)) #Забираем данные из таблицы, преобразуем в строку, заменяем поле username на имя пользователя, #и вместо амберсанда подставляем перенос строки response_from_database = str(cursor.fetchone()[1]).replace('username', username).replace('&', '\n') return response_from_database except sqlite3.Error as error: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=f'Ошибка при работе с SQLite, {error}') finally: if conn: conn.close() def error_message_from_db(id: int): """Функция для запроса к базе данных и получения сообщений ошибки. В аргументы передаются: id - идентификатор ошибки """ # Подключаемся к базе conn = sqlite3.connect('tg-bot-database', check_same_thread=False) try: cursor = conn.cursor() cursor.execute(f"SELECT * FROM error_messages WHERE id=?", (id,)) response_from_database = str(cursor.fetchone()[1]) return response_from_database except sqlite3.Error as error: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=f'Ошибка при работе с SQLite, {error}') finally: if conn: conn.close() def telegram_bot(): @bot.message_handler(commands=['start']) def send_welcome(message): #TODO: Здесь переписать через randint #TODO: Тексты приветствий вынести в отдельный файл try: name_stick_hello = list(Path('Stick').rglob('Hello_*')) number_stick_hello = random.randint(1, len(name_stick_hello)) random_stick_hello = open(name_stick_hello[number_stick_hello], 'rb') #logging if LOGS: bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) bot.send_sticker(message.chat.id, random_stick_hello) sleep(0.3) except: if LOGS: bot.send_message(IMPORTANT_LOGS, error_message_from_db(7)) markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) item1 = types.KeyboardButton("Предложить свой пост") item2 = types.KeyboardButton("Связаться с админами") item3 = types.KeyboardButton("Удалить пост") markup.add(item1, item2, item3) try: username = message.from_user.first_name hello_message = select_message_from_db('start_message', username) bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK) except: if LOGS: bot.send_message(IMPORTANT_LOGS, error_message_from_db(8)) bot.register_next_step_handler(message, go_send_messages) @bot.message_handler(commands=['end_command']) def after_post(message): markup = types.ReplyKeyboardMarkup(row_width=1) item1 = types.KeyboardButton("Предложить свой пост") item2 = types.KeyboardButton("Связаться с админами") item3 = types.KeyboardButton("Удалить пост") item4 = types.KeyboardButton("Сказать пока!") markup.add(item1, item2, item3, item4) bot.send_message(message.chat.id, "Выбери нужную кнопку внизу экрана".format( message.from_user, bot.get_me()), parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK) bot.register_next_step_handler(message, go_send_messages) def go_send_messages(message): global msg if message.text == 'Предложить свой пост': try: markup = types.ReplyKeyboardRemove() username = message.from_user.first_name suggest_news = select_message_from_db('suggest_news', username) bot.send_message(message.chat.id, suggest_news, parse_mode='html') sleep(0.3) username = message.from_user.first_name suggest_news_2 = select_message_from_db('suggest_news_2', username) msg = bot.send_message(message.chat.id, suggest_news_2,parse_mode='html', reply_markup=markup) except: if LOGS: bot.send_message(IMPORTANT_LOGS, error_message_from_db(10)) #logging if LOGS: bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) bot.register_next_step_handler(msg, resend_message_in_group_for_post) elif message.text == "Связаться с админами": username = message.from_user.first_name connect_with_admin = select_message_from_db('connect_with_admin', username) msg = bot.send_message(message.chat.id, connect_with_admin, parse_mode="html") #logging if LOGS: bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) bot.register_next_step_handler(msg, resend_message_in_group_for_message) elif message.text == "Удалить пост": #TODO: требует автоматизации. На входе говорим пришли мне пост, на выходе получаем идентификатор поста, удаляем из ТГ. Насчет удаления из ВК надо подумать username = message.from_user.first_name del_message = select_message_from_db('del_message', username) msg = bot.send_message(message.chat.id, del_message, parse_mode="html") #logging if LOGS: bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) bot.register_next_step_handler(msg, resend_message_in_group_for_message) elif message.text == "Сказать пока!": try: name_stick_bye = list(Path('Stick').rglob('Universal_*')) number_stick_bye = random.randint(1, len(name_stick_bye)) random_stick_bye = open(name_stick_bye[number_stick_bye], 'rb') bot.send_sticker(message.chat.id, random_stick_bye) except ApiTelegramException: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message_from_db(11)) markup = types.ReplyKeyboardRemove() try: username = message.from_user.first_name bye_message = select_message_from_db('bye_message', username) bot.send_message(message.chat.id, bye_message, parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK) except: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message_from_db(6)) if LOGS: #logging bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) else: try: username = message.from_user.first_name user_error = select_message_from_db('user_error', username) bot.send_message(message.chat.id, user_error, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK) except: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message_from_db(9)) #logging if LOGS: bot.forward_message(chat_id=GROUP_FOR_LOGS, from_chat_id=message.chat.id, message_id=message.message_id) after_post(message=message) def resend_message_in_group_for_post(message): markup = types.InlineKeyboardMarkup(row_width=1) item1 = types.InlineKeyboardButton("Опубликовать", callback_data='post_post_post') item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline') markup.add(item1, item2) try: if message.content_type == 'text': post_text = message.text.lower() if post_text.find('неанон') != -1 or post_text.find('не анон') != -1: bot.send_message( #TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', reply_markup=markup ) elif post_text.find('анон') != -1: bot.send_message( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно', reply_markup=markup ) else: bot.send_message( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', reply_markup=markup ) elif message.content_type == 'photo' and message.media_group_id == None: post_text_for_photo = message.caption.lower() bot.send_photo( # TODO: GROUP_FOR_POST chat_id=GROUP_FOR_POST, caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', photo=message.photo[-1].file_id, reply_markup=markup ) #TODO: Не понятна реализация с альбомами от слова совсем #elif message.content_type == 'photo' and message.media_group_id != None: # bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id ) else: pass except: if LOGS: username = message.from_user.first_name error_message = str(error_message_from_db(5)).replace('username', username) bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message) username = message.from_user.first_name success_send_message = select_message_from_db('success_send_message', username) bot.send_message(message.chat.id, success_send_message, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK) after_post(message=message) def resend_message_in_group_for_message(message): try: bot.forward_message(chat_id=GROUP_FOR_MESSAGE, from_chat_id=message.chat.id, message_id=message.message_id ) username = message.from_user.first_name question = select_message_from_db('question', username) bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK) except: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message_from_db(4)) after_post(message=message) #Админка @bot.callback_query_handler(func=lambda call: True) def post_for_group(call): if call.data == 'post_post_post' and call.message.content_type == 'text': try: bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text) bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message_from_db(3)) elif call.data == 'post_post_post' and call.message.content_type == 'photo': try: bot.send_photo( chat_id=MAIN_PUBLIC, caption=call.message.caption, photo=call.message.photo[-1].file_id, ) bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except: if LOGS: bot.send_message(chat_id=IMPORTANT_LOGS, text=error_message_from_db(2)) elif call.data == 'decline': try: bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) except: if LOGS: logs(IMPORTANT_LOGS, error_message_from_db(1)) def logs(chat_name, text_error): bot.send_message(chat_id=chat_name, text=text_error) if __name__ == '__main__': telegram_bot() try: bot.polling(none_stop=True) bot.enable_save_next_step_handlers(delay=2) bot.load_next_step_handlers() except ConnectionError as e: if LOGS: bot.send_message(IMPORTANT_LOGS, "Ошибка соединения, потерял связь") except Exception as r: if LOGS: bot.send_message(IMPORTANT_LOGS, "Произошло что-то непредвиденное, хелп") finally: if LOGS: bot.send_message(IMPORTANT_LOGS, 'Я упал, помогите')