migrate to aiogram 3.10.0

This commit is contained in:
KatykhinAA
2024-07-14 21:22:33 +03:00
parent 24ac638433
commit 1a0344d0e8
47 changed files with 1145 additions and 846 deletions

1
.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
/database/tg-bot-database

BIN
Stick/Hello_11.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

BIN
Stick/Hello_12.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

BIN
Stick/Hello_13.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 23 KiB

BIN
Stick/Universal_11.webp Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

0
helper_bot/__init__.py Normal file
View File

View File

@@ -0,0 +1,15 @@
from typing import Union
from aiogram.filters import BaseFilter
from aiogram.types import Message
class ChatTypeFilter(BaseFilter): # [1]
def __init__(self, chat_type: Union[str, list]): # [2]
self.chat_type = chat_type
async def __call__(self, message: Message) -> bool: # [3]
if isinstance(self.chat_type, str):
return message.chat.type == self.chat_type
else:
return message.chat.type in self.chat_type

View File

@@ -0,0 +1 @@
from .main import admin_router

View File

@@ -0,0 +1,143 @@
import traceback
from aiogram import Router, types, F
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.main import get_reply_keyboard_admin, create_keyboard_with_pagination, \
create_keyboard_for_ban_days, create_keyboard_for_approve_ban
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import check_access, add_days_to_date, get_banned_users_buttons, get_banned_users_list
from logs.custom_logger import Logger
from database.db import BotDB
admin_router = Router()
#Инициализируем логгер
admin_logger = Logger(name='admin_handler')
logger = admin_logger.get_logger()
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database')
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
Command('admin')
)
async def admin_panel(message: types.Message, state: FSMContext):
try:
if check_access(message.from_user.id):
await state.set_state("ADMIN")
logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
markup = get_reply_keyboard_admin()
await message.answer("Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
else:
await message.answer('Доступ запрещен, досвидания!')
except Exception as e:
logger.error(f"Ошибка при запуске админ панели: {e}")
await message.bot.send_message(IMPORTANT_LOGS,
f'Ошибка в функции admin_panel {e}. Traceback: {traceback.format_exc()}')
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("ADMIN"),
F.text == 'Бан (Список)'
)
async def get_last_users(message: types.Message):
logger.info(
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
list_users = BotDB.get_last_users_from_db()
keyboard = create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
await message.answer(text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard)
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("ADMIN"),
F.text == 'Разбан (список)'
)
async def get_banned_users(message):
logger.info(
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
message_text = get_banned_users_list(0)
buttons_list = get_banned_users_buttons()
if buttons_list:
k = create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unlock')
await message.answer(text=message_text, reply_markup=k)
else:
await message.answer(text="В списке забанненых пользователей никого нет")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_2")
)
async def ban_user_step_2(message: types.Message, state: FSMContext):
user_data = await state.get_data()
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
await state.update_data(message_for_user=message.text)
markup = create_keyboard_for_ban_days()
await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
f"его в чат", reply_markup=markup)
await state.set_state("BAN_3")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_3")
)
async def ban_user_step_3(message: types.Message, state: FSMContext):
logger.info(f"ban_user_step_3. Расчет даты разбана. Входные данные {message.text}")
if message.text != 'Навсегда':
count_days = int(message.text)
date_to_unban = add_days_to_date(count_days)
else:
date_to_unban = None
logger.info(f"ban_user_step_3. Расчет даты разбана. date_to_unban: {date_to_unban}")
await state.update_data(date_to_unban=date_to_unban)
user_data = await state.get_data()
markup = create_keyboard_for_approve_ban()
await message.answer(
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
reply_markup=markup)
await state.set_state("BAN_FINAL")
@admin_router.message(
ChatTypeFilter(chat_type=["private"]),
StateFilter("BAN_FINAL")
)
async def approve_ban(message: types.Message, state: FSMContext):
user_data = await state.get_data()
logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {user_data})")
if message.text == 'Подтвердить':
exists = BotDB.check_user_in_blacklist(user_data['user_id'])
if exists:
await message.reply(f"Пользователь уже был заблокирован ранее.")
logger.info(f"Пользователь: {user_data['user_id']} был заблокирован ранее)")
await state.set_state('ADMIN')
else:
BotDB.set_user_blacklist(user_data['user_id'],
user_data['user_name'],
user_data['message_for_user'],
user_data['date_to_unban'])
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()
await message.answer('Вернулись в меню', reply_markup=markup)

View File

@@ -0,0 +1 @@
from .main import callback_router

View File

@@ -0,0 +1,164 @@
import traceback
from aiogram import Router, F, types
from aiogram.fsm.context import FSMContext
from aiogram.types import CallbackQuery
from database.db import BotDB
from helper_bot.keyboards.main import create_keyboard_with_pagination, get_reply_keyboard_admin, \
create_keyboard_for_ban_reason
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import send_text_message, send_photo_message, get_banned_users_list, \
get_banned_users_buttons, delete_user_blacklist, get_help_message_id, send_media_group_message
from logs.custom_logger import Logger
callback_router = Router()
#Инициализируем логгер
callback_logger = Logger(name='callback_logger')
logger = callback_logger.get_logger()
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database')
@callback_router.callback_query(
F.data == "publish"
)
async def post_for_group(call: CallbackQuery, state: FSMContext):
logger.info(
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
if call.data == 'publish' and call.message.content_type == 'text' and call.message.text != "^":
try:
await send_text_message(MAIN_PUBLIC, call.message, call.message.text)
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
logger.info(f'Текст сообщения опубликован в канале {MAIN_PUBLIC}.')
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
except Exception as e:
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при публикации текста в канал {MAIN_PUBLIC}: {str(e)}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
elif call.data == 'publish' and call.message.content_type == 'photo':
try:
print(f'CALLMESSAGE - {call.message.text}')
await send_photo_message(MAIN_PUBLIC, call.message, call.message.photo[-1].file_id, call.message.caption)
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
logger.info(f'Пост с фото опубликован в канале {MAIN_PUBLIC}.')
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
except Exception as e:
await call.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при публикации фотографии в канал {MAIN_PUBLIC}: {str(e)}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
elif call.data == 'publish' and call.message.text == "^":
user_data = await state.get_data()
media_group_message_id = get_help_message_id(call.message.message_id, user_data)
await call.bot.copy_message(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST,message_id=media_group_message_id, reply_markup=None)
#await call.bot.copy_messages(chat_id=MAIN_PUBLIC, from_chat_id=GROUP_FOR_POST, message_ids=[media_group_message_id, media_group_message_id-1])
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
print(user_data['media_group_message_id'])
print(user_data['help_message_id'])
await call.answer(text='Выложено!', show_alert=True, cache_time=3)
@callback_router.callback_query(
F.data == "decline"
)
async def decline_post_for_group(call: CallbackQuery, state: FSMContext):
logger.info(
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
try:
if call.message.content_type == 'text' and call.message.text != "^":
await call.bot.delete_message(chat_id=GROUP_FOR_POST, message_id=call.message.message_id)
logger.info(
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
await call.answer(text='Отклонено!', show_alert=True, cache_time=3)
if call.message.text == '^':
user_data = await state.get_data()
media_group_message_id = get_help_message_id(call.message.message_id, user_data)
await call.bot.delete_message(chat_id=MAIN_PUBLIC, message_id=media_group_message_id)
except Exception as e:
await call.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(f'Ошибка при удалении сообщения в группе {GROUP_FOR_POST}: {str(e)}')
await call.answer(text='Что-то пошло не так!', show_alert=True, cache_time=3)
@callback_router.callback_query(
F.data.contains('ban')
)
async def process_ban_user(call: CallbackQuery, state: FSMContext):
user_id = call.data[4:]
logger.info(
f"Вызов функции process_ban_user. Данные callback: {call.data} пользователь: {user_id}")
user_name = BotDB.get_username(user_id=user_id)
if user_name:
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
date_to_unban=None)
markup = create_keyboard_for_ban_reason()
await call.message.answer(
text=f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
else:
markup = get_reply_keyboard_admin()
await call.message.answer(text='Пользователь с таким ID не найден в базе', markup=markup)
await state.set_state('ADMIN')
@callback_router.callback_query(
F.data.contains('unlock')
)
async def process_unlock_user(call: CallbackQuery):
user_id = call.data[7:]
delete_user_blacklist(user_id)
logger.info(f"Разблокирован пользователь с ID: {user_id}")
username = BotDB.get_username(user_id)
await call.answer(f'Пользователь разблокирован {username}', show_alert=True)
@callback_router.callback_query(
F.data == 'return'
)
async def return_to_main_menu(call: CallbackQuery):
await call.message.delete()
logger.info(f"Запуск админ панели для пользователя: {call.message.from_user.id}")
markup = get_reply_keyboard_admin()
await call.message.answer("Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
@callback_router.callback_query(
F.data.contains('page')
)
async def change_page(call: CallbackQuery):
page_number = int(call.data[5:])
logger.info(f"Переход на страницу {page_number}")
if call.message.text == 'Список пользователей которые последними обращались к боту':
list_users = BotDB.get_last_users_from_db()
#TODO: Здесь где-то надо добавить обработку ошибки IndexError: list index out of range
keyboard = create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
'ban')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
reply_markup=keyboard)
else:
#Готовим сообщения
message_user = get_banned_users_list(int(page_number) * 7 - 7)
await call.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=message_user)
#Готовим клавиатуру
buttons = get_banned_users_buttons()
keyboard = create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unlock')
await call.bot.edit_message_reply_markup(chat_id=call.message.chat.id, message_id=call.message.message_id,
reply_markup=keyboard)

View File

@@ -0,0 +1 @@
from .main import group_router

View File

@@ -0,0 +1,54 @@
from aiogram import Router, types
from aiogram.fsm.context import FSMContext
from database.db import BotDB
from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import send_text_message
from logs.custom_logger import Logger
group_router = Router()
#Инициализируем логгер
group_logger = Logger(name='group_logger')
logger = group_logger.get_logger()
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database')
@group_router.message(
ChatTypeFilter(chat_type=["group", "supergroup"]),
)
async def handle_message(message: types.Message, state: FSMContext):
"""Функция ответа админа пользователю через закрытый чат"""
logger.info(
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
markup = get_reply_keyboard_leave_chat()
message_id = 0
try:
message_id = message.reply_to_message.message_id
except AttributeError as e:
await message.answer('Блять, выдели сообщение!')
logger.warning(
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа. Ошибка {str(e)}')
message_from_admin = message.text
try:
chat_id = BotDB.get_user_by_message_id(message_id)
await send_text_message(chat_id, message, message_from_admin, markup)
await state.set_state("CHAT")
logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id} на сообщение {message_id}')
except TypeError as e:
await message.answer('Не могу найти кому ответить в базе, проебали сообщение.')
logger.error(
f'Ошибка при поиске пользователя в базе для ответа на сообщение: {message.text} в группе {message.chat.title} (ID сообщения: {message.message_id}) Ошибка: {str(e)}')

View File

@@ -0,0 +1 @@
from .main import private_router

View File

@@ -0,0 +1,281 @@
import random
import traceback
from datetime import datetime
from pathlib import Path
from time import sleep
from aiogram import types, Router, F
from aiogram.filters import Command, StateFilter
from aiogram.fsm.context import FSMContext
from aiogram.types import FSInputFile
from helper_bot.filters.main import ChatTypeFilter
from helper_bot.keyboards import get_reply_keyboard, get_reply_keyboard_for_post
from helper_bot.keyboards.main import get_reply_keyboard_leave_chat
from helper_bot.middlewares.text_middleware import AlbumMiddleware
from helper_bot.utils import messages
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from helper_bot.utils.helper_func import get_first_name, get_text_message, send_text_message, send_photo_message, \
process_photo_album, send_media_group_message
from logs.custom_logger import Logger
from database.db import BotDB
private_router = Router()
private_router.message.middleware(AlbumMiddleware())
#Инициализируем логгер
private_logger = Logger(name='private_handler')
logger = private_logger.get_logger()
bdf = BaseDependencyFactory()
GROUP_FOR_POST = bdf.settings['Telegram']['group_for_posts']
GROUP_FOR_MESSAGE = bdf.settings['Telegram']['group_for_message']
MAIN_PUBLIC = bdf.settings['Telegram']['main_public']
GROUP_FOR_LOGS = bdf.settings['Telegram']['group_for_logs']
IMPORTANT_LOGS = bdf.settings['Telegram']['important_logs']
PREVIEW_LINK = bdf.settings['Telegram']['preview_link']
LOGS = bdf.settings['Settings']['logs']
TEST = bdf.settings['Settings']['test']
BotDB = BotDB('database/tg-bot-database')
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
Command("start")
)
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == 'Вернуться в бота'
)
async def handle_start_message(message: types.Message, state: FSMContext):
try:
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("START")
logger.info(
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} "
f"Имя автора сообщения: {message.from_user.full_name})")
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = random.choice(name_stick_hello)
random_stick_hello = FSInputFile(path=random_stick_hello)
logger.info(f"Стикер успешно получен из БД")
await message.answer_sticker(random_stick_hello)
sleep(0.3)
except Exception as e:
logger.error(f"Произошла ошибка handle_start_message. Ошибка:{str(e)}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
current_state = await state.get_state()
logger.info(
f"Получение данных для приветственного сообщения пользователю. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} State - {current_state}")
user_id = message.from_user.id
first_name = message.from_user.first_name
full_name = message.from_user.full_name
is_bot = message.from_user.is_bot
username = message.from_user.username
language_code = message.from_user.language_code
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not BotDB.user_exists(user_id):
BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
date)
BotDB.update_date_for_user(date, user_id)
markup = get_reply_keyboard(BotDB, message.from_user.id)
hello_message = messages.get_message(get_first_name(message), 'HELLO_MESSAGE')
await message.answer(hello_message, reply_markup=markup)
except Exception as e:
logger.error(
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
await message.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
F.text == '📢Предложить свой пост'
)
async def suggest_post(message: types.Message, state: FSMContext):
try:
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("SUGGEST")
current_state = await state.get_state()
logger.info(
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
await message.answer(suggest_news)
sleep(0.3)
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
await message.answer(suggest_news_2, reply_markup=markup)
except Exception as e:
await message.bot.send_message(IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '👋🏼Сказать пока!'
)
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == 'Выйти из чата'
)
async def end_message(message: types.Message, state: FSMContext):
try:
logger.info(
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = random.choice(name_stick_bye)
random_stick_bye = FSInputFile(path=random_stick_bye)
await message.answer_sticker(random_stick_bye)
except Exception as e:
logger.error(
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
markup = types.ReplyKeyboardRemove()
bye_message = messages.get_message(get_first_name(message), 'BYE_MESSAGE')
await message.answer(bye_message, reply_markup=markup)
await state.set_state("START")
except Exception as e:
logger.error(
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message(
StateFilter("SUGGEST"),
ChatTypeFilter(chat_type=["private"]),
)
async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
logger.info(
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
try:
if message.content_type == 'text':
lower_text = message.text.lower()
post_text, is_anonymous = get_text_message(lower_text, message.from_user.full_name,
message.from_user.username)
markup = get_reply_keyboard_for_post()
if is_anonymous:
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
else:
await send_text_message(GROUP_FOR_POST, message, post_text, markup)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.content_type == 'photo' and message.media_group_id is None:
lower_caption = message.caption.lower()
markup = get_reply_keyboard_for_post()
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
message.from_user.username)
#TODO: тут какая-то шляпа
if is_anonymous:
await send_photo_message(GROUP_FOR_POST, message,
message.photo[-1].file_id, post_caption, markup)
else:
await send_photo_message(GROUP_FOR_POST, message,
message.photo[-1].file_id, post_caption, markup)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
elif message.media_group_id is not None:
post_caption = " "
if album[0].caption:
lower_caption = album[0].caption.lower()
post_caption, is_anonymous = get_text_message(lower_caption, message.from_user.full_name,
message.from_user.username)
media_group = process_photo_album(album, post_caption)
media_group_message_id = await send_media_group_message(GROUP_FOR_POST, message,
media_group)
sleep(0.2)
markup = get_reply_keyboard_for_post()
help_message_id = await send_text_message(GROUP_FOR_POST, message, "^", markup)
await state.update_data(media_group_message_id=media_group_message_id, help_message_id=help_message_id)
markup_for_user = get_reply_keyboard(BotDB, message.from_user.id)
success_send_message = messages.get_message(get_first_name(message), 'SUCCESS_SEND_MESSAGE')
await message.answer(success_send_message, reply_markup=markup_for_user)
await state.set_state("START")
else:
await message.bot.send_message(message.chat.id,
'Я пока не умею работать с таким сообщением. Пришли текст и фото/фоты(ы)')
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
F.text == '🤪Хочу стикеры'
)
async def stickers(message: types.Message, state: FSMContext):
logger.info(
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
markup = get_reply_keyboard(BotDB, message.from_user.id)
try:
BotDB.update_info_about_stickers(user_id=message.from_user.id)
await message.forward(chat_id=GROUP_FOR_LOGS)
await message.answer(text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
await state.set_state("START")
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
@private_router.message(
StateFilter("START"),
ChatTypeFilter(chat_type=["private"]),
F.text == '📩Связаться с админами'
)
async def connect_with_admin(message: types.Message, state: FSMContext):
logger.info(
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
admin_message = messages.get_message(get_first_name(message), 'CONNECT_WITH_ADMIN')
await message.answer(admin_message, parse_mode="html")
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("PRE_CHAT")
@private_router.message(
StateFilter("PRE_CHAT"),
ChatTypeFilter(chat_type=["private"]),
)
@private_router.message(
StateFilter("CHAT"),
ChatTypeFilter(chat_type=["private"]),
)
async def resend_message_in_group_for_message(message: types.Message, state: FSMContext):
logger.info(
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
await message.forward(chat_id=GROUP_FOR_MESSAGE)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(get_first_name(message), 'QUESTION')
user_state = await state.get_state()
if user_state == "PRE_CHAT":
markup = get_reply_keyboard(BotDB, message.from_user.id)
await message.answer(question, reply_markup=markup)
await state.set_state("START")
elif user_state == "CHAT":
markup = get_reply_keyboard_leave_chat()
await message.answer(question, reply_markup=markup)
# @private_router.message(
# ChatTypeFilter(chat_type=["private"])
# )
# async def default(message: types.Message, state: FSMContext):
# markup = get_reply_keyboard(BotDB, message.from_user.id)
# await message.answer('Кажется ты заблудился. Держи клавиатуру, твое состояние сброшено на начало', reply_markup=markup)
# await state.set_state("START")

View File

@@ -1,793 +0,0 @@
from datetime import datetime, timedelta
import random
import traceback
from enum import Enum
from pathlib import Path
from time import sleep
import telebot
from telebot import types
from telebot.apihelper import ApiTelegramException
from telebot.types import InputMediaPhoto
import messages
from logs.custom_logger import Logger
#Инициализируем логгер
bot_logger = Logger(name='bot')
class State(Enum):
START = "START"
SUGGEST = "SUGGEST"
ADMIN = "ADMIN"
CHAT = "CHAT"
PRE_CHAT = "PRE_CHAT"
class TelegramHelperBot:
def __init__(self, dependency_factory):
self.state = State.START
self.BotDB = dependency_factory.get_database()
self.settings = dependency_factory.get_settings()
token = self.settings['Telegram']['bot_token']
self.GROUP_FOR_POST = self.settings['Telegram']['group_for_posts']
self.GROUP_FOR_MESSAGE = self.settings['Telegram']['group_for_message']
self.MAIN_PUBLIC = self.settings['Telegram']['main_public']
self.GROUP_FOR_LOGS = self.settings['Telegram']['group_for_logs']
self.IMPORTANT_LOGS = self.settings['Telegram']['important_logs']
self.PREVIEW_LINK = self.settings['Telegram']['preview_link']
self.LOGS = self.settings['Settings']['logs']
self.TEST = self.settings['Settings']['test']
self.bot = telebot.TeleBot(token)
self.logger = bot_logger.get_logger()
# Router for user
@self.bot.message_handler(func=lambda message: True, chat_types=['private'])
def handle_message(message):
self.logger.info(
f'Получено сообщение: {message.text} от пользователя: {message.from_user.full_name} id юзера: {message.chat.id}')
if self.BotDB.check_user_in_blacklist(message.from_user.id):
attribute = self.BotDB.get_blacklist_users_by_id(message.from_user.id)
self.bot.send_message(message.chat.id,
f'<b>Ты заблокирован\nПричина блокировки:</b> {attribute[2]}\n<b>Дата разблокировки:</b> {attribute[3]}',
parse_mode='HTML')
self.logger.info(f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) заблокирован')
return
if self.state == State.START:
if message.text == '/start':
self.start_message(message)
elif message.text == '📢Предложить свой пост':
self.suggest_post(message)
self.state = State.SUGGEST
elif message.text == '🤪Хочу стикеры':
self.stickers(message)
self.state = State.START
elif message.text == '📩Связаться с админами':
self.connect_with_admin(message)
self.state = State.PRE_CHAT
elif message.text == '👋🏼Сказать пока!':
self.end_message(message)
self.state = State.START
elif message.text == 'Выйти из чата':
self.end_message(message)
elif message.text == '/admin':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
self.state = State.ADMIN
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вошел в админ-панель')
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
self.logger.info(
f'Пользователю {message.from_user.full_name} (ID: {message.chat.id}) отказано в доступе к админ-панели')
elif message.text == '/state':
self.bot.send_message(message.chat.id,
f'Твой state == {self.state.value}')
else:
self.bot.send_message(message.chat.id,
#TODO: Здесь раньше был /state
"Не понимаю где ты находишься. Нажми /start, и я перезапущусь")
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) отправил непонятное сообщение: {message.text}')
if self.state == State.SUGGEST:
self.bot.register_next_step_handler(message, self.suggest_router)
self.state = State.START
if message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
if self.state == State.PRE_CHAT:
self.bot.register_next_step_handler(message, self.resend_message_in_group_for_message)
self.state = State.START
if message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
if self.state == State.CHAT:
if message.text == 'Выйти из чата':
self.state = State.START
self.end_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вышел из чата')
elif message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
else:
self.resend_message_in_group_for_message(message)
if self.state == State.ADMIN:
if message == '/admin' or message == '/restart' or message == 'Вернуться в админку':
access = self.check_access(message.from_user.id)
if access:
self.admin_panel(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вошел в админ-панель')
else:
self.bot.send_message(message.chat.id, 'Доступ запрещен, досвидания!')
self.logger.info(
f'Пользователю {message.from_user.full_name} (ID: {message.chat.id}) отказано в доступе к админ-панели')
if message.text == '/start':
self.state = State.START
self.start_message(message)
self.logger.info(
f'Пользователь {message.from_user.full_name} (ID: {message.chat.id}) вернулся в главное меню')
@self.bot.message_handler(func=lambda message: True, chat_types=['group'])
def handle_message(message):
"""Функция ответа админа пользователю через закрытый чат"""
self.logger.info(
f'Получено сообщение в группе {message.chat.title} (ID: {message.chat.id}) от пользователя {message.from_user.full_name} (ID: {message.from_user.id}): "{message.text}"')
self.state = State.CHAT
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Выйти из чата")
markup.add(item1)
message_id = 0
try:
message_id = message.reply_to_message.id
except AttributeError:
self.bot.send_message(message.chat.id, f'Блять, выдели сообщение!')
self.logger.warning(
f'В группе {message.chat.title} (ID: {message.chat.id}) админ не выделил сообщение для ответа.')
message_from_admin = message.text
try:
chat_id = self.BotDB.get_user_by_message_id(message_id)
self.bot.send_message(chat_id, message_from_admin, reply_markup=markup)
self.logger.info(f'Ответ админа "{message.text}" отправлен пользователю с ID: {chat_id}.')
except TypeError:
self.bot.send_message(message.chat.id, f'Не могу найти кому ответить в базе, проебали сообщение.')
self.logger.error(
f'Ошибка при поиске пользователя в базе для ответа: {message.text} в группе {message.chat.title} (ID: {message.chat.id})')
# Админка
@self.bot.callback_query_handler(func=lambda call: call.data in ['publish', 'decline'])
def post_for_group(call):
self.logger.info(
f'Получен callback-запрос с данными: {call.data} от пользователя {call.from_user.full_name} (ID: {call.from_user.id})')
if call.data == 'publish' and call.message.content_type == 'text':
try:
self.bot.send_message(chat_id=self.MAIN_PUBLIC, text=call.message.text)
self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id)
self.logger.info(f'Текст сообщения опубликован в канале {self.MAIN_PUBLIC}.')
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(f'Ошибка при публикации текста в канал {self.MAIN_PUBLIC}: {str(e)}')
elif call.data == 'publish' and call.message.content_type == 'photo':
try:
self.bot.send_photo(
chat_id=self.MAIN_PUBLIC,
caption=call.message.caption,
photo=call.message.photo[-1].file_id,
)
self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id)
self.logger.info(f'Пост с фото опубликован в канале {self.MAIN_PUBLIC}.')
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(f'Ошибка при публикации фотографии в канал {self.MAIN_PUBLIC}: {str(e)}')
elif call.data == 'decline':
try:
self.bot.delete_message(chat_id=self.GROUP_FOR_POST, message_id=call.message.message_id)
self.logger.info(
f'Сообщение отклонено админом {call.from_user.full_name} (ID: {call.from_user.id}).')
except Exception as e:
if self.LOGS:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(f'Ошибка при удалении сообщения в группе {self.GROUP_FOR_POST}: {str(e)}')
@self.bot.callback_query_handler(func=lambda call: True)
def pagination(call):
if call.data[:3] == 'ban':
user_id = call.data[4:]
self.logger.info(f"Бан пользователя с ID: {user_id}")
self.ban_user(call.message, user_id)
if call.data == 'return':
self.logger.info(f"Возврат в админ панель")
self.bot.delete_message(call.message.chat.id, call.message.message_id)
self.admin_panel(call.message)
if call.data[:5] == 'unban':
user_id = call.data[6:]
self.delete_user_blacklist(user_id)
msg = f'Успешно удалено.'
self.bot.send_message(chat_id=call.message.chat.id, text=msg)
self.logger.info(f"Разблокирован пользователь с ID: {user_id}")
elif call.data[:4] == 'page':
page_number = int(call.data[5:])
self.logger.info(f"Переход на страницу {page_number}")
if call.message.text == 'Список пользователей которые последними обращались к боту':
list_users = self.BotDB.get_last_users_from_db()
keyboard = self.create_keyboard_with_pagination(int(page_number), len(list_users), list_users,
'ban')
self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
reply_markup=keyboard)
elif "Список заблокированных пользователей".lower() in call.message.text.lower():
#Готовим сообщения
message_user = self.get_banned_users_list(int(page_number) * 7 - 7)
self.bot.edit_message_text(chat_id=call.message.chat.id, message_id=call.message.message_id,
text=message_user)
#Готовим клавиатуру
buttons = self.get_banned_users_buttons()
keyboard = self.create_keyboard_with_pagination(int(call.data[5:]), len(buttons), buttons, 'unban')
self.bot.edit_message_reply_markup(call.message.chat.id, call.message.message_id,
reply_markup=keyboard)
else:
self.logger.warning(f"Неизвестный callback data: {call.data}")
def start(self):
while True:
try:
print(self.bot.last_update_id)
self.bot.polling(none_stop=True)
except ConnectionError as e:
self.logger.error(
f"Произошла ошибка (потеря коннекта): {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
except Exception as e:
self.logger.error(f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now()
print('Мы в функции unban_notifier')
today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)
# Черный список
def admin_panel(self, message):
try:
self.logger.info(f"Запуск админ панели для пользователя: {message.from_user.id}")
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Бан (Список)")
#item2 = types.KeyboardButton("Добавить админа") #TODO: Когда-нибудь потом доделаю
#item3 = types.KeyboardButton("Удалить админа")
item4 = types.KeyboardButton("Разбан (список)")
item5 = types.KeyboardButton("Вернуться в бота")
markup.add(item1, item4, item5)
self.bot.send_message(message.chat.id, "Добро пожаловать в админку. Выбери что хочешь:",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.handle_admin_message)
except Exception as e:
self.logger.error(f"Ошибка при запуске админ панели: {e}")
self.bot.register_next_step_handler(message, self.admin_panel)
def handle_admin_message(self, message):
self.logger.info(f"Получено сообщение от админа: {message.text} (пользователь: {message.from_user.id})")
try:
if message.text == "Бан (Список)":
self.get_last_users(message)
elif message.text == "Разбан (список)":
self.get_banned_users(message)
elif message.text == "Вернуться в бота":
self.start_message(message)
else:
self.logger.warning(f"Неизвестное сообщение от админа: {message.text}")
self.bot.reply_to(message, "Неизвестная команда.")
except Exception as e:
self.logger.error(f"Ошибка при обработке сообщения админа: {e}")
self.bot.reply_to(message, f"Ошибка\n\n {e}")
self.admin_panel(message)
def ban_user(self, message, user_id: int):
self.logger.info(
f"Получена команда от админа на бан пользователя: {message.text} (пользователь: {message.from_user.id})")
user_name = self.BotDB.get_username(user_id=user_id)
ban_object = {'user_id': user_id, 'user_name': user_name, 'message_for_user': None, 'date_to_unban': None}
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Спам")
item2 = types.KeyboardButton("Заебал стикерами")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Выбран пользователь: {user_id}. Выбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_2, ban_object)
def ban_user_step_2(self, message, ban_object: dict):
self.logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {ban_object})")
ban_object['message_for_user'] = message.text
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("1")
item2 = types.KeyboardButton("7")
item3 = types.KeyboardButton("30")
item4 = types.KeyboardButton("Навсегда")
markup.add(item1, item2, item3, item4)
self.bot.send_message(message.chat.id, f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
f"его в чат",
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_step_3, ban_object)
def ban_user_step_3(self, message, ban_object: dict):
self.logger.info(f"Переход на шаг 3 бана пользователя. Словарь с данными для бана: {ban_object})")
date_to_unban = None
if message.text != 'Навсегда':
date_to_unban = self.add_days_to_date(message.text)
else:
pass
ban_object['date_to_unban'] = date_to_unban
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("Подтвердить")
item2 = types.KeyboardButton("Отменить")
markup.add(item1, item2)
self.bot.send_message(message.chat.id,
f"Необходимо подтверждение:\nПользователь:{ban_object['user_id']}\nПричина бана:{ban_object['message_for_user']}.\nСрок бана:{ban_object['date_to_unban']}",
parse_mode='html',
reply_markup=markup)
self.bot.register_next_step_handler(message, self.ban_user_final_step, ban_object)
def ban_user_final_step(self, message, ban_object: dict):
self.logger.info(f"Переход на финальный шаг бана пользователя. Словарь с данными для бана: {ban_object})")
if message.text == 'Подтвердить':
exists = self.BotDB.check_user_in_blacklist(ban_object['user_id'])
if exists:
self.bot.reply_to(message, f"Пользователь уже был заблокирован ранее.")
self.logger.info(f"Пользователь: {ban_object['user_id']} был заблокирован ранее)")
self.admin_panel(message)
else:
self.BotDB.set_user_blacklist(ban_object['user_id'],
ban_object['user_name'],
ban_object['message_for_user'],
ban_object['date_to_unban'])
self.bot.reply_to(message, f"Пользователь {ban_object['user_name']} успешно заблокирован.")
self.logger.info(f"Пользователь: {ban_object['user_id']} успешно заблокирован)")
self.admin_panel(message)
def get_last_users(self, message):
self.logger.info(
f"Попытка получения списка последних пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
list_users = self.BotDB.get_last_users_from_db()
keyboard = self.create_keyboard_with_pagination(1, len(list_users), list_users, 'ban')
self.bot.send_message(chat_id=message.chat.id, text="Список пользователей которые последними обращались к боту",
reply_markup=keyboard)
def get_banned_users(self, message):
self.logger.info(
f"Попытка получения списка заблокированных пользователей. Текст сообщения: {message.text} Имя автора сообщения: {message.from_user.full_name})")
message_text = self.get_banned_users_list(0)
buttons_list = self.get_banned_users_buttons()
if buttons_list:
k = self.create_keyboard_with_pagination(1, len(buttons_list), buttons_list, 'unban')
self.bot.send_message(message.chat.id, message_text, reply_markup=k)
else:
self.bot.send_message(message.chat.id, "В списке забанненых пользователей никого нет")
self.admin_panel(message)
def start_message(self, message):
try:
self.logger.info(
f"Формирование приветственного сообщения для пользователя. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name})")
name_stick_hello = list(Path('Stick').rglob('Hello_*'))
random_stick_hello = open(random.choice(name_stick_hello), 'rb')
self.logger.info(f"Стикер успешно получен из БД")
# logging
if self.LOGS:
self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_sticker(message.chat.id, random_stick_hello)
sleep(0.3)
except Exception as e:
self.logger.error(f"Произошла ошибка при получении стикера. Ошибка: {str(e)}")
if self.LOGS:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
self.logger.info(
f"Получение данных для приветственного сообщения пользователю. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name})")
user_id = message.from_user.id
first_name = message.from_user.first_name
full_name = message.from_user.full_name
is_bot = message.from_user.is_bot
username = message.from_user.username
language_code = message.from_user.language_code
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
if not self.BotDB.user_exists(user_id):
self.BotDB.add_new_user_in_db(user_id, first_name, full_name, username, is_bot, language_code, date,
date)
self.BotDB.update_date_for_user(date, user_id)
markup = self.get_reply_keyboard(message)
hello_message = messages.get_message(self.__get_first_name(message), 'HELLO_MESSAGE')
self.bot.send_message(message.chat.id, hello_message, parse_mode='html', reply_markup=markup,
disable_web_page_preview=not self.PREVIEW_LINK)
except Exception as e:
self.logger.error(
f"Произошла ошибка при отправке приветственного сообщения для пользователя {message.from_user.id} Имя: {message.from_user.full_name}. Ошибка: {str(e)}")
if self.LOGS:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def resend_message_in_group_for_message(self, message):
self.logger.info(
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.id})")
self.bot.forward_message(chat_id=self.GROUP_FOR_MESSAGE,
from_chat_id=message.chat.id,
message_id=message.message_id
)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
self.BotDB.add_new_message_in_db(message.text, message.from_user.id, message.message_id + 1, date)
question = messages.get_message(self.__get_first_name(message), 'QUESTION')
markup = self.get_reply_keyboard(message)
self.bot.send_message(message.chat.id, question, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK,
reply_markup=markup)
def suggest_post(self, message):
try:
self.logger.info(
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.id})")
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS')
self.bot.send_message(message.chat.id, suggest_news, parse_mode='html')
sleep(0.3)
suggest_news_2 = messages.get_message(self.__get_first_name(message), 'SUGGEST_NEWS_2')
self.bot.send_message(message.chat.id, suggest_news_2, parse_mode='html', reply_markup=markup)
except Exception as e:
self.bot.send_message(self.IMPORTANT_LOGS,
f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
def stickers(self, message):
self.logger.info(
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
self.BotDB.update_info_about_stickers(user_id=message.from_user.id)
markup = self.get_reply_keyboard(message)
try:
self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
self.bot.send_message(message.chat.id,
text='Хорошо, лови, добавить можно отсюда: https://t.me/addstickers/love_biysk',
reply_markup=markup)
except ApiTelegramException as e:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
self.logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
def connect_with_admin(self, message):
self.logger.info(
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
connect_with_admin = messages.get_message(self.__get_first_name(message), 'CONNECT_WITH_ADMIN')
self.bot.send_message(message.chat.id, connect_with_admin, parse_mode="html")
# logging
if self.LOGS:
self.bot.forward_message(chat_id=self.GROUP_FOR_LOGS,
from_chat_id=message.chat.id,
message_id=message.message_id)
def end_message(self, message):
try:
self.logger.info(
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = open(random.choice(name_stick_bye), 'rb')
self.bot.send_sticker(message.chat.id, random_stick_bye)
except ApiTelegramException as e:
self.logger.error(
f"Ошибка в функции stickers при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup = types.ReplyKeyboardRemove()
try:
bye_message = messages.get_message(self.__get_first_name(message), 'BYE_MESSAGE')
self.bot.send_message(message.chat.id, bye_message,
parse_mode='html', reply_markup=markup,
disable_web_page_preview=not self.PREVIEW_LINK)
except Exception as e:
if self.LOGS:
self.logger.error(
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
#TODO: deprecated
def send_to_suggest_2(self, message):
markup = self._get_reply_keyboard_for_post()
try:
if message.content_type == 'text':
post_text = message.text.lower()
if post_text.find('неанон') != -1 or post_text.find('не анон') != -1:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif post_text.find('анон') != -1:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
text=f'Пост из ТГ:\n{message.text}\n\nПост опубликован анонимно',
reply_markup=markup
)
else:
self.bot.send_message(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
text=f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
reply_markup=markup
)
elif message.content_type == 'photo' and message.media_group_id is None:
post_text_for_photo = message.caption.lower()
if post_text_for_photo.find('неанон') != -1 or post_text_for_photo.find('не анон') != -1:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
elif post_text_for_photo.find('анон') != -1 or post_text_for_photo.find('анон') != -1:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nПост опубликован анонимно',
photo=message.photo[-1].file_id,
reply_markup=markup
)
else:
self.bot.send_photo(
# TODO: GROUP_FOR_POST
chat_id=self.GROUP_FOR_POST,
caption=f'Пост из ТГ:\n{post_text_for_photo}\n\nАвтор поста: {message.from_user.first_name} @{message.from_user.username}',
photo=message.photo[-1].file_id,
reply_markup=markup
)
# TODO: Не понятна реализация с альбомами от слова совсем. 11.07 Реализация понятна. Нужно отправлять альбомы + сообщение в одном сообщении. Следующее сообщение с пробелом и кнопками
# elif message.content_type == 'photo' and message.media_group_id != None:
# bot.forward_message(chat_id=IMPORTANT_LOGS, from_chat_id=message.chat.id, message_id=message.message_id )
else:
pass
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user)
def suggest_router(self, message):
self.logger.info(
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
try:
if message.content_type == 'text':
lower_text = message.text.lower()
post_text, is_anonymous = self._get_text_message(lower_text, message.from_user.full_name,
message.from_user.id)
if is_anonymous:
self._send_text_message(post_text)
else:
self._send_text_message(post_text)
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user)
elif message.content_type == 'photo' and message.media_group_id is None:
lower_caption = message.caption.lower()
post_caption, is_anonymous = self._get_text_message(lower_caption, message.from_user.full_name,
message.from_user.id)
if is_anonymous:
self._send_photo_message(message.photo[-1].file_id, post_caption)
else:
self._send_photo_message(message.photo[-1].file_id, post_caption)
markup_for_user = self.get_reply_keyboard(message)
success_send_message = messages.get_message(self.__get_first_name(message), 'SUCCESS_SEND_MESSAGE')
self.bot.send_message(message.chat.id, success_send_message, parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK, reply_markup=markup_for_user)
elif message.media_group_id is not None:
self.bot.send_message(message.chat.id,
'Я пока не умею работать с несколькими файлами. Пришли текст и не более одного фото')
self.bot.register_next_step_handler(message, self.suggest_router)
else:
self.bot.send_message(message.chat.id,
'Я пока не умею работать с таким сообщением. Пришли текст и не более одного фото')
self.bot.register_next_step_handler(message, self.suggest_router)
except Exception as e:
if self.LOGS:
self.bot.send_message(chat_id=self.IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@staticmethod
def _get_reply_keyboard_for_post():
markup = types.InlineKeyboardMarkup(row_width=1)
item1 = types.InlineKeyboardButton("Опубликовать", callback_data='publish')
item2 = types.InlineKeyboardButton("Отклонить", callback_data='decline')
markup.add(item1, item2)
return markup
@staticmethod
def _get_text_message(post_text: str, first_name: str, username: str):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста
Returns:
Кортеж из двух элементов:
- Сформированный текст сообщения.
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный).
"""
if "неанон" in post_text or "не анон" in post_text:
is_anonymous = False
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
elif "анон" in post_text:
is_anonymous = True
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous
else:
is_anonymous = False
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
def _send_text_message(self, post_text: str):
markup = self._get_reply_keyboard_for_post()
self.bot.send_message(
chat_id=self.GROUP_FOR_POST,
text=post_text,
parse_mode='html',
disable_web_page_preview=not self.PREVIEW_LINK,
reply_markup=markup
)
def _send_photo_message(self, photo: str, post_text: str):
markup = self._get_reply_keyboard_for_post()
self.bot.send_photo(
chat_id=self.GROUP_FOR_POST,
caption=post_text,
photo=photo,
reply_markup=markup
)
def get_reply_keyboard(self, message):
markup = types.ReplyKeyboardMarkup(resize_keyboard=True, one_time_keyboard=True)
item1 = types.KeyboardButton("📢Предложить свой пост")
item2 = types.KeyboardButton("📩Связаться с админами")
item3 = types.KeyboardButton("👋🏼Сказать пока!")
item4 = types.KeyboardButton("🤪Хочу стикеры") if not self.BotDB.get_info_about_stickers(
user_id=message.from_user.id) else None
if item4:
markup.add(item1, item2, item3, item4)
else:
markup.add(item1, item2, item3)
return markup
@staticmethod
def __get_first_name(message):
return message.from_user.first_name
def check_access(self, user_id: int):
"""Проверка прав на совершение действий"""
return self.BotDB.is_admin(user_id)
@staticmethod
def add_days_to_date(days):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=int(days))
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
@staticmethod
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str):
"""
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
Args:
page: Номер текущей страницы.
total_items: Общее количество элементов.
array_items: Лист кортежей. Содержит в себе user_name: user_id
callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
Returns:
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
"""
# Определяем общее количество страниц
total_pages = (total_items + 7 - 1) // 7
page = page
# Создаем список кнопок
buttons = []
# Вычисляем стартовый номер для текущей страницы
start_index = (page - 1) * 7 #тут было +1, убрал, потому что на текстовом массиве выходит за пределы
# Кнопки с номерами страниц
for i in range(start_index, min(start_index + 7,
len(array_items))): #тут было len(array_items) +1, убрал, потому что на текстовом массиве выходит за пределы
buttons.append(
types.InlineKeyboardButton(f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"))
# Добавляем кнопки "Предыдущая" и "Следующая"
if int(page) > 1:
buttons.insert(6, types.InlineKeyboardButton("⬅️ Предыдущая", callback_data=f"page_{page - 1}"))
if page < total_pages:
buttons.append(types.InlineKeyboardButton("➡️ Следующая", callback_data=f"page_{page + 1}"))
#Добавляем кнопку назад
buttons.append(types.InlineKeyboardButton("🏠 Назад", callback_data="return"))
# Формируем клавиатуру с 3 кнопками в ряд
keyboard = []
for i in range(0, len(buttons), 3):
keyboard.append(buttons[i:i + 3])
return types.InlineKeyboardMarkup(keyboard)
def get_banned_users_list(self, offset: int):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = self.BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
return message
def get_banned_users_buttons(self):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = self.BotDB.get_banned_users_from_db()
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
return user_ids
def delete_user_blacklist(self, user_id):
return self.BotDB.delete_user_blacklist(user_id=user_id)

View File

@@ -0,0 +1 @@
from .main import get_reply_keyboard_for_post, get_reply_keyboard

Binary file not shown.

View File

@@ -0,0 +1,110 @@
from aiogram import types
from aiogram.utils.keyboard import ReplyKeyboardBuilder, InlineKeyboardBuilder
def get_reply_keyboard_for_post():
builder = InlineKeyboardBuilder()
builder.row(types.InlineKeyboardButton(
text="Опубликовать", callback_data="publish"),
types.InlineKeyboardButton(
text="Отклонить", callback_data="decline")
)
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def get_reply_keyboard(BotDB, user_id):
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="📢Предложить свой пост"))
builder.add(types.KeyboardButton(text="📩Связаться с админами"))
builder.add(types.KeyboardButton(text="👋🏼Сказать пока!"))
if not BotDB.get_info_about_stickers(user_id=user_id):
builder.add(types.KeyboardButton(text="🤪Хочу стикеры"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def get_reply_keyboard_leave_chat():
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Выйти из чата"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def get_reply_keyboard_admin():
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Бан (Список)"))
builder.add(types.KeyboardButton(text="Разбан (список)"))
builder.add(types.KeyboardButton(text="Вернуться в бота"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def create_keyboard_with_pagination(page: int, total_items: int, array_items: list[tuple[any, any]], callback: str):
"""
Создает клавиатуру с пагинацией для заданного набора элементов и устанавливает необходимый callback
Args:
page: Номер текущей страницы.
total_items: Общее количество элементов.
array_items: Лист кортежей. Содержит в себе user_name: user_id
callback: Действие в коллбеке. Вернет callback вида ({callback}_{user_id})
Returns:
InlineKeyboardMarkup: Клавиатура с кнопками пагинации.
"""
# Определяем общее количество страниц
total_pages = (total_items + 9 - 1) // 9
# Создаем билдер для клавиатуры
keyboard = InlineKeyboardBuilder()
# TODO: Тут поправить на 9 объектов, а не 7. Клавиатуру переделал
# Вычисляем стартовый номер для текущей страницы
start_index = (page - 1) * 9
# Кнопки с номерами страниц
for i in range(start_index, min(start_index + 9, len(array_items))):
keyboard.add(types.InlineKeyboardButton(
text=f"{array_items[i][0]}", callback_data=f"{callback}_{array_items[i][1]}"
))
keyboard.adjust(3)
next_button = types.InlineKeyboardButton(
text="➡️ Следующая", callback_data=f"page_{page + 1}"
)
prev_button = types.InlineKeyboardButton(
text="⬅️ Предыдущая", callback_data=f"page_{page - 1}"
)
keyboard.row(prev_button, next_button)
home_button = types.InlineKeyboardButton(
text="🏠 Назад", callback_data="return")
keyboard.row(home_button)
k = keyboard.as_markup()
return k
def create_keyboard_for_ban_reason():
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Спам"))
builder.add(types.KeyboardButton(text="Заебал стикерами"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def create_keyboard_for_ban_days():
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="1"))
builder.add(types.KeyboardButton(text="7"))
builder.add(types.KeyboardButton(text="30"))
builder.add(types.KeyboardButton(text="Навсегда"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup
def create_keyboard_for_approve_ban():
builder = ReplyKeyboardBuilder()
builder.add(types.KeyboardButton(text="Подтвердить"))
builder.add(types.KeyboardButton(text="Отменить"))
markup = builder.as_markup(resize_keyboard=True, one_time_keyboard=True)
return markup

21
helper_bot/main.py Normal file
View File

@@ -0,0 +1,21 @@
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.fsm.storage.memory import MemoryStorage
from aiogram.fsm.strategy import FSMStrategy
from helper_bot.handlers.admin import admin_router
from helper_bot.handlers.callback import callback_router
from helper_bot.handlers.group import group_router
from helper_bot.handlers.private import private_router
async def start_bot(bdf):
token = bdf.settings['Telegram']['bot_token']
bot = Bot(token=token, default=DefaultBotProperties(
parse_mode='HTML',
link_preview_is_disabled=bdf.settings['Telegram']['preview_link']
))
dp = Dispatcher(storage=MemoryStorage(), fsm_strategy=FSMStrategy.GLOBAL_USER)
dp.include_routers(private_router, callback_router, group_router, admin_router)
await bot.delete_webhook(drop_pending_updates=True)
await dp.start_polling(bot, skip_updates=True)

View File

View File

@@ -0,0 +1,46 @@
import asyncio
from collections import defaultdict
from typing import Any, Dict, Union
from aiogram import BaseMiddleware
from aiogram.types import Message
class BulkTextMiddleware(BaseMiddleware):
def __init__(self, latency: Union[int, float] = 0.1):
# Initialize latency and album_data dictionary
self.latency = latency
self.texts = defaultdict(list)
#
async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any:
"""
Main middleware logic.
"""
# # If the event has no media_group_id, pass it to the handler immediately
key = (event.chat.id, event.from_user.id)
if not event.text:
return await handler(event, data)
self.texts[key].append(event)
total_before = len(self.texts[key])
# # Wait for a specified latency period
await asyncio.sleep(self.latency)
#
# # Check the total number of messages after the latency
total_after = len(self.texts[key])
#
# # If new messages were added during the latency, exit
if total_before != total_after:
return
#
# # Sort the album messages by message_id and add to data
msg_texts = self.texts[key]
msg_texts.sort(key=lambda x: x.message_id)
data["texts"] = ''.join([msg.text for msg in msg_texts])
#
# Remove the media group from tracking to free up memory
del self.texts[key]
# # Call the original event handler
return await handler(event, data)
#

View File

@@ -0,0 +1,61 @@
import asyncio
from typing import Any, Dict, Union
from aiogram import BaseMiddleware
from aiogram.types import Message
class AlbumMiddleware(BaseMiddleware):
def __init__(self, latency: Union[int, float] = 0.1):
# Initialize latency and album_data dictionary
self.latency = latency
self.album_data = {}
#
def collect_album_messages(self, event: Message):
"""
Collect messages of the same media group.
"""
# # Check if media_group_id exists in album_data
if event.media_group_id not in self.album_data:
# # Create a new entry for the media group
self.album_data[event.media_group_id] = {"messages": []}
#
# # Append the new message to the media group
self.album_data[event.media_group_id]["messages"].append(event)
#
# # Return the total number of messages in the current media group
return len(self.album_data[event.media_group_id]["messages"])
#
async def __call__(self, handler, event: Message, data: Dict[str, Any]) -> Any:
"""
Main middleware logic.
"""
# # If the event has no media_group_id, pass it to the handler immediately
if not event.media_group_id:
return await handler(event, data)
#
# # Collect messages of the same media group
total_before = self.collect_album_messages(event)
#
# # Wait for a specified latency period
await asyncio.sleep(self.latency)
#
# # Check the total number of messages after the latency
total_after = len(self.album_data[event.media_group_id]["messages"])
#
# # If new messages were added during the latency, exit
if total_before != total_after:
return
#
# # Sort the album messages by message_id and add to data
album_messages = self.album_data[event.media_group_id]["messages"]
album_messages.sort(key=lambda x: x.message_id)
data["album"] = album_messages
#
# # Remove the media group from tracking to free up memory
del self.album_data[event.media_group_id]
# # Call the original event handler
return await handler(event, data)
#

View File

@@ -0,0 +1 @@
from .state import StateUser

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,25 @@
import configparser
import os
import sys
class BaseDependencyFactory:
def __init__(self):
# Загрузка настроек из settings.ini
config_path = os.path.join(sys.path[0], 'settings.ini')
self.config = configparser.ConfigParser()
self.config.read(config_path)
self.settings = {}
for section in self.config.sections():
self.settings[section] = {}
for key in self.config[section]:
# Преобразование значений в соответствующий тип
if key == 'PREVIEW_LINK':
self.settings[section][key] = self.config.getboolean(section, key)
elif key == 'LOGS' or key == 'TEST':
self.settings[section][key] = self.config.getboolean(section, key)
else:
self.settings[section][key] = self.config.get(section, key)
def get_settings(self):
return self.settings

View File

@@ -0,0 +1,191 @@
from datetime import datetime, timedelta
from aiogram import types
from aiogram.types import InputMediaPhoto
from helper_bot.keyboards import get_reply_keyboard_for_post
from database.db import BotDB
BotDB = BotDB('database/tg-bot-database')
def get_first_name(message: types.Message) -> str:
return message.from_user.first_name
def get_text_message(post_text: str, first_name: str, username: str):
"""
Форматирует текст сообщения для публикации в зависимости от наличия ключевых слов "анон" и "неанон".
Args:
post_text: Текст сообщения
first_name: Имя автора поста
username: Юзернейм автора поста
Returns:
Кортеж из двух элементов:
- Сформированный текст сообщения.
- Флаг, указывающий, является ли пост анонимным (True - анонимный, False - не анонимный).
"""
if "неанон" in post_text or "не анон" in post_text:
is_anonymous = False
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
elif "анон" in post_text:
is_anonymous = True
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно', is_anonymous
else:
is_anonymous = False
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {first_name} @{username}', is_anonymous
def process_photo_album(album, post_caption: str = ''):
"""
Создает список InputMediaPhoto для альбома.
Args:
album: Album объект из Telegram API.
post_caption: Текст подписи к первому фото.
Returns:
Список InputMediaPhoto.
"""
photo_media = []
for i, message in enumerate(album):
if i == 0:
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id, caption=post_caption))
else:
photo_media.append(InputMediaPhoto(media=message.photo[-1].file_id))
return photo_media
async def send_media_group_message(chat_id: int, message: types.Message, media_group: list[InputMediaPhoto]):
sent_message = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
)
message_id = sent_message[-1].message_id
return message_id
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
if markup is None:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text
)
message_id = sent_message.message_id
return message_id
else:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text,
reply_markup=markup
)
message_id = sent_message.message_id
return message_id
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str, markup: types.ReplyKeyboardMarkup = None):
if markup is None:
await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo
)
else:
await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
photo=photo,
reply_markup=markup
)
def check_access(user_id: int):
"""Проверка прав на совершение действий"""
return BotDB.is_admin(user_id)
def add_days_to_date(days: int):
"""Прибавляет указанное количество дней к текущей дате и возвращает дату в формате DD-MM-YYYY."""
current_date = datetime.now()
future_date = current_date + timedelta(days=days)
formatted_date = future_date.strftime("%d-%m-%Y")
return formatted_date
def get_banned_users_list(offset: int):
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db_with_limits(limit=7, offset=offset)
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
return message
def get_banned_users_buttons():
"""
Возвращает сообщение со списком пользователей и словарь с ником + идентификатором
Args:
offset: отступ для запроса в базу данных
Returns:
message - текст сообщения
user_ids - лист кортежей [(user_name: user_id)]
"""
users = BotDB.get_banned_users_from_db()
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
return user_ids
def get_help_message_id(media_group_message_id: int, data: dict) -> int:
"""
Получает идентификатор сообщения помощи по идентификатору сообщения группы.
Args:
media_group_message_id: Идентификатор сообщения группы
data: Словарь с данными.
Returns:
Идентификатор сообщения помощи.
"""
if 'help_message_id' in data and 'media_group_message_id' in data:
return data['media_group_message_id']
else:
return 0
def delete_user_blacklist(user_id: int):
return BotDB.delete_user_blacklist(user_id=user_id)
def unban_notifier(self):
# Получение сегодняшней даты в формате DD-MM-YYYY
current_date = datetime.now()
print('Мы в функции unban_notifier')
today = current_date.strftime("%d-%m-%Y")
# Получение списка разблокированных пользователей
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)

13
helper_bot/utils/state.py Normal file
View File

@@ -0,0 +1,13 @@
from aiogram.fsm.state import StatesGroup, State
class StateUser(StatesGroup):
START = State()
SUGGEST = State()
ADMIN = State()
CHAT = State()
PRE_CHAT = State()
BAN_2 = State()
BAN_3 = State()
BAN_4 = State()
BAN_FINAL = State()

50
main.py
View File

@@ -1,50 +0,0 @@
import configparser
import os
import sys
from database.db import BotDB
from helper_bot.helper_bot import TelegramHelperBot
from logs.custom_logger import Logger
#TODO: Добавить проверку можно ли отвечать пользователю? Сейчас если у него скрыто лс, ему похоже не приходят сообщения
#TODO Подумать над реализацией функционала с поступлениями в колледжи
#TODO: Покрыть все логированием и ошибками корректными. Ерроры кидать в чат.
#TODO: Покрыть все тестами
class BaseDependencyFactory:
def __init__(self):
# Загрузка настроек из settings.ini
self.logger = Logger('main')
config_path = os.path.join(sys.path[0], 'settings.ini')
self.config = configparser.ConfigParser()
self.config.read(config_path)
self.BotDB = BotDB('database/tg-bot-database')
self.settings = {}
for section in self.config.sections():
self.settings[section] = {}
for key in self.config[section]:
# Преобразование значений в соответствующий тип
if key == 'PREVIEW_LINK':
self.settings[section][key] = self.config.getboolean(section, key)
elif key == 'LOGS' or key == 'TEST':
self.settings[section][key] = self.config.getboolean(section, key)
else:
self.settings[section][key] = self.config.get(section, key)
def get_settings(self):
return self.settings
def get_database(self):
return self.BotDB
if __name__ == "__main__":
# Запускаем тг бота
bot = TelegramHelperBot(BaseDependencyFactory())
bot.start()
#scheduler = BackgroundScheduler()
#scheduler.add_job(bot.unban_notifier(), 'cron', hour=22, minute=9)
#scheduler.start()

View File

@@ -1,5 +1,5 @@
APScheduler==3.10.4
certifi==2024.7.4
certifi~=2024.6.2
charset-normalizer==3.3.2
coverage==7.5.4
exceptiongroup==1.2.1
@@ -8,11 +8,15 @@ iniconfig==2.0.0
loguru==0.7.2
packaging==24.1
pluggy==1.5.0
pyTelegramBotAPI==4.20.0
pytest==8.2.2
pytz==2024.1
requests==2.32.3
six==1.16.0
tomli==2.0.1
tzlocal==5.2
urllib3==2.2.2
urllib3~=2.2.1
pip~=23.2.1
attrs~=23.2.0
typing_extensions~=4.12.2
aiohttp~=3.9.5
aiogram~=3.10.0

7
run_helper.py Normal file
View File

@@ -0,0 +1,7 @@
import asyncio
from helper_bot.main import start_bot
from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
if __name__ == '__main__':
asyncio.run(start_bot(BaseDependencyFactory()))