diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8cf4112 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/database/tg-bot-database diff --git a/Stick/Hello_11.webp b/Stick/Hello_11.webp new file mode 100644 index 0000000..8c4d84a Binary files /dev/null and b/Stick/Hello_11.webp differ diff --git a/Stick/Hello_12.webp b/Stick/Hello_12.webp new file mode 100644 index 0000000..2f8434e Binary files /dev/null and b/Stick/Hello_12.webp differ diff --git a/Stick/Hello_13.webp b/Stick/Hello_13.webp new file mode 100644 index 0000000..2acb03c Binary files /dev/null and b/Stick/Hello_13.webp differ diff --git a/Stick/Universal_11.webp b/Stick/Universal_11.webp new file mode 100644 index 0000000..dfd373b Binary files /dev/null and b/Stick/Universal_11.webp differ diff --git a/helper_bot/__init__.py b/helper_bot/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/helper_bot/filters/main.py b/helper_bot/filters/main.py new file mode 100644 index 0000000..021e287 --- /dev/null +++ b/helper_bot/filters/main.py @@ -0,0 +1,15 @@ +from typing import Union + +from aiogram.filters import BaseFilter +from aiogram.types import Message + + +class ChatTypeFilter(BaseFilter): # [1] + def __init__(self, chat_type: Union[str, list]): # [2] + self.chat_type = chat_type + + async def __call__(self, message: Message) -> bool: # [3] + if isinstance(self.chat_type, str): + return message.chat.type == self.chat_type + else: + return message.chat.type in self.chat_type diff --git a/helper_bot/handlers/admin/__init__.py b/helper_bot/handlers/admin/__init__.py new file mode 100644 index 0000000..a93c964 --- /dev/null +++ b/helper_bot/handlers/admin/__init__.py @@ -0,0 +1 @@ +from .main import admin_router \ No newline at end of file diff --git a/helper_bot/handlers/admin/__pycache__/__init__.cpython-312.pyc b/helper_bot/handlers/admin/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..9ab05ed Binary files /dev/null and b/helper_bot/handlers/admin/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/handlers/admin/__pycache__/main.cpython-312.pyc b/helper_bot/handlers/admin/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..b9ab95a Binary files /dev/null and b/helper_bot/handlers/admin/__pycache__/main.cpython-312.pyc differ diff --git a/helper_bot/handlers/admin/main.py b/helper_bot/handlers/admin/main.py new file mode 100644 index 0000000..3d2d093 --- /dev/null +++ b/helper_bot/handlers/admin/main.py @@ -0,0 +1,143 @@ +import traceback + +from aiogram import Router, types, F +from aiogram.filters import Command, StateFilter +from aiogram.fsm.context import FSMContext + +from helper_bot.filters.main import ChatTypeFilter +from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \ + create_keyboard_for_ban_days, create_keyboard_for_approve_ban +from helper_bot.utils.base_dependency_factory import BaseDependencyFactory +from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list +from logs.custom_logger import Logger + +from database.db import BotDB + +admin_router = Router() + +#Инициализируем логгер +admin_logger = Logger(name='admin_handler') +logger = admin_logger.get_logger() + +bdf = BaseDependencyFactory() +GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] +GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] +MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] +GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] +IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] +PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] +LOGS = bdf.settings['Settings']['logs'] +TEST = bdf.settings['Settings']['test'] + +BotDB = BotDB('database/tg-bot-database') + + +@admin_router.message( + ChatTypeFilter(chat_type=["private"]), + Command('admin') +) +async def admin_panel(message: types.Message, state: FSMContext): + try: + if check_access(message.from_user.id): + await state.set_state("ADMIN") + logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}") + markup = get_reply_keyboard_admin() + await message.answer("Добро пожаловать в админку. Выбери что хочешь:", + reply_markup=markup) + else: + await message.answer('Доступ запрещен, досвидания!') + except Exception as e: + logger.error(f"Ошибка при запуске админ панели: {e}") + await message.bot.send_message(IMPORTANT_LOGS, + f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}') + + +@admin_router.message( + ChatTypeFilter(chat_type=["private"]), + StateFilter("ADMIN"), + F.text == 'Бан (Список)' +) +async def get_last_users(message: types.Message): + logger.info( + f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") + list_users = BotDB.get_last_users_from_db() + keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') + await message.answer(text="Список пользователей которые последними обращались к боту", + reply_markup=keyboard) + + +@admin_router.message( + ChatTypeFilter(chat_type=["private"]), + StateFilter("ADMIN"), + F.text == 'Разбан (список)' +) +async def get_banned_users(message): + logger.info( + f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") + message_text = get_banned_users_list(0) + buttons_list = get_banned_users_buttons() + if buttons_list: + k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock') + await message.answer(text=message_text, reply_markup=k) + else: + await message.answer(text="В списке забанненых пользователей никого нет") + + +@admin_router.message( + ChatTypeFilter(chat_type=["private"]), + StateFilter("BAN_2") +) +async def ban_user_step_2(message: types.Message, state: FSMContext): + user_data = await state.get_data() + logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})") + await state.update_data(message_for_user=message.text) + markup = create_keyboard_for_ban_days() + await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " + f"его в чат", reply_markup=markup) + await state.set_state("BAN_3") + + +@admin_router.message( + ChatTypeFilter(chat_type=["private"]), + StateFilter("BAN_3") +) +async def ban_user_step_3(message: types.Message, state: FSMContext): + logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}") + if message.text != 'Навсегда': + count_days = int(message.text) + date_to_unban = add_days_to_date(count_days) + else: + date_to_unban = None + logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}") + await state.update_data(date_to_unban=date_to_unban) + user_data = await state.get_data() + markup = create_keyboard_for_approve_ban() + await message.answer( + f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}", + reply_markup=markup) + await state.set_state("BAN_FINAL") + + +@admin_router.message( + ChatTypeFilter(chat_type=["private"]), + StateFilter("BAN_FINAL") +) +async def approve_ban(message: types.Message, state: FSMContext): + user_data = await state.get_data() + logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})") + if message.text == 'Подтвердить': + exists = BotDB.check_user_in_blacklist(user_data['user_id']) + if exists: + await message.reply(f"Пользователь уже был заблокирован ранее.") + logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)") + await state.set_state('ADMIN') + else: + BotDB.set_user_blacklist(user_data['user_id'], + user_data['user_name'], + user_data['message_for_user'], + user_data['date_to_unban']) + await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.") + logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)") + await state.set_state('ADMIN') + markup = get_reply_keyboard_admin() + await message.answer('Вернулись в меню', reply_markup=markup) diff --git a/helper_bot/handlers/callback/__init__.py b/helper_bot/handlers/callback/__init__.py new file mode 100644 index 0000000..4474944 --- /dev/null +++ b/helper_bot/handlers/callback/__init__.py @@ -0,0 +1 @@ +from .main import callback_router diff --git a/helper_bot/handlers/callback/__pycache__/__init__.cpython-312.pyc b/helper_bot/handlers/callback/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..0089e8a Binary files /dev/null and b/helper_bot/handlers/callback/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/handlers/callback/__pycache__/main.cpython-312.pyc b/helper_bot/handlers/callback/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..90d4174 Binary files /dev/null and b/helper_bot/handlers/callback/__pycache__/main.cpython-312.pyc differ diff --git a/helper_bot/handlers/callback/main.py b/helper_bot/handlers/callback/main.py new file mode 100644 index 0000000..69620db --- /dev/null +++ b/helper_bot/handlers/callback/main.py @@ -0,0 +1,164 @@ +import traceback + +from aiogram import Router, F, types +from aiogram.fsm.context import FSMContext +from aiogram.types import CallbackQuery + +from database.db import BotDB +from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \ + create_keyboard_for_ban_reason +from helper_bot.utils.base_dependency_factory import BaseDependencyFactory +from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \ + get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message +from logs.custom_logger import Logger + +callback_router = Router() + +#Инициализируем логгер +callback_logger = Logger(name='callback_logger') +logger = callback_logger.get_logger() + +bdf = BaseDependencyFactory() +GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] +GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] +MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] +GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] +IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] +PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] +LOGS = bdf.settings['Settings']['logs'] +TEST = bdf.settings['Settings']['test'] + +BotDB = BotDB('database/tg-bot-database') + + +@callback_router.callback_query( + F.data == "publish" +) +async def post_for_group(call: CallbackQuery, state: FSMContext): + logger.info( + f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') + if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^": + try: + await send_text_message(MAIN_PUBLIC, call.message, call.message.text) + await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) + logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.') + await call.answer(text='Выложено!', show_alert=True, cache_time=3) + except Exception as e: + await call.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}') + await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) + elif call.data == 'publish' and call.message.content_type == 'photo': + try: + print(f'CALLMESSAGE - {call.message.text}') + await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption) + await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) + logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.') + await call.answer(text='Выложено!', show_alert=True, cache_time=3) + except Exception as e: + await call.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}') + await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) + elif call.data == 'publish' and call.message.text == "^": + user_data = await state.get_data() + media_group_message_id = get_help_message_id(call.message.message_id, user_data) + await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST,message_id=media_group_message_id, reply_markup=None) + #await call.bot.copy_messages(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST, message_ids=[media_group_message_id, media_group_message_id-1]) + await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id) + print(user_data['media_group_message_id']) + print(user_data['help_message_id']) + await call.answer(text='Выложено!', show_alert=True, cache_time=3) + +@callback_router.callback_query( + F.data == "decline" +) +async def decline_post_for_group(call: CallbackQuery, state: FSMContext): + logger.info( + f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') + try: + if call.message.content_type == 'text' and call.message.text != "^": + await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id) + logger.info( + f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).') + await call.answer(text='Отклонено!', show_alert=True, cache_time=3) + if call.message.text == '^': + user_data = await state.get_data() + media_group_message_id = get_help_message_id(call.message.message_id, user_data) + await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id) + except Exception as e: + await call.bot.send_message(IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}') + await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3) + + +@callback_router.callback_query( + F.data.contains('ban') +) +async def process_ban_user(call: CallbackQuery, state: FSMContext): + user_id = call.data[4:] + logger.info( + f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}") + user_name = BotDB.get_username(user_id=user_id) + if user_name: + await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None, + date_to_unban=None) + markup = create_keyboard_for_ban_reason() + await call.message.answer( + text=f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат", + reply_markup=markup) + await state.set_state('BAN_2') + else: + markup = get_reply_keyboard_admin() + await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup) + await state.set_state('ADMIN') + + +@callback_router.callback_query( + F.data.contains('unlock') +) +async def process_unlock_user(call: CallbackQuery): + user_id = call.data[7:] + delete_user_blacklist(user_id) + logger.info(f"Разблокирован пользователь с ID: {user_id}") + username = BotDB.get_username(user_id) + await call.answer(f'Пользователь разблокирован {username}', show_alert=True) + + +@callback_router.callback_query( + F.data == 'return' +) +async def return_to_main_menu(call: CallbackQuery): + await call.message.delete() + logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}") + markup = get_reply_keyboard_admin() + await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:", + reply_markup=markup) + + +@callback_router.callback_query( + F.data.contains('page') +) +async def change_page(call: CallbackQuery): + page_number = int(call.data[5:]) + logger.info(f"Переход на страницу {page_number}") + if call.message.text == 'Список пользователей которые последними обращались к боту': + list_users = BotDB.get_last_users_from_db() + #TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range + keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users, + 'ban') + + await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id, + reply_markup=keyboard) + else: + #Готовим сообщения + message_user = get_banned_users_list(int(page_number) * 7 - 7) + await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, + text=message_user) + + #Готовим клавиатуру + buttons = get_banned_users_buttons() + keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock') + await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id, + reply_markup=keyboard) diff --git a/helper_bot/handlers/group/__init__.py b/helper_bot/handlers/group/__init__.py new file mode 100644 index 0000000..7ef7e16 --- /dev/null +++ b/helper_bot/handlers/group/__init__.py @@ -0,0 +1 @@ +from .main import group_router diff --git a/helper_bot/handlers/group/__pycache__/__init__.cpython-312.pyc b/helper_bot/handlers/group/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..995ec3a Binary files /dev/null and b/helper_bot/handlers/group/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/handlers/group/__pycache__/main.cpython-312.pyc b/helper_bot/handlers/group/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..271faac Binary files /dev/null and b/helper_bot/handlers/group/__pycache__/main.cpython-312.pyc differ diff --git a/helper_bot/handlers/group/main.py b/helper_bot/handlers/group/main.py new file mode 100644 index 0000000..0d2b4d9 --- /dev/null +++ b/helper_bot/handlers/group/main.py @@ -0,0 +1,54 @@ +from aiogram import Router, types +from aiogram.fsm.context import FSMContext + +from database.db import BotDB +from helper_bot.filters.main import ChatTypeFilter +from helper_bot.keyboards.main import get_reply_keyboard_leave_chat +from helper_bot.utils.base_dependency_factory import BaseDependencyFactory +from helper_bot.utils.helper_func import send_text_message +from logs.custom_logger import Logger + +group_router = Router() + +#Инициализируем логгер +group_logger = Logger(name='group_logger') +logger = group_logger.get_logger() + +bdf = BaseDependencyFactory() +GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] +GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] +MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] +GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] +IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] +PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] +LOGS = bdf.settings['Settings']['logs'] +TEST = bdf.settings['Settings']['test'] + +BotDB = BotDB('database/tg-bot-database') + + +@group_router.message( + ChatTypeFilter(chat_type=["group", "supergroup"]), +) +async def handle_message(message: types.Message, state: FSMContext): + """Функция ответа админа пользователю через закрытый чат""" + logger.info( + f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"') + markup = get_reply_keyboard_leave_chat() + message_id = 0 + try: + message_id = message.reply_to_message.message_id + except AttributeError as e: + await message.answer('Блять, выдели сообщение!') + logger.warning( + f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}') + message_from_admin = message.text + try: + chat_id = BotDB.get_user_by_message_id(message_id) + await send_text_message(chat_id, message, message_from_admin, markup) + await state.set_state("CHAT") + logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}') + except TypeError as e: + await message.answer('Не могу найти кому ответить в базе, проебали сообщение.') + logger.error( + f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}') diff --git a/helper_bot/handlers/private/__init__.py b/helper_bot/handlers/private/__init__.py new file mode 100644 index 0000000..1460a04 --- /dev/null +++ b/helper_bot/handlers/private/__init__.py @@ -0,0 +1 @@ +from .main import private_router diff --git a/helper_bot/handlers/private/__pycache__/__init__.cpython-312.pyc b/helper_bot/handlers/private/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..896afb2 Binary files /dev/null and b/helper_bot/handlers/private/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/handlers/private/__pycache__/main.cpython-312.pyc b/helper_bot/handlers/private/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..741a33d Binary files /dev/null and b/helper_bot/handlers/private/__pycache__/main.cpython-312.pyc differ diff --git a/helper_bot/handlers/private/main.py b/helper_bot/handlers/private/main.py new file mode 100644 index 0000000..46ff613 --- /dev/null +++ b/helper_bot/handlers/private/main.py @@ -0,0 +1,281 @@ +import random +import traceback +from datetime import datetime +from pathlib import Path +from time import sleep + +from aiogram import types, Router, F +from aiogram.filters import Command, StateFilter +from aiogram.fsm.context import FSMContext +from aiogram.types import FSInputFile + +from helper_bot.filters.main import ChatTypeFilter +from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post +from helper_bot.keyboards.main import get_reply_keyboard_leave_chat +from helper_bot.middlewares.text_middleware import AlbumMiddleware +from helper_bot.utils import messages +from helper_bot.utils.base_dependency_factory import BaseDependencyFactory +from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \ + process_photo_album, send_media_group_message +from logs.custom_logger import Logger + +from database.db import BotDB + +private_router = Router() + +private_router.message.middleware(AlbumMiddleware()) + +#Инициализируем логгер +private_logger = Logger(name='private_handler') +logger = private_logger.get_logger() + +bdf = BaseDependencyFactory() +GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts'] +GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message'] +MAIN_PUBLIC = bdf.settings['Telegram']['main_public'] +GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs'] +IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs'] +PREVIEW_LINK = bdf.settings['Telegram']['preview_link'] +LOGS = bdf.settings['Settings']['logs'] +TEST = bdf.settings['Settings']['test'] + +BotDB = BotDB('database/tg-bot-database') + + +@private_router.message( + ChatTypeFilter(chat_type=["private"]), + Command("start") +) +@private_router.message( + ChatTypeFilter(chat_type=["private"]), + F.text == 'Вернуться в бота' +) +async def handle_start_message(message: types.Message, state: FSMContext): + try: + await message.forward(chat_id=GROUP_FOR_LOGS) + await state.set_state("START") + logger.info( + f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} " + f"Имя автора сообщения: {message.from_user.full_name})") + name_stick_hello = list(Path('Stick').rglob('Hello_*')) + random_stick_hello = random.choice(name_stick_hello) + random_stick_hello = FSInputFile(path=random_stick_hello) + logger.info(f"Стикер успешно получен из БД") + await message.answer_sticker(random_stick_hello) + sleep(0.3) + except Exception as e: + logger.error(f"Произошла ошибка handle_start_message. Ошибка:{str(e)}") + await message.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + try: + current_state = await state.get_state() + logger.info( + f"Получение данных для приветственного сообщения пользователю. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} State - {current_state}") + user_id = message.from_user.id + first_name = message.from_user.first_name + full_name = message.from_user.full_name + is_bot = message.from_user.is_bot + username = message.from_user.username + language_code = message.from_user.language_code + current_date = datetime.now() + date = current_date.strftime("%Y-%m-%d %H:%M:%S") + if not BotDB.user_exists(user_id): + BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, + date) + BotDB.update_date_for_user(date, user_id) + markup = get_reply_keyboard(BotDB, message.from_user.id) + hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE') + await message.answer(hello_message, reply_markup=markup) + except Exception as e: + logger.error( + f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}") + await message.bot.send_message(IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + +@private_router.message( + StateFilter("START"), + ChatTypeFilter(chat_type=["private"]), + F.text == '📢Предложить свой пост' +) +async def suggest_post(message: types.Message, state: FSMContext): + try: + await message.forward(chat_id=GROUP_FOR_LOGS) + await state.set_state("SUGGEST") + current_state = await state.get_state() + logger.info( + f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}") + markup = types.ReplyKeyboardRemove() + suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS') + await message.answer(suggest_news) + sleep(0.3) + suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2') + await message.answer(suggest_news_2, reply_markup=markup) + except Exception as e: + await message.bot.send_message(IMPORTANT_LOGS, + f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + +@private_router.message( + ChatTypeFilter(chat_type=["private"]), + F.text == '👋🏼Сказать пока!' +) +@private_router.message( + ChatTypeFilter(chat_type=["private"]), + F.text == 'Выйти из чата' +) +async def end_message(message: types.Message, state: FSMContext): + try: + logger.info( + f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + name_stick_bye = list(Path('Stick').rglob('Universal_*')) + random_stick_bye = random.choice(name_stick_bye) + random_stick_bye = FSInputFile(path=random_stick_bye) + await message.answer_sticker(random_stick_bye) + except Exception as e: + logger.error( + f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + await message.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + try: + markup = types.ReplyKeyboardRemove() + bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE') + await message.answer(bye_message, reply_markup=markup) + await state.set_state("START") + except Exception as e: + logger.error( + f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + await message.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + +@private_router.message( + StateFilter("SUGGEST"), + ChatTypeFilter(chat_type=["private"]), +) +async def suggest_router(message: types.Message, state: FSMContext, album: list = None): + logger.info( + f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + try: + if message.content_type == 'text': + lower_text = message.text.lower() + post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name, + message.from_user.username) + markup = get_reply_keyboard_for_post() + if is_anonymous: + await send_text_message(GROUP_FOR_POST, message, post_text, markup) + else: + await send_text_message(GROUP_FOR_POST, message, post_text, markup) + markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) + success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') + await message.answer(success_send_message, reply_markup=markup_for_user) + await state.set_state("START") + elif message.content_type == 'photo' and message.media_group_id is None: + lower_caption = message.caption.lower() + markup = get_reply_keyboard_for_post() + post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name, + message.from_user.username) + #TODO: тут какая-то шляпа + if is_anonymous: + await send_photo_message(GROUP_FOR_POST, message, + message.photo[-1].file_id, post_caption, markup) + else: + await send_photo_message(GROUP_FOR_POST, message, + message.photo[-1].file_id, post_caption, markup) + markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) + success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') + await message.answer(success_send_message, reply_markup=markup_for_user) + await state.set_state("START") + elif message.media_group_id is not None: + post_caption = " " + if album[0].caption: + lower_caption = album[0].caption.lower() + post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name, + message.from_user.username) + media_group = process_photo_album(album, post_caption) + media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message, + media_group) + sleep(0.2) + markup = get_reply_keyboard_for_post() + help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup) + await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id) + + markup_for_user = get_reply_keyboard(BotDB, message.from_user.id) + success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE') + await message.answer(success_send_message, reply_markup=markup_for_user) + await state.set_state("START") + else: + await message.bot.send_message(message.chat.id, + 'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)') + except Exception as e: + await message.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + + +@private_router.message( + ChatTypeFilter(chat_type=["private"]), + F.text == '🤪Хочу стикеры' +) +async def stickers(message: types.Message, state: FSMContext): + logger.info( + f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + markup = get_reply_keyboard(BotDB, message.from_user.id) + try: + BotDB.update_info_about_stickers(user_id=message.from_user.id) + await message.forward(chat_id=GROUP_FOR_LOGS) + await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', + reply_markup=markup) + await state.set_state("START") + except Exception as e: + await message.bot.send_message(chat_id=IMPORTANT_LOGS, + text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") + logger.error( + f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + + +@private_router.message( + StateFilter("START"), + ChatTypeFilter(chat_type=["private"]), + F.text == '📩Связаться с админами' +) +async def connect_with_admin(message: types.Message, state: FSMContext): + logger.info( + f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") + admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN') + await message.answer(admin_message, parse_mode="html") + await message.forward(chat_id=GROUP_FOR_LOGS) + await state.set_state("PRE_CHAT") + + +@private_router.message( + StateFilter("PRE_CHAT"), + ChatTypeFilter(chat_type=["private"]), +) +@private_router.message( + StateFilter("CHAT"), + ChatTypeFilter(chat_type=["private"]), +) +async def resend_message_in_group_for_message(message: types.Message, state: FSMContext): + logger.info( + f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})") + await message.forward(chat_id=GROUP_FOR_MESSAGE) + current_date = datetime.now() + date = current_date.strftime("%Y-%m-%d %H:%M:%S") + BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) + question = messages.get_message(get_first_name(message), 'QUESTION') + user_state = await state.get_state() + if user_state == "PRE_CHAT": + markup = get_reply_keyboard(BotDB, message.from_user.id) + await message.answer(question, reply_markup=markup) + await state.set_state("START") + elif user_state == "CHAT": + markup = get_reply_keyboard_leave_chat() + await message.answer(question, reply_markup=markup) + +# @private_router.message( +# ChatTypeFilter(chat_type=["private"]) +# ) +# async def default(message: types.Message, state: FSMContext): +# markup = get_reply_keyboard(BotDB, message.from_user.id) +# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup) +# await state.set_state("START") diff --git a/helper_bot/helper_bot.py b/helper_bot/helper_bot.py deleted file mode 100644 index 8010dfc..0000000 --- a/helper_bot/helper_bot.py +++ /dev/null @@ -1,793 +0,0 @@ -from datetime import datetime, timedelta -import random -import traceback -from enum import Enum -from pathlib import Path -from time import sleep -import telebot -from telebot import types -from telebot.apihelper import ApiTelegramException -from telebot.types import InputMediaPhoto - -import messages -from logs.custom_logger import Logger - -#Инициализируем логгер -bot_logger = Logger(name='bot') - - -class State(Enum): - START = "START" - SUGGEST = "SUGGEST" - ADMIN = "ADMIN" - CHAT = "CHAT" - PRE_CHAT = "PRE_CHAT" - - -class TelegramHelperBot: - def __init__(self, dependency_factory): - self.state = State.START - self.BotDB = dependency_factory.get_database() - self.settings = dependency_factory.get_settings() - token = self.settings['Telegram']['bot_token'] - self.GROUP_FOR_POST = self.settings['Telegram']['group_for_posts'] - self.GROUP_FOR_MESSAGE = self.settings['Telegram']['group_for_message'] - self.MAIN_PUBLIC = self.settings['Telegram']['main_public'] - self.GROUP_FOR_LOGS = self.settings['Telegram']['group_for_logs'] - self.IMPORTANT_LOGS = self.settings['Telegram']['important_logs'] - self.PREVIEW_LINK = self.settings['Telegram']['preview_link'] - self.LOGS = self.settings['Settings']['logs'] - self.TEST = self.settings['Settings']['test'] - self.bot = telebot.TeleBot(token) - self.logger = bot_logger.get_logger() - - # Router for user - @self.bot.message_handler(func=lambda message: True, chat_types=['private']) - def handle_message(message): - self.logger.info( - f'Получено сообщение: {message.text} от пользователя: {message.from_user.full_name} id юзера: {message.chat.id}') - if self.BotDB.check_user_in_blacklist(message.from_user.id): - attribute = self.BotDB.get_blacklist_users_by_id(message.from_user.id) - self.bot.send_message(message.chat.id, - f'Ты заблокирован\nПричина блокировки: {attribute[2]}\nДата разблокировки: {attribute[3]}', - parse_mode='HTML') - self.logger.info(f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) заблокирован') - return - if self.state == State.START: - if message.text == '/start': - self.start_message(message) - elif message.text == '📢Предложить свой пост': - self.suggest_post(message) - self.state = State.SUGGEST - elif message.text == '🤪Хочу стикеры': - self.stickers(message) - self.state = State.START - elif message.text == '📩Связаться с админами': - self.connect_with_admin(message) - self.state = State.PRE_CHAT - elif message.text == '👋🏼Сказать пока!': - self.end_message(message) - self.state = State.START - elif message.text == 'Выйти из чата': - self.end_message(message) - elif message.text == '/admin': - access = self.check_access(message.from_user.id) - if access: - self.admin_panel(message) - self.state = State.ADMIN - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вошел в админ-панель') - else: - self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') - self.logger.info( - f'Пользователю {message.from_user.full_name} (ID: {message.chat.id}) отказано в доступе к админ-панели') - elif message.text == '/state': - self.bot.send_message(message.chat.id, - f'Твой state == {self.state.value}') - else: - self.bot.send_message(message.chat.id, - #TODO: Здесь раньше был /state - "Не понимаю где ты находишься. Нажми /start, и я перезапущусь") - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) отправил непонятное сообщение: {message.text}') - - if self.state == State.SUGGEST: - self.bot.register_next_step_handler(message, self.suggest_router) - self.state = State.START - if message.text == '/start': - self.state = State.START - self.start_message(message) - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню') - if self.state == State.PRE_CHAT: - self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message) - self.state = State.START - if message.text == '/start': - self.state = State.START - self.start_message(message) - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню') - - if self.state == State.CHAT: - if message.text == 'Выйти из чата': - self.state = State.START - self.end_message(message) - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вышел из чата') - elif message.text == '/start': - self.state = State.START - self.start_message(message) - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню') - else: - self.resend_message_in_group_for_message(message) - - if self.state == State.ADMIN: - if message == '/admin' or message == '/restart' or message == 'Вернуться в админку': - access = self.check_access(message.from_user.id) - if access: - self.admin_panel(message) - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вошел в админ-панель') - else: - self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!') - self.logger.info( - f'Пользователю {message.from_user.full_name} (ID: {message.chat.id}) отказано в доступе к админ-панели') - if message.text == '/start': - self.state = State.START - self.start_message(message) - self.logger.info( - f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню') - - @self.bot.message_handler(func=lambda message: True, chat_types=['group']) - def handle_message(message): - """Функция ответа админа пользователю через закрытый чат""" - self.logger.info( - f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"') - self.state = State.CHAT - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Выйти из чата") - markup.add(item1) - message_id = 0 - try: - message_id = message.reply_to_message.id - except AttributeError: - self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!') - self.logger.warning( - f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа.') - message_from_admin = message.text - try: - chat_id = self.BotDB.get_user_by_message_id(message_id) - self.bot.send_message(chat_id, message_from_admin, reply_markup=markup) - self.logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id}.') - except TypeError: - self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.') - self.logger.error( - f'Ошибка при поиске пользователя в базе для ответа: {message.text} в группе {message.chat.title} (ID: {message.chat.id})') - - # Админка - @self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline']) - def post_for_group(call): - self.logger.info( - f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})') - if call.data == 'publish' and call.message.content_type == 'text': - try: - self.bot.send_message(chat_id=self.MAIN_PUBLIC, text=call.message.text) - self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) - self.logger.info(f'Текст сообщения опубликован в канале {self.MAIN_PUBLIC}.') - except Exception as e: - if self.LOGS: - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - self.logger.error(f'Ошибка при публикации текста в канал {self.MAIN_PUBLIC}: {str(e)}') - elif call.data == 'publish' and call.message.content_type == 'photo': - try: - self.bot.send_photo( - chat_id=self.MAIN_PUBLIC, - caption=call.message.caption, - photo=call.message.photo[-1].file_id, - ) - self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) - self.logger.info(f'Пост с фото опубликован в канале {self.MAIN_PUBLIC}.') - except Exception as e: - if self.LOGS: - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - self.logger.error(f'Ошибка при публикации фотографии в канал {self.MAIN_PUBLIC}: {str(e)}') - elif call.data == 'decline': - try: - self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id) - self.logger.info( - f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).') - except Exception as e: - if self.LOGS: - self.bot.send_message(self.IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - self.logger.error(f'Ошибка при удалении сообщения в группе {self.GROUP_FOR_POST}: {str(e)}') - - @self.bot.callback_query_handler(func=lambda call: True) - def pagination(call): - if call.data[:3] == 'ban': - user_id = call.data[4:] - self.logger.info(f"Бан пользователя с ID: {user_id}") - self.ban_user(call.message, user_id) - if call.data == 'return': - self.logger.info(f"Возврат в админ панель") - self.bot.delete_message(call.message.chat.id, call.message.message_id) - self.admin_panel(call.message) - if call.data[:5] == 'unban': - user_id = call.data[6:] - self.delete_user_blacklist(user_id) - msg = f'Успешно удалено.' - self.bot.send_message(chat_id=call.message.chat.id, text=msg) - self.logger.info(f"Разблокирован пользователь с ID: {user_id}") - elif call.data[:4] == 'page': - page_number = int(call.data[5:]) - self.logger.info(f"Переход на страницу {page_number}") - if call.message.text == 'Список пользователей которые последними обращались к боту': - list_users = self.BotDB.get_last_users_from_db() - keyboard = self.create_keyboard_with_pagination(int(page_number), len(list_users), list_users, - 'ban') - self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, - reply_markup=keyboard) - elif "Список заблокированных пользователей".lower() in call.message.text.lower(): - #Готовим сообщения - message_user = self.get_banned_users_list(int(page_number) * 7 - 7) - self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id, - text=message_user) - - #Готовим клавиатуру - buttons = self.get_banned_users_buttons() - keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban') - self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id, - reply_markup=keyboard) - else: - self.logger.warning(f"Неизвестный callback data: {call.data}") - - def start(self): - while True: - try: - print(self.bot.last_update_id) - self.bot.polling(none_stop=True) - except ConnectionError as e: - self.logger.error( - f"Произошла ошибка (потеря коннекта): {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - except Exception as e: - self.logger.error(f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - def unban_notifier(self): - # Получение сегодняшней даты в формате DD-MM-YYYY - current_date = datetime.now() - print('Мы в функции unban_notifier') - today = current_date.strftime("%d-%m-%Y") - # Получение списка разблокированных пользователей - unblocked_users = self.BotDB.get_users_for_unblock_today(today) - message = "Разблокированные пользователи:\n" - for user_id, user_name in unblocked_users.items(): - message += f"ID: {user_id}, Имя: {user_name}\n" - - # Отправка сообщения в канал - self.bot.send_message(self.GROUP_FOR_MESSAGE, message) - - # Черный список - def admin_panel(self, message): - try: - self.logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}") - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Бан (Список)") - #item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю - #item3 = types.KeyboardButton("Удалить админа") - item4 = types.KeyboardButton("Разбан (список)") - item5 = types.KeyboardButton("Вернуться в бота") - markup.add(item1, item4, item5) - self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:", - reply_markup=markup) - self.bot.register_next_step_handler(message, self.handle_admin_message) - except Exception as e: - self.logger.error(f"Ошибка при запуске админ панели: {e}") - self.bot.register_next_step_handler(message, self.admin_panel) - - def handle_admin_message(self, message): - self.logger.info(f"Получено сообщение от админа: {message.text} (пользователь: {message.from_user.id})") - try: - if message.text == "Бан (Список)": - self.get_last_users(message) - elif message.text == "Разбан (список)": - self.get_banned_users(message) - elif message.text == "Вернуться в бота": - self.start_message(message) - else: - self.logger.warning(f"Неизвестное сообщение от админа: {message.text}") - self.bot.reply_to(message, "Неизвестная команда.") - except Exception as e: - self.logger.error(f"Ошибка при обработке сообщения админа: {e}") - self.bot.reply_to(message, f"Ошибка\n\n {e}") - self.admin_panel(message) - - def ban_user(self, message, user_id: int): - self.logger.info( - f"Получена команда от админа на бан пользователя: {message.text} (пользователь: {message.from_user.id})") - user_name = self.BotDB.get_username(user_id=user_id) - ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None} - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Спам") - item2 = types.KeyboardButton("Заебал стикерами") - markup.add(item1, item2) - self.bot.send_message(message.chat.id, - f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат", - reply_markup=markup) - self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object) - - def ban_user_step_2(self, message, ban_object: dict): - self.logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {ban_object})") - ban_object['message_for_user'] = message.text - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("1") - item2 = types.KeyboardButton("7") - item3 = types.KeyboardButton("30") - item4 = types.KeyboardButton("Навсегда") - markup.add(item1, item2, item3, item4) - self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши " - f"его в чат", - reply_markup=markup) - self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object) - - def ban_user_step_3(self, message, ban_object: dict): - self.logger.info(f"Переход на шаг 3 бана пользователя. Словарь с данными для бана: {ban_object})") - date_to_unban = None - if message.text != 'Навсегда': - date_to_unban = self.add_days_to_date(message.text) - else: - pass - ban_object['date_to_unban'] = date_to_unban - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("Подтвердить") - item2 = types.KeyboardButton("Отменить") - markup.add(item1, item2) - self.bot.send_message(message.chat.id, - f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}", - parse_mode='html', - reply_markup=markup) - self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object) - - def ban_user_final_step(self, message, ban_object: dict): - self.logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {ban_object})") - if message.text == 'Подтвердить': - exists = self.BotDB.check_user_in_blacklist(ban_object['user_id']) - if exists: - self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.") - self.logger.info(f"Пользователь: {ban_object['user_id']} был заблокирован ранее)") - self.admin_panel(message) - else: - self.BotDB.set_user_blacklist(ban_object['user_id'], - ban_object['user_name'], - ban_object['message_for_user'], - ban_object['date_to_unban']) - self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.") - self.logger.info(f"Пользователь: {ban_object['user_id']} успешно заблокирован)") - self.admin_panel(message) - - def get_last_users(self, message): - self.logger.info( - f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") - list_users = self.BotDB.get_last_users_from_db() - keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban') - self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту", - reply_markup=keyboard) - - def get_banned_users(self, message): - self.logger.info( - f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})") - message_text = self.get_banned_users_list(0) - buttons_list = self.get_banned_users_buttons() - if buttons_list: - k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban') - self.bot.send_message(message.chat.id, message_text, reply_markup=k) - else: - self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет") - self.admin_panel(message) - - def start_message(self, message): - try: - self.logger.info( - f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name})") - name_stick_hello = list(Path('Stick').rglob('Hello_*')) - random_stick_hello = open(random.choice(name_stick_hello), 'rb') - self.logger.info(f"Стикер успешно получен из БД") - # logging - if self.LOGS: - self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - self.bot.send_sticker(message.chat.id, random_stick_hello) - sleep(0.3) - except Exception as e: - self.logger.error(f"Произошла ошибка при получении стикера. Ошибка: {str(e)}") - if self.LOGS: - self.bot.send_message(self.IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - try: - self.logger.info( - f"Получение данных для приветственного сообщения пользователю. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name})") - user_id = message.from_user.id - first_name = message.from_user.first_name - full_name = message.from_user.full_name - is_bot = message.from_user.is_bot - username = message.from_user.username - language_code = message.from_user.language_code - current_date = datetime.now() - date = current_date.strftime("%Y-%m-%d %H:%M:%S") - if not self.BotDB.user_exists(user_id): - self.BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date, - date) - self.BotDB.update_date_for_user(date, user_id) - markup = self.get_reply_keyboard(message) - hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE') - self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup, - disable_web_page_preview=not self.PREVIEW_LINK) - except Exception as e: - self.logger.error( - f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}") - if self.LOGS: - self.bot.send_message(self.IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - def resend_message_in_group_for_message(self, message): - self.logger.info( - f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.id})") - self.bot.forward_message(chat_id=self.GROUP_FOR_MESSAGE, - from_chat_id=message.chat.id, - message_id=message.message_id - ) - current_date = datetime.now() - date = current_date.strftime("%Y-%m-%d %H:%M:%S") - self.BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date) - question = messages.get_message(self.__get_first_name(message), 'QUESTION') - markup = self.get_reply_keyboard(message) - self.bot.send_message(message.chat.id, question, parse_mode='html', - disable_web_page_preview=not self.PREVIEW_LINK, - reply_markup=markup) - - def suggest_post(self, message): - try: - self.logger.info( - f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.id})") - markup = types.ReplyKeyboardRemove() - suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS') - self.bot.send_message(message.chat.id, suggest_news, parse_mode='html') - sleep(0.3) - suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2') - self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup) - except Exception as e: - self.bot.send_message(self.IMPORTANT_LOGS, - f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - def stickers(self, message): - self.logger.info( - f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - self.BotDB.update_info_about_stickers(user_id=message.from_user.id) - markup = self.get_reply_keyboard(message) - try: - self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - self.bot.send_message(message.chat.id, - text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk', - reply_markup=markup) - except ApiTelegramException as e: - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - self.logger.error( - f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - - def connect_with_admin(self, message): - self.logger.info( - f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN') - self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html") - # logging - if self.LOGS: - self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS, - from_chat_id=message.chat.id, - message_id=message.message_id) - - def end_message(self, message): - try: - self.logger.info( - f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - name_stick_bye = list(Path('Stick').rglob('Universal_*')) - random_stick_bye = open(random.choice(name_stick_bye), 'rb') - self.bot.send_sticker(message.chat.id, random_stick_bye) - except ApiTelegramException as e: - self.logger.error( - f"Ошибка в функции stickers при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - if self.LOGS: - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - markup = types.ReplyKeyboardRemove() - try: - bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE') - self.bot.send_message(message.chat.id, bye_message, - parse_mode='html', reply_markup=markup, - disable_web_page_preview=not self.PREVIEW_LINK) - except Exception as e: - if self.LOGS: - self.logger.error( - f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - #TODO: deprecated - def send_to_suggest_2(self, message): - markup = self._get_reply_keyboard_for_post() - try: - if message.content_type == 'text': - post_text = message.text.lower() - if post_text.find('неанон') != -1 or post_text.find('не анон') != -1: - self.bot.send_message( - # TODO: GROUP_FOR_POST - chat_id=self.GROUP_FOR_POST, - text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - reply_markup=markup - ) - elif post_text.find('анон') != -1: - self.bot.send_message( - # TODO: GROUP_FOR_POST - chat_id=self.GROUP_FOR_POST, - text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно', - reply_markup=markup - ) - else: - self.bot.send_message( - # TODO: GROUP_FOR_POST - chat_id=self.GROUP_FOR_POST, - text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - reply_markup=markup - ) - elif message.content_type == 'photo' and message.media_group_id is None: - post_text_for_photo = message.caption.lower() - if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1: - self.bot.send_photo( - # TODO: GROUP_FOR_POST - chat_id=self.GROUP_FOR_POST, - caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - photo=message.photo[-1].file_id, - reply_markup=markup - ) - elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1: - self.bot.send_photo( - # TODO: GROUP_FOR_POST - chat_id=self.GROUP_FOR_POST, - caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно', - photo=message.photo[-1].file_id, - reply_markup=markup - ) - else: - self.bot.send_photo( - # TODO: GROUP_FOR_POST - chat_id=self.GROUP_FOR_POST, - caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}', - photo=message.photo[-1].file_id, - reply_markup=markup - ) - # TODO: Не понятна реализация с альбомами от слова совсем. 11.07 Реализация понятна. Нужно отправлять альбомы + сообщение в одном сообщении. Следующее сообщение с пробелом и кнопками - # elif message.content_type == 'photo' and message.media_group_id != None: - # bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id ) - else: - pass - except Exception as e: - if self.LOGS: - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - markup_for_user = self.get_reply_keyboard(message) - success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') - self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', - disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user) - - def suggest_router(self, message): - self.logger.info( - f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}") - try: - if message.content_type == 'text': - lower_text = message.text.lower() - post_text, is_anonymous = self._get_text_message(lower_text, message.from_user.full_name, - message.from_user.id) - if is_anonymous: - self._send_text_message(post_text) - else: - self._send_text_message(post_text) - markup_for_user = self.get_reply_keyboard(message) - success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') - self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', - disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user) - elif message.content_type == 'photo' and message.media_group_id is None: - lower_caption = message.caption.lower() - post_caption, is_anonymous = self._get_text_message(lower_caption, message.from_user.full_name, - message.from_user.id) - if is_anonymous: - self._send_photo_message(message.photo[-1].file_id, post_caption) - else: - self._send_photo_message(message.photo[-1].file_id, post_caption) - markup_for_user = self.get_reply_keyboard(message) - success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE') - self.bot.send_message(message.chat.id, success_send_message, parse_mode='html', - disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user) - elif message.media_group_id is not None: - self.bot.send_message(message.chat.id, - 'Я пока не умею работать с несколькими файлами. Пришли текст и не более одного фото') - self.bot.register_next_step_handler(message, self.suggest_router) - else: - self.bot.send_message(message.chat.id, - 'Я пока не умею работать с таким сообщением. Пришли текст и не более одного фото') - self.bot.register_next_step_handler(message, self.suggest_router) - except Exception as e: - if self.LOGS: - self.bot.send_message(chat_id=self.IMPORTANT_LOGS, - text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}") - - @staticmethod - def _get_reply_keyboard_for_post(): - markup = types.InlineKeyboardMarkup(row_width=1) - item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish') - item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline') - markup.add(item1, item2) - return markup - - @staticmethod - def _get_text_message(post_text: str, first_name: str, username: str): - """ - Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон". - - Args: - post_text: Текст сообщения - first_name: Имя автора поста - username: Юзернейм автора поста - - Returns: - Кортеж из двух элементов: - - Сформированный текст сообщения. - - Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный). - """ - if "неанон" in post_text or "не анон" in post_text: - is_anonymous = False - return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous - elif "анон" in post_text: - is_anonymous = True - return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous - else: - is_anonymous = False - return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous - - def _send_text_message(self, post_text: str): - markup = self._get_reply_keyboard_for_post() - self.bot.send_message( - chat_id=self.GROUP_FOR_POST, - text=post_text, - parse_mode='html', - disable_web_page_preview=not self.PREVIEW_LINK, - reply_markup=markup - ) - - def _send_photo_message(self, photo: str, post_text: str): - markup = self._get_reply_keyboard_for_post() - self.bot.send_photo( - chat_id=self.GROUP_FOR_POST, - caption=post_text, - photo=photo, - reply_markup=markup - ) - - def get_reply_keyboard(self, message): - markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True) - item1 = types.KeyboardButton("📢Предложить свой пост") - item2 = types.KeyboardButton("📩Связаться с админами") - item3 = types.KeyboardButton("👋🏼Сказать пока!") - item4 = types.KeyboardButton("🤪Хочу стикеры") if not self.BotDB.get_info_about_stickers( - user_id=message.from_user.id) else None - - if item4: - markup.add(item1, item2, item3, item4) - else: - markup.add(item1, item2, item3) - - return markup - - @staticmethod - def __get_first_name(message): - return message.from_user.first_name - - def check_access(self, user_id: int): - """Проверка прав на совершение действий""" - return self.BotDB.is_admin(user_id) - - @staticmethod - def add_days_to_date(days): - """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" - current_date = datetime.now() - future_date = current_date + timedelta(days=int(days)) - formatted_date = future_date.strftime("%d-%m-%Y") - return formatted_date - - @staticmethod - def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str): - """ - Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback - - Args: - page: Номер текущей страницы. - total_items: Общее количество элементов. - array_items: Лист кортежей. Содержит в себе user_name: user_id - callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) - - Returns: - InlineKeyboardMarkup: Клавиатура с кнопками пагинации. - """ - - # Определяем общее количество страниц - total_pages = (total_items + 7 - 1) // 7 - - page = page - # Создаем список кнопок - buttons = [] - # Вычисляем стартовый номер для текущей страницы - start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы - # Кнопки с номерами страниц - for i in range(start_index, min(start_index + 7, - len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы - buttons.append( - types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}")) - - # Добавляем кнопки "Предыдущая" и "Следующая" - if int(page) > 1: - buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}")) - if page < total_pages: - buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}")) - #Добавляем кнопку назад - buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return")) - # Формируем клавиатуру с 3 кнопками в ряд - keyboard = [] - for i in range(0, len(buttons), 3): - keyboard.append(buttons[i:i + 3]) - return types.InlineKeyboardMarkup(keyboard) - - def get_banned_users_list(self, offset: int): - """ - Возвращает сообщение со списком пользователей и словарь с ником + идентификатором - - Args: - offset: отступ для запроса в базу данных - - Returns: - message - текст сообщения - user_ids - лист кортежей [(user_name: user_id)] - """ - users = self.BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset) - message = "Список заблокированных пользователей:\n" - - for user in users: - message += f"Пользователь: {user[0]}\n" - message += f"Причина бана: {user[2]}\n" - message += f"Дата разбана: {user[3]}\n\n" - return message - - def get_banned_users_buttons(self): - """ - Возвращает сообщение со списком пользователей и словарь с ником + идентификатором - - Args: - offset: отступ для запроса в базу данных - - Returns: - message - текст сообщения - user_ids - лист кортежей [(user_name: user_id)] - """ - users = self.BotDB.get_banned_users_from_db() - user_ids = [] - - for user in users: - user_ids.append((user[0], user[1])) - return user_ids - - def delete_user_blacklist(self, user_id): - return self.BotDB.delete_user_blacklist(user_id=user_id) diff --git a/helper_bot/keyboards/__init__.py b/helper_bot/keyboards/__init__.py new file mode 100644 index 0000000..734fb54 --- /dev/null +++ b/helper_bot/keyboards/__init__.py @@ -0,0 +1 @@ +from .main import get_reply_keyboard_for_post, get_reply_keyboard \ No newline at end of file diff --git a/helper_bot/keyboards/__pycache__/__init__.cpython-312.pyc b/helper_bot/keyboards/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..128ff0f Binary files /dev/null and b/helper_bot/keyboards/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/keyboards/__pycache__/main.cpython-312.pyc b/helper_bot/keyboards/__pycache__/main.cpython-312.pyc new file mode 100644 index 0000000..b628788 Binary files /dev/null and b/helper_bot/keyboards/__pycache__/main.cpython-312.pyc differ diff --git a/helper_bot/keyboards/main.py b/helper_bot/keyboards/main.py new file mode 100644 index 0000000..41e238a --- /dev/null +++ b/helper_bot/keyboards/main.py @@ -0,0 +1,110 @@ +from aiogram import types +from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder + + +def get_reply_keyboard_for_post(): + builder = InlineKeyboardBuilder() + builder.row(types.InlineKeyboardButton( + text="Опубликовать", callback_data="publish"), + types.InlineKeyboardButton( + text="Отклонить", callback_data="decline") + ) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup + + +def get_reply_keyboard(BotDB, user_id): + builder = ReplyKeyboardBuilder() + builder.add(types.KeyboardButton(text="📢Предложить свой пост")) + builder.add(types.KeyboardButton(text="📩Связаться с админами")) + builder.add(types.KeyboardButton(text="👋🏼Сказать пока!")) + if not BotDB.get_info_about_stickers(user_id=user_id): + builder.add(types.KeyboardButton(text="🤪Хочу стикеры")) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup + + +def get_reply_keyboard_leave_chat(): + builder = ReplyKeyboardBuilder() + builder.add(types.KeyboardButton(text="Выйти из чата")) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup + + +def get_reply_keyboard_admin(): + builder = ReplyKeyboardBuilder() + builder.add(types.KeyboardButton(text="Бан (Список)")) + builder.add(types.KeyboardButton(text="Разбан (список)")) + builder.add(types.KeyboardButton(text="Вернуться в бота")) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup + + +def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str): + """ + Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback + + Args: + page: Номер текущей страницы. + total_items: Общее количество элементов. + array_items: Лист кортежей. Содержит в себе user_name: user_id + callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id}) + + Returns: + InlineKeyboardMarkup: Клавиатура с кнопками пагинации. + """ + + # Определяем общее количество страниц + total_pages = (total_items + 9 - 1) // 9 + + # Создаем билдер для клавиатуры + keyboard = InlineKeyboardBuilder() + # TODO: Тут поправить на 9 объектов, а не 7. Клавиатуру переделал + # Вычисляем стартовый номер для текущей страницы + start_index = (page - 1) * 9 + + # Кнопки с номерами страниц + for i in range(start_index, min(start_index + 9, len(array_items))): + keyboard.add(types.InlineKeyboardButton( + text=f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}" + )) + keyboard.adjust(3) + + next_button = types.InlineKeyboardButton( + text="➡️ Следующая", callback_data=f"page_{page + 1}" + ) + prev_button = types.InlineKeyboardButton( + text="⬅️ Предыдущая", callback_data=f"page_{page - 1}" + ) + keyboard.row(prev_button, next_button) + home_button = types.InlineKeyboardButton( + text="🏠 Назад", callback_data="return") + keyboard.row(home_button) + k = keyboard.as_markup() + return k + + +def create_keyboard_for_ban_reason(): + builder = ReplyKeyboardBuilder() + builder.add(types.KeyboardButton(text="Спам")) + builder.add(types.KeyboardButton(text="Заебал стикерами")) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup + + +def create_keyboard_for_ban_days(): + builder = ReplyKeyboardBuilder() + builder.add(types.KeyboardButton(text="1")) + builder.add(types.KeyboardButton(text="7")) + builder.add(types.KeyboardButton(text="30")) + builder.add(types.KeyboardButton(text="Навсегда")) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup + + +def create_keyboard_for_approve_ban(): + builder = ReplyKeyboardBuilder() + builder.add(types.KeyboardButton(text="Подтвердить")) + builder.add(types.KeyboardButton(text="Отменить")) + markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True) + return markup diff --git a/helper_bot/main.py b/helper_bot/main.py new file mode 100644 index 0000000..4451d4a --- /dev/null +++ b/helper_bot/main.py @@ -0,0 +1,21 @@ +from aiogram import Bot, Dispatcher +from aiogram.client.default import DefaultBotProperties +from aiogram.fsm.storage.memory import MemoryStorage +from aiogram.fsm.strategy import FSMStrategy + +from helper_bot.handlers.admin import admin_router +from helper_bot.handlers.callback import callback_router +from helper_bot.handlers.group import group_router +from helper_bot.handlers.private import private_router + + +async def start_bot(bdf): + token = bdf.settings['Telegram']['bot_token'] + bot = Bot(token=token, default=DefaultBotProperties( + parse_mode='HTML', + link_preview_is_disabled=bdf.settings['Telegram']['preview_link'] + )) + dp = Dispatcher(storage=MemoryStorage(), fsm_strategy=FSMStrategy.GLOBAL_USER) + dp.include_routers(private_router, callback_router, group_router, admin_router) + await bot.delete_webhook(drop_pending_updates=True) + await dp.start_polling(bot, skip_updates=True) diff --git a/helper_bot/middlewares/__init__.py b/helper_bot/middlewares/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/helper_bot/middlewares/__pycache__/__init__.cpython-312.pyc b/helper_bot/middlewares/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..dfd7fe0 Binary files /dev/null and b/helper_bot/middlewares/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/middlewares/__pycache__/text_middleware.cpython-312.pyc b/helper_bot/middlewares/__pycache__/text_middleware.cpython-312.pyc new file mode 100644 index 0000000..f9fecc1 Binary files /dev/null and b/helper_bot/middlewares/__pycache__/text_middleware.cpython-312.pyc differ diff --git a/helper_bot/middlewares/album_middleware.py b/helper_bot/middlewares/album_middleware.py new file mode 100644 index 0000000..b2bc72a --- /dev/null +++ b/helper_bot/middlewares/album_middleware.py @@ -0,0 +1,46 @@ +import asyncio +from collections import defaultdict +from typing import Any, Dict, Union + +from aiogram import BaseMiddleware +from aiogram.types import Message + + +class BulkTextMiddleware(BaseMiddleware): + def __init__(self, latency: Union[int, float] = 0.1): + # Initialize latency and album_data dictionary + self.latency = latency + self.texts = defaultdict(list) + + # + async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any: + """ + Main middleware logic. + """ + # # If the event has no media_group_id, pass it to the handler immediately + key = (event.chat.id, event.from_user.id) + if not event.text: + return await handler(event, data) + + self.texts[key].append(event) + total_before = len(self.texts[key]) + # # Wait for a specified latency period + await asyncio.sleep(self.latency) + # + # # Check the total number of messages after the latency + total_after = len(self.texts[key]) + # + # # If new messages were added during the latency, exit + if total_before != total_after: + return + # + # # Sort the album messages by message_id and add to data + msg_texts = self.texts[key] + msg_texts.sort(key=lambda x: x.message_id) + data["texts"] = ''.join([msg.text for msg in msg_texts]) + # + # Remove the media group from tracking to free up memory + del self.texts[key] + # # Call the original event handler + return await handler(event, data) +# diff --git a/helper_bot/middlewares/text_middleware.py b/helper_bot/middlewares/text_middleware.py new file mode 100644 index 0000000..627d2bd --- /dev/null +++ b/helper_bot/middlewares/text_middleware.py @@ -0,0 +1,61 @@ +import asyncio +from typing import Any, Dict, Union + +from aiogram import BaseMiddleware +from aiogram.types import Message + + +class AlbumMiddleware(BaseMiddleware): + def __init__(self, latency: Union[int, float] = 0.1): + # Initialize latency and album_data dictionary + self.latency = latency + self.album_data = {} + + # + def collect_album_messages(self, event: Message): + """ + Collect messages of the same media group. + """ + # # Check if media_group_id exists in album_data + if event.media_group_id not in self.album_data: + # # Create a new entry for the media group + self.album_data[event.media_group_id] = {"messages": []} + # + # # Append the new message to the media group + self.album_data[event.media_group_id]["messages"].append(event) + # + # # Return the total number of messages in the current media group + return len(self.album_data[event.media_group_id]["messages"]) + + # + async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any: + """ + Main middleware logic. + """ + # # If the event has no media_group_id, pass it to the handler immediately + if not event.media_group_id: + return await handler(event, data) + # + # # Collect messages of the same media group + total_before = self.collect_album_messages(event) + # + # # Wait for a specified latency period + await asyncio.sleep(self.latency) + # + # # Check the total number of messages after the latency + total_after = len(self.album_data[event.media_group_id]["messages"]) + # + # # If new messages were added during the latency, exit + if total_before != total_after: + return + # + # # Sort the album messages by message_id and add to data + album_messages = self.album_data[event.media_group_id]["messages"] + album_messages.sort(key=lambda x: x.message_id) + data["album"] = album_messages + # + # # Remove the media group from tracking to free up memory + del self.album_data[event.media_group_id] + # # Call the original event handler + return await handler(event, data) +# diff --git a/helper_bot/utils/__init__.py b/helper_bot/utils/__init__.py new file mode 100644 index 0000000..b9f7655 --- /dev/null +++ b/helper_bot/utils/__init__.py @@ -0,0 +1 @@ +from .state import StateUser diff --git a/helper_bot/utils/__pycache__/__init__.cpython-312.pyc b/helper_bot/utils/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..d401561 Binary files /dev/null and b/helper_bot/utils/__pycache__/__init__.cpython-312.pyc differ diff --git a/helper_bot/utils/__pycache__/base_dependency_factory.cpython-312.pyc b/helper_bot/utils/__pycache__/base_dependency_factory.cpython-312.pyc new file mode 100644 index 0000000..38de5d5 Binary files /dev/null and b/helper_bot/utils/__pycache__/base_dependency_factory.cpython-312.pyc differ diff --git a/helper_bot/utils/__pycache__/helper_func.cpython-312.pyc b/helper_bot/utils/__pycache__/helper_func.cpython-312.pyc new file mode 100644 index 0000000..8132270 Binary files /dev/null and b/helper_bot/utils/__pycache__/helper_func.cpython-312.pyc differ diff --git a/helper_bot/utils/__pycache__/messages.cpython-312.pyc b/helper_bot/utils/__pycache__/messages.cpython-312.pyc new file mode 100644 index 0000000..fd1fdd4 Binary files /dev/null and b/helper_bot/utils/__pycache__/messages.cpython-312.pyc differ diff --git a/helper_bot/utils/__pycache__/state.cpython-312.pyc b/helper_bot/utils/__pycache__/state.cpython-312.pyc new file mode 100644 index 0000000..8227ac4 Binary files /dev/null and b/helper_bot/utils/__pycache__/state.cpython-312.pyc differ diff --git a/helper_bot/utils/base_dependency_factory.py b/helper_bot/utils/base_dependency_factory.py new file mode 100644 index 0000000..283d00d --- /dev/null +++ b/helper_bot/utils/base_dependency_factory.py @@ -0,0 +1,25 @@ +import configparser +import os +import sys + + +class BaseDependencyFactory: + def __init__(self): + # Загрузка настроек из settings.ini + config_path = os.path.join(sys.path[0], 'settings.ini') + self.config = configparser.ConfigParser() + self.config.read(config_path) + self.settings = {} + for section in self.config.sections(): + self.settings[section] = {} + for key in self.config[section]: + # Преобразование значений в соответствующий тип + if key == 'PREVIEW_LINK': + self.settings[section][key] = self.config.getboolean(section, key) + elif key == 'LOGS' or key == 'TEST': + self.settings[section][key] = self.config.getboolean(section, key) + else: + self.settings[section][key] = self.config.get(section, key) + + def get_settings(self): + return self.settings diff --git a/helper_bot/utils/helper_func.py b/helper_bot/utils/helper_func.py new file mode 100644 index 0000000..deeacfc --- /dev/null +++ b/helper_bot/utils/helper_func.py @@ -0,0 +1,191 @@ +from datetime import datetime, timedelta + +from aiogram import types +from aiogram.types import InputMediaPhoto + +from helper_bot.keyboards import get_reply_keyboard_for_post +from database.db import BotDB + +BotDB = BotDB('database/tg-bot-database') + + +def get_first_name(message: types.Message) -> str: + return message.from_user.first_name + + +def get_text_message(post_text: str, first_name: str, username: str): + """ + Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон". + + Args: + post_text: Текст сообщения + first_name: Имя автора поста + username: Юзернейм автора поста + + Returns: + Кортеж из двух элементов: + - Сформированный текст сообщения. + - Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный). + """ + if "неанон" in post_text or "не анон" in post_text: + is_anonymous = False + return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous + elif "анон" in post_text: + is_anonymous = True + return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous + else: + is_anonymous = False + return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous + + +def process_photo_album(album, post_caption: str = ''): + """ + Создает список InputMediaPhoto для альбома. + + Args: + album: Album объект из Telegram API. + post_caption: Текст подписи к первому фото. + + Returns: + Список InputMediaPhoto. + """ + photo_media = [] + for i, message in enumerate(album): + if i == 0: + photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id, caption=post_caption)) + else: + photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id)) + return photo_media + + +async def send_media_group_message(chat_id: int, message: types.Message, media_group: list[InputMediaPhoto]): + sent_message = await message.bot.send_media_group( + chat_id=chat_id, + media=media_group, + ) + message_id = sent_message[-1].message_id + return message_id + + +async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None): + if markup is None: + sent_message = await message.bot.send_message( + chat_id=chat_id, + text=post_text + ) + message_id = sent_message.message_id + return message_id + else: + sent_message = await message.bot.send_message( + chat_id=chat_id, + text=post_text, + reply_markup=markup + ) + message_id = sent_message.message_id + return message_id + + +async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None): + if markup is None: + await message.bot.send_photo( + chat_id=chat_id, + caption=post_text, + photo=photo + ) + else: + await message.bot.send_photo( + chat_id=chat_id, + caption=post_text, + photo=photo, + reply_markup=markup + ) + + +def check_access(user_id: int): + """Проверка прав на совершение действий""" + return BotDB.is_admin(user_id) + + +def add_days_to_date(days: int): + """Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY.""" + current_date = datetime.now() + future_date = current_date + timedelta(days=days) + formatted_date = future_date.strftime("%d-%m-%Y") + return formatted_date + + +def get_banned_users_list(offset: int): + """ + Возвращает сообщение со списком пользователей и словарь с ником + идентификатором + + Args: + offset: отступ для запроса в базу данных + + Returns: + message - текст сообщения + user_ids - лист кортежей [(user_name: user_id)] + """ + users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset) + message = "Список заблокированных пользователей:\n" + + for user in users: + message += f"Пользователь: {user[0]}\n" + message += f"Причина бана: {user[2]}\n" + message += f"Дата разбана: {user[3]}\n\n" + return message + + +def get_banned_users_buttons(): + """ + Возвращает сообщение со списком пользователей и словарь с ником + идентификатором + + Args: + offset: отступ для запроса в базу данных + + Returns: + message - текст сообщения + user_ids - лист кортежей [(user_name: user_id)] + """ + users = BotDB.get_banned_users_from_db() + user_ids = [] + + for user in users: + user_ids.append((user[0], user[1])) + return user_ids + + +def get_help_message_id(media_group_message_id: int, data: dict) -> int: + """ + Получает идентификатор сообщения помощи по идентификатору сообщения группы. + + Args: + media_group_message_id: Идентификатор сообщения группы + data: Словарь с данными. + + Returns: + Идентификатор сообщения помощи. + """ + + if 'help_message_id' in data and 'media_group_message_id' in data: + return data['media_group_message_id'] + else: + return 0 + + +def delete_user_blacklist(user_id: int): + return BotDB.delete_user_blacklist(user_id=user_id) + + +def unban_notifier(self): + # Получение сегодняшней даты в формате DD-MM-YYYY + current_date = datetime.now() + print('Мы в функции unban_notifier') + today = current_date.strftime("%d-%m-%Y") + # Получение списка разблокированных пользователей + unblocked_users = self.BotDB.get_users_for_unblock_today(today) + message = "Разблокированные пользователи:\n" + for user_id, user_name in unblocked_users.items(): + message += f"ID: {user_id}, Имя: {user_name}\n" + + # Отправка сообщения в канал + self.bot.send_message(self.GROUP_FOR_MESSAGE, message) \ No newline at end of file diff --git a/messages.py b/helper_bot/utils/messages.py similarity index 100% rename from messages.py rename to helper_bot/utils/messages.py diff --git a/helper_bot/utils/state.py b/helper_bot/utils/state.py new file mode 100644 index 0000000..6d4e320 --- /dev/null +++ b/helper_bot/utils/state.py @@ -0,0 +1,13 @@ +from aiogram.fsm.state import StatesGroup, State + + +class StateUser(StatesGroup): + START = State() + SUGGEST = State() + ADMIN = State() + CHAT = State() + PRE_CHAT = State() + BAN_2 = State() + BAN_3 = State() + BAN_4 = State() + BAN_FINAL = State() diff --git a/main.py b/main.py deleted file mode 100644 index 24eba19..0000000 --- a/main.py +++ /dev/null @@ -1,50 +0,0 @@ -import configparser -import os -import sys -from database.db import BotDB -from helper_bot.helper_bot import TelegramHelperBot -from logs.custom_logger import Logger - - -#TODO: Добавить проверку можно ли отвечать пользователю? Сейчас если у него скрыто лс, ему похоже не приходят сообщения -#TODO Подумать над реализацией функционала с поступлениями в колледжи -#TODO: Покрыть все логированием и ошибками корректными. Ерроры кидать в чат. -#TODO: Покрыть все тестами - - -class BaseDependencyFactory: - def __init__(self): - # Загрузка настроек из settings.ini - self.logger = Logger('main') - config_path = os.path.join(sys.path[0], 'settings.ini') - self.config = configparser.ConfigParser() - self.config.read(config_path) - self.BotDB = BotDB('database/tg-bot-database') - - self.settings = {} - for section in self.config.sections(): - self.settings[section] = {} - for key in self.config[section]: - # Преобразование значений в соответствующий тип - if key == 'PREVIEW_LINK': - self.settings[section][key] = self.config.getboolean(section, key) - elif key == 'LOGS' or key == 'TEST': - self.settings[section][key] = self.config.getboolean(section, key) - else: - self.settings[section][key] = self.config.get(section, key) - - def get_settings(self): - return self.settings - - def get_database(self): - return self.BotDB - - -if __name__ == "__main__": - # Запускаем тг бота - bot = TelegramHelperBot(BaseDependencyFactory()) - bot.start() - - #scheduler = BackgroundScheduler() - #scheduler.add_job(bot.unban_notifier(), 'cron', hour=22, minute=9) - #scheduler.start() diff --git a/requirements.txt b/requirements.txt index 0715972..e07318e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ APScheduler==3.10.4 -certifi==2024.7.4 +certifi~=2024.6.2 charset-normalizer==3.3.2 coverage==7.5.4 exceptiongroup==1.2.1 @@ -8,11 +8,15 @@ iniconfig==2.0.0 loguru==0.7.2 packaging==24.1 pluggy==1.5.0 -pyTelegramBotAPI==4.20.0 pytest==8.2.2 pytz==2024.1 requests==2.32.3 six==1.16.0 tomli==2.0.1 tzlocal==5.2 -urllib3==2.2.2 +urllib3~=2.2.1 +pip~=23.2.1 +attrs~=23.2.0 +typing_extensions~=4.12.2 +aiohttp~=3.9.5 +aiogram~=3.10.0 \ No newline at end of file diff --git a/run_helper.py b/run_helper.py new file mode 100644 index 0000000..ad7b9df --- /dev/null +++ b/run_helper.py @@ -0,0 +1,7 @@ +import asyncio +from helper_bot.main import start_bot +from helper_bot.utils.base_dependency_factory import BaseDependencyFactory + + +if __name__ == '__main__': + asyncio.run(start_bot(BaseDependencyFactory()))