Files
telegram-helper-bot/main.py
2024-07-04 00:53:41 +03:00

411 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import configparser
import os
import sys
from pathlib import Path
from time import sleep
from enum import Enum
import db
from db import BotDB
import telebot
import random
from datetime import datetime
import time
from telebot import types
from telebot.apihelper import ApiTelegramException
import messages
import traceback
# Настройки
config_path = os.path.join(sys.path[0], 'settings.ini')
config = configparser.ConfigParser()
config.read(config_path)
# TELEGRAM
BOT_TOKEN = config.get('Telegram', 'BOT_TOKEN')
GROUP_FOR_POST = config.get('Telegram', 'group_for_posts')
GROUP_FOR_MESSAGE = config.get('Telegram', 'group_for_message')
MAIN_PUBLIC = config.get('Telegram', 'main_public')
GROUP_FOR_LOGS = config.get('Telegram', 'group_for_logs')
IMPORTANT_LOGS = config.get('Telegram', 'important_logs')
PREVIEW_LINK = config.getboolean('Telegram', 'PREVIEW_LINK')
# SETTINGS
LOGS = config.getboolean('Settings', 'logs')
TEST = config.getboolean('Settings', 'test')
# Инициализируем бота и базку
BotDB = BotDB('tg-bot-database')
#TODO: state хранить в базе
#TODO: Перенести обработчик коллбэков
class State(Enum):
START = "START"
SUGGEST = "SUGGEST"
ADMIN = "ADMIN"
CHAT = "CHAT"
PRE_CHAT = "PRE_CHAT"
class TelegramHelperBot:
def __init__(self, token):
self.bot = telebot.TeleBot(token)
self.state = State.START
# Router for user
@self.bot.message_handler(func=lambda message: True, chat_types=['private'])
def handle_message(message):
if self.state == State.START:
if message.text == '/start':
print(f'Внутри функции handle_message // Команда /start // state - {self.state.value}')
self.start_message(message)
elif message.text == '📢Предложить свой пост':
print(f'Внутри функции handle_message // Команда /suggest // state - {self.state.value}')
self.suggest_post(message)
self.state = State.SUGGEST
elif message.text == '🤪Хочу стикеры':
print(f'Внутри функции handle_message // Команда /stickers // state - {self.state.value}')
self.stickers(message)
self.state = State.START
elif message.text == '📩Связаться с админами':
print(f'Внутри функции handle_message // Команда /connect // state - {self.state.value}')
self.connect_with_admin(message)
self.state = State.PRE_CHAT
print(f'В state.START - {self.state.value}')
elif message.text == '👋🏼Сказать пока!':
print(f'Внутри функции handle_message // Команда /end // state - {self.state.value}')
self.end_message(message)
self.state = State.START
elif message.text == 'Выйти из чата':
print(f'Внутри функции handle_message // Команда /end // state - {self.state.value}')
self.end_message(message)
elif message.text == '/admin':
self.state = State.ADMIN
#TODO: Админку сделать!
self.bot.send_message(message.chat.id,
"Ты в админке, Ура! Делай что хочешь")
self.admin_panel(message)
elif message.text == '/state':
print(f'Внутри функции handle_message // Команда /state // state - {self.state.value}')
self.bot.send_message(message.chat.id,
f'Твой state == {self.state.value}')
else:
self.bot.send_message(message.chat.id,
"Не понимаю где ты находишься. Нажми /state, и я расскажу что ты можешь "
"сделать")
if self.state == State.SUGGEST:
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_post)
self.state = State.START
elif self.state == State.PRE_CHAT:
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
self.state = State.START
if self.state == State.CHAT:
print(f'В state.CHAT - {self.state.value}')
if message.text == 'Выйти из чата':
self.state = State.START
self.end_message(message)
print(f'Обработчик в CHAT - {self.state.value}')
else:
print(f'Обработчик чата в чате CHAT - {self.state.value}')
self.resend_message_in_group_for_message(message)
@self.bot.message_handler(func=lambda message: True, chat_types=['group'])
def handle_message(message):
"""Функция ответа админа пользователю через закрытый чат"""
self.state = State.CHAT
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Выйти из чата")
markup.add(item1)
message_id = message.reply_to_message.id
message_from_admin = message.text
chat_id = BotDB.get_user_by_message_id(message_id)
self.bot.send_message(chat_id, message_from_admin, reply_markup=markup)
# Админка
@self.bot.callback_query_handler(func=lambda call: True)
def post_for_group(call):
if call.data == 'publish' and call.message.content_type == 'text':
try:
self.bot.send_message(chat_id=MAIN_PUBLIC, text=call.message.text)
self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
elif call.data == 'publish' and call.message.content_type == 'photo':
try:
self.bot.send_photo(
chat_id=MAIN_PUBLIC,
caption=call.message.caption,
photo=call.message.photo[-1].file_id,
)
self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
elif call.data == 'decline':
try:
self.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
except Exception as e:
if LOGS:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def register_chat_handler(self, message):
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
def start(self):
while True:
try:
self.bot.polling(none_stop=True)
except (ConnectionError, Exception):
print(f"Произошла ошибка: {str(Exception)}\n\nTraceback:\n{traceback.format_exc()}")
# Черный список
def admin_panel(self, message):
try:
#Добавить админа: 842766148
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Забанить пользователя из списка")
item2 = types.KeyboardButton("Забанить пользователя по ID")
markup.add(item1, item2)
self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user)
except Exception as e:
self.bot.register_next_step_handler(message, self.admin_panel)
def ban_user(self, message):
# проверяем, что ID передан правильно
#TODO: остановился где-то тут, функция не дописана. Хотел сделать админку + бан пользователей
try:
if message.text == "Забанить пользователя из списка":
pass
elif message.text == "Забанить пользователя по ID":
self.bot.send_message(message.chat.id, "Пришли ID юзера")
#BotDB.set_user_blacklist(ban_user_id)
#bot.reply_to(message, f"Пользователь {ban_user_id} заблокирован.")
except Exception as e:
self.bot.reply_to(message, f"Укажи ID пользователя. Ошибка\n\n {e}")
self.bot.register_next_step_handler(message, self.ban_user)
def start_message(self, message):
try:
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = open(random.choice(name_stick_hello), 'rb')
# logging
if LOGS:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_sticker(message.chat.id, random_stick_hello)
sleep(0.3)
except Exception as e:
print(f'{str(e)}')
if LOGS:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
user_id = message.from_user.id
first_name = message.from_user.first_name
full_name = message.from_user.full_name
is_bot = message.from_user.is_bot
username = message.from_user.username
language_code = message.from_user.language_code
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not BotDB.user_exists(user_id):
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
date)
BotDB.update_date_for_user(date, user_id)
markup = self.get_reply_keyboard(message)
hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE')
self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup,
disable_web_page_preview=not PREVIEW_LINK)
except Exception as e:
print(f'{str(e)}')
if LOGS:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
#TODO: При отправке более одного сообщения, не пересылает сообщение в чат, БАГ!!!!
def resend_message_in_group_for_message(self, message):
self.bot.forward_message(chat_id=GROUP_FOR_MESSAGE,
from_chat_id=message.chat.id,
message_id=message.message_id
)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_message_in_db(message.text, message.message_id + 1, message.from_user.id, date, 0)
question = messages.get_message(self.__get_first_name(message), 'QUESTION')
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Выйти из чата")
markup.add(item1)
self.bot.send_message(message.chat.id, question, parse_mode='html', disable_web_page_preview=not PREVIEW_LINK,
reply_markup=markup)
def suggest_post(self, message):
try:
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS')
self.bot.send_message(message.chat.id, suggest_news, parse_mode='html')
sleep(0.3)
suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2')
self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup)
except Exception as e:
if LOGS:
self.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
# logging
if LOGS:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def stickers(self, message):
BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup = self.get_reply_keyboard(message)
try:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_message(message.chat.id,
text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
except ApiTelegramException as e:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def connect_with_admin(self, message):
connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN')
self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
# logging
if LOGS:
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def end_message(self, message):
try:
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = open(random.choice(name_stick_bye), 'rb')
self.bot.send_sticker(message.chat.id, random_stick_bye)
except ApiTelegramException as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup = types.ReplyKeyboardRemove()
try:
bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE')
self.bot.send_message(message.chat.id, bye_message,
parse_mode='html', reply_markup=markup, disable_web_page_preview=not PREVIEW_LINK)
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
if LOGS:
# logging
self.bot.forward_message(chat_id=GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def resend_message_in_group_for_post(self, message):
markup = types.InlineKeyboardMarkup(row_width=1)
item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish')
item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline')
markup.add(item1, item2)
try:
if message.content_type == 'text':
post_text = message.text.lower()
if post_text.find('неанон') != -1 or post_text.find('не анон') != -1:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif post_text.find('анон') != -1:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно',
reply_markup=markup
)
else:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif message.content_type == 'photo' and message.media_group_id is None:
post_text_for_photo = message.caption.lower()
if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно',
photo=message.photo[-1].file_id,
reply_markup=markup
)
else:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
# TODO: Не понятна реализация с альбомами от слова совсем
# elif message.content_type == 'photo' and message.media_group_id != None:
# bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id )
else:
pass
except Exception as e:
if LOGS:
self.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not PREVIEW_LINK, reply_markup=markup_for_user)
@staticmethod
def get_reply_keyboard(message):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
item3 = types.KeyboardButton("👋🏼Сказать пока!")
#TODO: Есть ощущение что не совсем так работает как надо
item4 = types.KeyboardButton("🤪Хочу стикеры") if not BotDB.get_info_about_stickers(
user_id=message.from_user.id) else None
if item4:
markup.add(item1, item2, item3, item4)
else:
markup.add(item1, item2, item3)
return markup
@staticmethod
def __get_first_name(message):
return message.from_user.first_name
bot = TelegramHelperBot(BOT_TOKEN)
if __name__ == "__main__":
# Запускаем бота
bot.start()