from datetime import datetime, timedelta
import random
import traceback
from enum import Enum
from pathlib import Path
from time import sleep
import telebot
from telebot import types
from telebot.apihelper import ApiTelegramException
from telebot.types import InputMediaPhoto
import messages
from logs.custom_logger import Logger
#Инициализируем логгер
bot_logger = Logger(name='bot')
class State(Enum):
START = "START"
SUGGEST = "SUGGEST"
ADMIN = "ADMIN"
CHAT = "CHAT"
PRE_CHAT = "PRE_CHAT"
class TelegramHelperBot:
def __init__(self, dependency_factory):
self.state = State.START
self.BotDB = dependency_factory.get_database()
self.settings = dependency_factory.get_settings()
token = self.settings['Telegram']['bot_token']
self.GROUP_FOR_POST = self.settings['Telegram']['group_for_posts']
self.GROUP_FOR_MESSAGE = self.settings['Telegram']['group_for_message']
self.MAIN_PUBLIC = self.settings['Telegram']['main_public']
self.GROUP_FOR_LOGS = self.settings['Telegram']['group_for_logs']
self.IMPORTANT_LOGS = self.settings['Telegram']['important_logs']
self.PREVIEW_LINK = self.settings['Telegram']['preview_link']
self.LOGS = self.settings['Settings']['logs']
self.TEST = self.settings['Settings']['test']
self.bot = telebot.TeleBot(token)
self.logger = bot_logger.get_logger()
# Router for user
@self.bot.message_handler(func=lambda message: True, chat_types=['private'])
def handle_message(message):
self.logger.info(
f'Получено сообщение: {message.text} от пользователя: {message.from_user.full_name} id юзера: {message.chat.id}')
if self.BotDB.check_user_in_blacklist(message.from_user.id):
attribute = self.BotDB.get_blacklist_users_by_id(message.from_user.id)
self.bot.send_message(message.chat.id,
f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}',
parse_mode='HTML')
self.logger.info(f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) заблокирован')
return
if self.state == State.START:
if message.text == '/start':
self.start_message(message)
elif message.text == '📢Предложить свой пост':
self.suggest_post(message)
self.state = State.SUGGEST
elif message.text == '🤪Хочу стикеры':
self.stickers(message)
self.state = State.START
elif message.text == '📩Связаться с админами':
self.connect_with_admin(message)
self.state = State.PRE_CHAT
elif message.text == '👋🏼Сказать пока!':
self.end_message(message)
self.state = State.START
elif message.text == 'Выйти из чата':
self.end_message(message)
elif message.text == '/admin':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
self.state = State.ADMIN
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вошел в админ-панель')
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
self.logger.info(
f'Пользователю {message.from_user.full_name} (ID: {message.chat.id}) отказано в доступе к админ-панели')
elif message.text == '/state':
self.bot.send_message(message.chat.id,
f'Твой state == {self.state.value}')
else:
self.bot.send_message(message.chat.id,
#TODO: Здесь раньше был /state
"Не понимаю где ты находишься. Нажми /start, и я перезапущусь")
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) отправил непонятное сообщение: {message.text}')
if self.state == State.SUGGEST:
self.bot.register_next_step_handler(message, self.suggest_router)
self.state = State.START
if message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
if self.state == State.PRE_CHAT:
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
self.state = State.START
if message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
if self.state == State.CHAT:
if message.text == 'Выйти из чата':
self.state = State.START
self.end_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вышел из чата')
elif message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
else:
self.resend_message_in_group_for_message(message)
if self.state == State.ADMIN:
if message == '/admin' or message == '/restart' or message == 'Вернуться в админку':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вошел в админ-панель')
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
self.logger.info(
f'Пользователю {message.from_user.full_name} (ID: {message.chat.id}) отказано в доступе к админ-панели')
if message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
@self.bot.message_handler(func=lambda message: True, chat_types=['group'])
def handle_message(message):
"""Функция ответа админа пользователю через закрытый чат"""
self.logger.info(
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
self.state = State.CHAT
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Выйти из чата")
markup.add(item1)
message_id = 0
try:
message_id = message.reply_to_message.id
except AttributeError:
self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!')
self.logger.warning(
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа.')
message_from_admin = message.text
try:
chat_id = self.BotDB.get_user_by_message_id(message_id)
self.bot.send_message(chat_id, message_from_admin, reply_markup=markup)
self.logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id}.')
except TypeError:
self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.')
self.logger.error(
f'Ошибка при поиске пользователя в базе для ответа: {message.text} в группе {message.chat.title} (ID: {message.chat.id})')
# Админка
@self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline'])
def post_for_group(call):
self.logger.info(
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
if call.data == 'publish' and call.message.content_type == 'text':
try:
self.bot.send_message(chat_id=self.MAIN_PUBLIC, text=call.message.text)
self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id)
self.logger.info(f'Текст сообщения опубликован в канале {self.MAIN_PUBLIC}.')
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(f'Ошибка при публикации текста в канал {self.MAIN_PUBLIC}: {str(e)}')
elif call.data == 'publish' and call.message.content_type == 'photo':
try:
self.bot.send_photo(
chat_id=self.MAIN_PUBLIC,
caption=call.message.caption,
photo=call.message.photo[-1].file_id,
)
self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id)
self.logger.info(f'Пост с фото опубликован в канале {self.MAIN_PUBLIC}.')
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(f'Ошибка при публикации фотографии в канал {self.MAIN_PUBLIC}: {str(e)}')
elif call.data == 'decline':
try:
self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id)
self.logger.info(
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
except Exception as e:
if self.LOGS:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(f'Ошибка при удалении сообщения в группе {self.GROUP_FOR_POST}: {str(e)}')
@self.bot.callback_query_handler(func=lambda call: True)
def pagination(call):
if call.data[:3] == 'ban':
user_id = call.data[4:]
self.logger.info(f"Бан пользователя с ID: {user_id}")
self.ban_user(call.message, user_id)
if call.data == 'return':
self.logger.info(f"Возврат в админ панель")
self.bot.delete_message(call.message.chat.id, call.message.message_id)
self.admin_panel(call.message)
if call.data[:5] == 'unban':
user_id = call.data[6:]
self.delete_user_blacklist(user_id)
msg = f'Успешно удалено.'
self.bot.send_message(chat_id=call.message.chat.id, text=msg)
self.logger.info(f"Разблокирован пользователь с ID: {user_id}")
elif call.data[:4] == 'page':
page_number = int(call.data[5:])
self.logger.info(f"Переход на страницу {page_number}")
if call.message.text == 'Список пользователей которые последними обращались к боту':
list_users = self.BotDB.get_last_users_from_db()
keyboard = self.create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
'ban')
self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
reply_markup=keyboard)
elif "Список заблокированных пользователей".lower() in call.message.text.lower():
#Готовим сообщения
message_user = self.get_banned_users_list(int(page_number) * 7 - 7)
self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=message_user)
#Готовим клавиатуру
buttons = self.get_banned_users_buttons()
keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban')
self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
reply_markup=keyboard)
else:
self.logger.warning(f"Неизвестный callback data: {call.data}")
def start(self):
while True:
try:
print(self.bot.last_update_id)
self.bot.polling(none_stop=True)
except ConnectionError as e:
self.logger.error(
f"Произошла ошибка (потеря коннекта): {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
except Exception as e:
self.logger.error(f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now()
print('Мы в функции unban_notifier')
today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)
# Черный список
def admin_panel(self, message):
try:
self.logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Бан (Список)")
#item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю
#item3 = types.KeyboardButton("Удалить админа")
item4 = types.KeyboardButton("Разбан (список)")
item5 = types.KeyboardButton("Вернуться в бота")
markup.add(item1, item4, item5)
self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.handle_admin_message)
except Exception as e:
self.logger.error(f"Ошибка при запуске админ панели: {e}")
self.bot.register_next_step_handler(message, self.admin_panel)
def handle_admin_message(self, message):
self.logger.info(f"Получено сообщение от админа: {message.text} (пользователь: {message.from_user.id})")
try:
if message.text == "Бан (Список)":
self.get_last_users(message)
elif message.text == "Разбан (список)":
self.get_banned_users(message)
elif message.text == "Вернуться в бота":
self.start_message(message)
else:
self.logger.warning(f"Неизвестное сообщение от админа: {message.text}")
self.bot.reply_to(message, "Неизвестная команда.")
except Exception as e:
self.logger.error(f"Ошибка при обработке сообщения админа: {e}")
self.bot.reply_to(message, f"Ошибка\n\n {e}")
self.admin_panel(message)
def ban_user(self, message, user_id: int):
self.logger.info(
f"Получена команда от админа на бан пользователя: {message.text} (пользователь: {message.from_user.id})")
user_name = self.BotDB.get_username(user_id=user_id)
ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None}
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Спам")
item2 = types.KeyboardButton("Заебал стикерами")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object)
def ban_user_step_2(self, message, ban_object: dict):
self.logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {ban_object})")
ban_object['message_for_user'] = message.text
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("1")
item2 = types.KeyboardButton("7")
item3 = types.KeyboardButton("30")
item4 = types.KeyboardButton("Навсегда")
markup.add(item1, item2, item3, item4)
self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
f"его в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object)
def ban_user_step_3(self, message, ban_object: dict):
self.logger.info(f"Переход на шаг 3 бана пользователя. Словарь с данными для бана: {ban_object})")
date_to_unban = None
if message.text != 'Навсегда':
date_to_unban = self.add_days_to_date(message.text)
else:
pass
ban_object['date_to_unban'] = date_to_unban
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Подтвердить")
item2 = types.KeyboardButton("Отменить")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}",
parse_mode='html',
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object)
def ban_user_final_step(self, message, ban_object: dict):
self.logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {ban_object})")
if message.text == 'Подтвердить':
exists = self.BotDB.check_user_in_blacklist(ban_object['user_id'])
if exists:
self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.")
self.logger.info(f"Пользователь: {ban_object['user_id']} был заблокирован ранее)")
self.admin_panel(message)
else:
self.BotDB.set_user_blacklist(ban_object['user_id'],
ban_object['user_name'],
ban_object['message_for_user'],
ban_object['date_to_unban'])
self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.")
self.logger.info(f"Пользователь: {ban_object['user_id']} успешно заблокирован)")
self.admin_panel(message)
def get_last_users(self, message):
self.logger.info(
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
list_users = self.BotDB.get_last_users_from_db()
keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard)
def get_banned_users(self, message):
self.logger.info(
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
message_text = self.get_banned_users_list(0)
buttons_list = self.get_banned_users_buttons()
if buttons_list:
k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban')
self.bot.send_message(message.chat.id, message_text, reply_markup=k)
else:
self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет")
self.admin_panel(message)
def start_message(self, message):
try:
self.logger.info(
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name})")
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = open(random.choice(name_stick_hello), 'rb')
self.logger.info(f"Стикер успешно получен из БД")
# logging
if self.LOGS:
self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_sticker(message.chat.id, random_stick_hello)
sleep(0.3)
except Exception as e:
self.logger.error(f"Произошла ошибка при получении стикера. Ошибка: {str(e)}")
if self.LOGS:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
self.logger.info(
f"Получение данных для приветственного сообщения пользователю. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name})")
user_id = message.from_user.id
first_name = message.from_user.first_name
full_name = message.from_user.full_name
is_bot = message.from_user.is_bot
username = message.from_user.username
language_code = message.from_user.language_code
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not self.BotDB.user_exists(user_id):
self.BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
date)
self.BotDB.update_date_for_user(date, user_id)
markup = self.get_reply_keyboard(message)
hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE')
self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup,
disable_web_page_preview=not self.PREVIEW_LINK)
except Exception as e:
self.logger.error(
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
if self.LOGS:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def resend_message_in_group_for_message(self, message):
self.logger.info(
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.id})")
self.bot.forward_message(chat_id=self.GROUP_FOR_MESSAGE,
from_chat_id=message.chat.id,
message_id=message.message_id
)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
self.BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(self.__get_first_name(message), 'QUESTION')
markup = self.get_reply_keyboard(message)
self.bot.send_message(message.chat.id, question, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK,
reply_markup=markup)
def suggest_post(self, message):
try:
self.logger.info(
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.id})")
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS')
self.bot.send_message(message.chat.id, suggest_news, parse_mode='html')
sleep(0.3)
suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2')
self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup)
except Exception as e:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def stickers(self, message):
self.logger.info(
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
self.BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup = self.get_reply_keyboard(message)
try:
self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_message(message.chat.id,
text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
except ApiTelegramException as e:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
def connect_with_admin(self, message):
self.logger.info(
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN')
self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
# logging
if self.LOGS:
self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def end_message(self, message):
try:
self.logger.info(
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = open(random.choice(name_stick_bye), 'rb')
self.bot.send_sticker(message.chat.id, random_stick_bye)
except ApiTelegramException as e:
self.logger.error(
f"Ошибка в функции stickers при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup = types.ReplyKeyboardRemove()
try:
bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE')
self.bot.send_message(message.chat.id, bye_message,
parse_mode='html', reply_markup=markup,
disable_web_page_preview=not self.PREVIEW_LINK)
except Exception as e:
if self.LOGS:
self.logger.error(
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
#TODO: deprecated
def send_to_suggest_2(self, message):
markup = self._get_reply_keyboard_for_post()
try:
if message.content_type == 'text':
post_text = message.text.lower()
if post_text.find('неанон') != -1 or post_text.find('не анон') != -1:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif post_text.find('анон') != -1:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно',
reply_markup=markup
)
else:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif message.content_type == 'photo' and message.media_group_id is None:
post_text_for_photo = message.caption.lower()
if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно',
photo=message.photo[-1].file_id,
reply_markup=markup
)
else:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
# TODO: Не понятна реализация с альбомами от слова совсем. 11.07 Реализация понятна. Нужно отправлять альбомы + сообщение в одном сообщении. Следующее сообщение с пробелом и кнопками
# elif message.content_type == 'photo' and message.media_group_id != None:
# bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id )
else:
pass
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user)
def suggest_router(self, message):
self.logger.info(
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
try:
if message.content_type == 'text':
lower_text = message.text.lower()
post_text, is_anonymous = self._get_text_message(lower_text, message.from_user.full_name,
message.from_user.id)
if is_anonymous:
self._send_text_message(post_text)
else:
self._send_text_message(post_text)
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user)
elif message.content_type == 'photo' and message.media_group_id is None:
lower_caption = message.caption.lower()
post_caption, is_anonymous = self._get_text_message(lower_caption, message.from_user.full_name,
message.from_user.id)
if is_anonymous:
self._send_photo_message(message.photo[-1].file_id, post_caption)
else:
self._send_photo_message(message.photo[-1].file_id, post_caption)
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user)
elif message.media_group_id is not None:
self.bot.send_message(message.chat.id,
'Я пока не умею работать с несколькими файлами. Пришли текст и не более одного фото')
self.bot.register_next_step_handler(message, self.suggest_router)
else:
self.bot.send_message(message.chat.id,
'Я пока не умею работать с таким сообщением. Пришли текст и не более одного фото')
self.bot.register_next_step_handler(message, self.suggest_router)
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@staticmethod
def _get_reply_keyboard_for_post():
markup = types.InlineKeyboardMarkup(row_width=1)
item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish')
item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline')
markup.add(item1, item2)
return markup
@staticmethod
def _get_text_message(post_text: str, first_name: str, username: str):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста
Returns:
Кортеж из двух элементов:
- Сформированный текст сообщения.
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный).
"""
if "неанон" in post_text or "не анон" in post_text:
is_anonymous = False
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
elif "анон" in post_text:
is_anonymous = True
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous
else:
is_anonymous = False
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
def _send_text_message(self, post_text: str):
markup = self._get_reply_keyboard_for_post()
self.bot.send_message(
chat_id=self.GROUP_FOR_POST,
text=post_text,
parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK,
reply_markup=markup
)
def _send_photo_message(self, photo: str, post_text: str):
markup = self._get_reply_keyboard_for_post()
self.bot.send_photo(
chat_id=self.GROUP_FOR_POST,
caption=post_text,
photo=photo,
reply_markup=markup
)
def get_reply_keyboard(self, message):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
item3 = types.KeyboardButton("👋🏼Сказать пока!")
item4 = types.KeyboardButton("🤪Хочу стикеры") if not self.BotDB.get_info_about_stickers(
user_id=message.from_user.id) else None
if item4:
markup.add(item1, item2, item3, item4)
else:
markup.add(item1, item2, item3)
return markup
@staticmethod
def __get_first_name(message):
return message.from_user.first_name
def check_access(self, user_id: int):
"""Проверка прав на совершение действий"""
return self.BotDB.is_admin(user_id)
@staticmethod
def add_days_to_date(days):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=int(days))
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
@staticmethod
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str):
"""
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
Args:
page: Номер текущей страницы.
total_items: Общее количество элементов.
array_items: Лист кортежей. Содержит в себе user_name: user_id
callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
Returns:
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
"""
# Определяем общее количество страниц
total_pages = (total_items + 7 - 1) // 7
page = page
# Создаем список кнопок
buttons = []
# Вычисляем стартовый номер для текущей страницы
start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы
# Кнопки с номерами страниц
for i in range(start_index, min(start_index + 7,
len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы
buttons.append(
types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"))
# Добавляем кнопки "Предыдущая" и "Следующая"
if int(page) > 1:
buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}"))
if page < total_pages:
buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}"))
#Добавляем кнопку назад
buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return"))
# Формируем клавиатуру с 3 кнопками в ряд
keyboard = []
for i in range(0, len(buttons), 3):
keyboard.append(buttons[i:i + 3])
return types.InlineKeyboardMarkup(keyboard)
def get_banned_users_list(self, offset: int):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = self.BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
return message
def get_banned_users_buttons(self):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = self.BotDB.get_banned_users_from_db()
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
return user_ids
def delete_user_blacklist(self, user_id):
return self.BotDB.delete_user_blacklist(user_id=user_id)