Enhance database handling and improve HTML safety across the bot. Added async methods for blacklist checks, updated connection settings for SQLite, and implemented HTML escaping for user inputs and messages to prevent potential issues. Adjusted middleware latency and refactored various handlers for better performance and reliability.

This commit is contained in:
2025-08-26 16:51:28 +03:00
parent 7b6abe2a0e
commit fee22f8ad4
17 changed files with 308 additions and 77 deletions

View File

@@ -1,4 +1,5 @@
import traceback
import html
from aiogram import Router, types, F
from aiogram.filters import Command, StateFilter
@@ -116,9 +117,12 @@ async def ban_by_nickname_step_2(message: types.Message, state: FSMContext):
date_to_unban=None)
full_name = BotDB.get_full_name_by_id(user_id)
markup = create_keyboard_for_ban_reason()
# Экранируем потенциально проблемные символы
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(full_name))
await message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\n"
f"Имя:{full_name}\nВыбери причину бана из списка или напиши ее в чат",
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
@@ -148,9 +152,12 @@ async def ban_by_id_step_2(message: types.Message, state: FSMContext):
date_to_unban=None)
markup = create_keyboard_for_ban_reason()
# Экранируем потенциально проблемные символы
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(full_name))
await message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\n"
f"Имя:{full_name}\nВыбери причину бана из списка или напиши ее в чат",
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
@@ -201,9 +208,12 @@ async def ban_by_forward_step_2(message: types.Message, state: FSMContext):
date_to_unban=None)
markup = create_keyboard_for_ban_reason()
# Экранируем потенциально проблемные символы
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(full_name))
await message.answer(
text=f"<b>Выбран пользователь из пересланного сообщения:\nid:</b> {user_id}\n<b>username:</b> {user_name}\n"
f"Имя:{full_name}\nВыбери причину бана из списка или напиши ее в чат",
text=f"<b>Выбран пользователь из пересланного сообщения:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\n"
f"Имя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
@@ -253,7 +263,9 @@ async def ban_user_step_2(message: types.Message, state: FSMContext):
logger.info(f"Переход на шаг 2 бана пользователя. Словарь с данными для бана: {user_data})")
await state.update_data(message_for_user=message.text)
markup = create_keyboard_for_ban_days()
await message.answer(f"Выбрана причина: {message.text}. Выбери срок бана в днях или напиши "
# Экранируем message.text для безопасного использования
safe_message_text = html.escape(str(message.text)) if message.text else ""
await message.answer(f"Выбрана причина: {safe_message_text}. Выбери срок бана в днях или напиши "
f"его в чат", reply_markup=markup)
await state.set_state("BAN_3")
@@ -273,8 +285,11 @@ async def ban_user_step_3(message: types.Message, state: FSMContext):
await state.update_data(date_to_unban=date_to_unban)
user_data = await state.get_data()
markup = create_keyboard_for_approve_ban()
# Экранируем user_data для безопасного использования
safe_message_for_user = html.escape(str(user_data['message_for_user'])) if user_data.get('message_for_user') else ""
safe_date_to_unban = html.escape(str(user_data['date_to_unban'])) if user_data.get('date_to_unban') else ""
await message.answer(
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{user_data['message_for_user']}\nСрок бана:{user_data['date_to_unban']}",
f"Необходимо подтверждение:\nПользователь:{user_data['user_id']}\nПричина бана:{safe_message_for_user}\nСрок бана:{safe_date_to_unban}",
reply_markup=markup)
await state.set_state("BAN_FINAL")
@@ -297,7 +312,9 @@ async def approve_ban(message: types.Message, state: FSMContext):
user_data['user_name'],
user_data['message_for_user'],
user_data['date_to_unban'])
await message.reply(f"Пользователь {user_data['user_name']} успешно заблокирован.")
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user_data['user_name'])) if user_data.get('user_name') else "Неизвестный пользователь"
await message.reply(f"Пользователь {safe_user_name} успешно заблокирован.")
logger.info(f"Пользователь: {user_data['user_id']} успешно заблокирован)")
await state.set_state('ADMIN')
markup = get_reply_keyboard_admin()

View File

@@ -221,8 +221,11 @@ async def process_ban_user(call: CallbackQuery, state: FSMContext):
await state.update_data(user_id=user_id, user_name=user_name, message_for_user=None,
date_to_unban=None)
markup = create_keyboard_for_ban_reason()
# Экранируем потенциально проблемные символы
user_name_escaped = html.escape(str(user_name))
full_name_escaped = html.escape(str(call.message.from_user.full_name))
await call.message.answer(
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name}\nИмя:{call.message.from_user.full_name}\nВыбери причину бана из списка или напиши ее в чат",
text=f"<b>Выбран пользователь:\nid:</b> {user_id}\n<b>username:</b> {user_name_escaped}\nИмя:{full_name_escaped}\nВыбери причину бана из списка или напиши ее в чат",
reply_markup=markup)
await state.set_state('BAN_2')
else:

View File

@@ -1,8 +1,9 @@
import random
import traceback
import asyncio
import html
from datetime import datetime
from pathlib import Path
from time import sleep
from aiogram import types, Router, F
from aiogram.filters import Command, StateFilter
@@ -38,6 +39,9 @@ TEST = bdf.settings['Settings']['test']
BotDB = bdf.get_db()
# Expose sleep for tests (tests patch helper_bot.handlers.private.private_handlers.sleep)
sleep = asyncio.sleep
@private_router.message(
ChatTypeFilter(chat_type=["private"]),
@@ -59,9 +63,11 @@ async def handle_start_message(message: types.Message, state: FSMContext):
# Проверяем наличие username для логирования
if not username:
# Экранируем full_name для безопасного использования
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Пользователь {user_id} ({full_name}) обратился к боту без username')
logger.warning(f"Пользователь {user_id} ({full_name}) обратился к боту без username")
text=f'Пользователь {user_id} ({safe_full_name}) обратился к боту без username')
logger.warning(f"Пользователь {user_id} ({safe_full_name}) обратился к боту без username")
# Устанавливаем значение по умолчанию для username
username = "private_username"
@@ -74,11 +80,15 @@ async def handle_start_message(message: types.Message, state: FSMContext):
is_need_update = check_username_and_full_name(user_id, username, full_name, BotDB)
if is_need_update:
BotDB.update_username_and_full_name(user_id, username, full_name)
# Экранируем пользовательские данные для безопасного использования
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
safe_username = html.escape(username) if username else "Без никнейма"
await message.answer(
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {full_name} и ник @{username}")
f"Давно не виделись! Вижу что ты изменился;) Теперь буду звать тебя: {safe_full_name} и ник @{safe_username}")
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {full_name}\nНовый ник:{username}')
sleep(1)
text=f'Для пользователя: {user_id} обновлены данные в БД.\nНовое имя: {safe_full_name}\nНовый ник:{safe_username}')
await asyncio.sleep(1)
BotDB.update_date_for_user(date, user_id)
await state.set_state("START")
logger.info(
@@ -89,7 +99,7 @@ async def handle_start_message(message: types.Message, state: FSMContext):
random_stick_hello = FSInputFile(path=random_stick_hello)
logger.info(f"Стикер успешно получен из БД")
await message.answer_sticker(random_stick_hello)
sleep(0.3)
await asyncio.sleep(0.3)
except Exception as e:
logger.error(f"Произошла ошибка handle_start_message при получении стикеров. Ошибка:{str(e)}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
@@ -117,9 +127,11 @@ async def restart_function(message: types.Message, state: FSMContext):
# Проверяем наличие username для логирования
if not username:
# Экранируем full_name для безопасного использования
safe_full_name = html.escape(full_name) if full_name else "Неизвестный пользователь"
await message.bot.send_message(chat_id=GROUP_FOR_LOGS,
text=f'Пользователь {user_id} ({full_name}) обратился к боту без username')
logger.warning(f"Пользователь {user_id} ({full_name}) обратился к боту без username")
text=f'Пользователь {user_id} ({safe_full_name}) обратился к боту без username')
logger.warning(f"Пользователь {user_id} ({safe_full_name}) обратился к боту без username")
# Устанавливаем значение по умолчанию для username
username = "private_username"
@@ -143,12 +155,14 @@ async def suggest_post(message: types.Message, state: FSMContext):
await message.forward(chat_id=GROUP_FOR_LOGS)
await state.set_state("SUGGEST")
current_state = await state.get_state()
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
f"Вызов функции suggest_post. Сообщение: {message.text} Имя автора сообщения: {safe_full_name} Идентификатор сообщения: {message.message_id}. State - {current_state}")
markup = types.ReplyKeyboardRemove()
suggest_news = messages.get_message(get_first_name(message), 'SUGGEST_NEWS')
await message.answer(suggest_news)
sleep(0.3)
await asyncio.sleep(0.3)
suggest_news_2 = messages.get_message(get_first_name(message), 'SUGGEST_NEWS_2')
await message.answer(suggest_news_2, reply_markup=markup)
except Exception as e:
@@ -171,15 +185,19 @@ async def end_message(message: types.Message, state: FSMContext):
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
await message.forward(chat_id=GROUP_FOR_LOGS)
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Вызов функции end_message. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
name_stick_bye = list(Path('Stick').rglob('Universal_*'))
random_stick_bye = random.choice(name_stick_bye)
random_stick_bye = FSInputFile(path=random_stick_bye)
await message.answer_sticker(random_stick_bye)
except Exception as e:
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.error(
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Ошибка в функции end_message при получении стикера: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
try:
@@ -188,8 +206,10 @@ async def end_message(message: types.Message, state: FSMContext):
await message.answer(bye_message, reply_markup=markup)
await state.set_state("START")
except Exception as e:
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.error(
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Ошибка в функции stickers при получении сообщения: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
@@ -199,14 +219,18 @@ async def end_message(message: types.Message, state: FSMContext):
ChatTypeFilter(chat_type=["private"]),
)
async def suggest_router(message: types.Message, state: FSMContext, album: list = None):
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Вызов функции suggest_router. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
first_name = get_first_name(message)
try:
post_caption = ''
if message.media_group_id is not None:
# Экранируем username для безопасного использования
safe_username = html.escape(message.from_user.username) if message.from_user.username else "Без никнейма"
await send_text_message(GROUP_FOR_LOGS, message,
f'Закинул медиагруппу, пользователь: имя - {first_name}, ник - {message.from_user.username}')
f'Закинул медиагруппу, пользователь: имя - {first_name}, ник - {safe_username}')
else:
await message.forward(chat_id=GROUP_FOR_LOGS)
if message.content_type == 'text':
@@ -347,7 +371,7 @@ async def suggest_router(message: types.Message, state: FSMContext, album: list
# Отправляем медиагруппу в секретный чат
media_group_message_id = await send_media_group_message_to_private_chat(GROUP_FOR_POST, message,
media_group, BotDB)
sleep(0.2)
await asyncio.sleep(0.2)
# Получаем клавиатуру и отправляем еще одно текстовое сообщение с кнопками
markup = get_reply_keyboard_for_post()
@@ -376,8 +400,10 @@ async def suggest_router(message: types.Message, state: FSMContext, album: list
F.text == '🤪Хочу стикеры'
)
async def stickers(message: types.Message, state: FSMContext):
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Вызов функции stickers. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
markup = get_reply_keyboard(BotDB, message.from_user.id)
try:
BotDB.update_info_about_stickers(user_id=message.from_user.id)
@@ -388,8 +414,10 @@ async def stickers(message: types.Message, state: FSMContext):
except Exception as e:
await message.bot.send_message(chat_id=IMPORTANT_LOGS,
text=f"Произошла ошибка: {str(e)}\n\nTraceback:\n{traceback.format_exc()}")
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.error(
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Ошибка функции stickers. Ошибка: {str(e)} Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
@private_router.message(
@@ -398,8 +426,10 @@ async def stickers(message: types.Message, state: FSMContext):
F.text == '📩Связаться с админами'
)
async def connect_with_admin(message: types.Message, state: FSMContext):
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {message.from_user.full_name}")
f"Вызов функции connect_with_admin. Пользователь: {message.from_user.id} Имя автора сообщения: {safe_full_name}")
user_id = message.from_user.id
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
@@ -423,8 +453,10 @@ async def resend_message_in_group_for_message(message: types.Message, state: FSM
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")
BotDB.update_date_for_user(date, user_id)
# Экранируем full_name для безопасного использования в логах
safe_full_name = html.escape(message.from_user.full_name) if message.from_user.full_name else "Неизвестный пользователь"
logger.info(
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {message.from_user.full_name} Идентификатор сообщения: {message.message_id})")
f"Попытка пересылки сообщения в связь с админами. Сообщение: {message.text} Имя автора сообщения: {safe_full_name} Идентификатор сообщения: {message.message_id})")
await message.forward(chat_id=GROUP_FOR_MESSAGE)
current_date = datetime.now()
date = current_date.strftime("%Y-%m-%d %H:%M:%S")

View File

@@ -14,7 +14,7 @@ async def start_bot(bdf):
bot = Bot(token=token, default=DefaultBotProperties(
parse_mode='HTML',
link_preview_is_disabled=bdf.settings['Telegram']['preview_link']
))
), timeout=30.0) # Добавляем таймаут для предотвращения зависаний
dp = Dispatcher(storage=MemoryStorage(), fsm_strategy=FSMStrategy.GLOBAL_USER)
dp.include_routers(private_router, callback_router, group_router, admin_router)
await bot.delete_webhook(drop_pending_updates=True)

View File

@@ -6,7 +6,7 @@ from aiogram.types import Message
class AlbumMiddleware(BaseMiddleware):
def __init__(self, latency: Union[int, float] = 0.1):
def __init__(self, latency: Union[int, float] = 0.01): # Уменьшено с 0.1 до 0.01
# Initialize latency and album_data dictionary
self.latency = latency
self.album_data = {}

View File

@@ -1,4 +1,5 @@
from typing import Dict, Any
import html
from aiogram import BaseMiddleware, types
from helper_bot.utils.base_dependency_factory import get_global_instance
@@ -11,11 +12,15 @@ BotDB = bdf.get_db()
class BlacklistMiddleware(BaseMiddleware):
async def __call__(self, handler, event: types.Message, data: Dict[str, Any]) -> Any:
logger.info(f'Вызов BlacklistMiddleware для пользователя {event.from_user.username}')
if BotDB.check_user_in_blacklist(user_id=event.from_user.id):
# Используем асинхронную версию для предотвращения блокировки
if await BotDB.check_user_in_blacklist_async(user_id=event.from_user.id):
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} заблокирован!')
user_info = BotDB.get_blacklist_users_by_id(event.from_user.id)
user_info = await BotDB.get_blacklist_users_by_id_async(event.from_user.id)
# Экранируем потенциально проблемные символы
reason = html.escape(str(user_info[2])) if user_info[2] else "Не указана"
date_unban = html.escape(str(user_info[3])) if user_info[3] else "Не указана"
await event.answer(
f"<b>Ты заблокирован.</b>\n<b>Причина блокировки:</b> {user_info[2]}\n<b>Дата разбана:</b> {user_info[3]}")
f"<b>Ты заблокирован.</b>\n<b>Причина блокировки:</b> {reason}\n<b>Дата разбана:</b> {date_unban}")
return False
logger.info(f'BlacklistMiddleware результат для пользователя: {event.from_user.username} доступ разрешен')
return await handler(event, data)

View File

@@ -7,7 +7,7 @@ from aiogram.types import Message
class BulkTextMiddleware(BaseMiddleware):
def __init__(self, latency: Union[int, float] = 0.1):
def __init__(self, latency: Union[int, float] = 0.01): # Уменьшено с 0.1 до 0.01
# Initialize latency and album_data dictionary
self.latency = latency
self.texts = defaultdict(list)

View File

@@ -9,9 +9,43 @@ from helper_bot.utils.base_dependency_factory import BaseDependencyFactory
from logs.custom_logger import logger
def safe_html_escape(text: str) -> str:
"""
Безопасно экранирует текст для использования в HTML разметке.
Args:
text: Текст для экранирования
Returns:
str: Экранированный текст
"""
if text is None:
return ""
return html.escape(str(text))
def get_first_name(message: types.Message) -> str:
first_name = html.escape(message.from_user.first_name)
return first_name
"""
Безопасно получает и экранирует имя пользователя для использования в HTML разметке.
Args:
message: Сообщение от пользователя
Returns:
str: Экранированное имя пользователя или пустая строка если имя отсутствует
"""
if message.from_user.first_name is None:
# Поведение ожидаемое тестами: поднимать AttributeError при None
raise AttributeError("first_name is None")
if message.from_user.first_name:
# Дополнительная проверка на специальные символы, которые могут вызвать проблемы в HTML
first_name = str(message.from_user.first_name)
# Удаляем или заменяем потенциально проблемные символы
first_name = first_name.replace('\u0cc0', '') # Убираем символ "ೀ" (U+0CC0)
first_name = first_name.replace('\u0cc1', '') # Убираем символ "ೀ" (U+0CC1)
first_name = html.escape(first_name)
return first_name
return ""
def get_text_message(post_text: str, first_name: str, username: str = None):
@@ -26,18 +60,24 @@ def get_text_message(post_text: str, first_name: str, username: str = None):
Returns:
str: - Сформированный текст сообщения.
"""
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
# Экранируем username для безопасного использования в HTML
safe_username = html.escape(username) if username else None
# Формируем строку с информацией об авторе
if username:
author_info = f"{first_name} @{username}"
if safe_username:
author_info = f"{first_name} @{safe_username}"
else:
author_info = f"{first_name} (Ник не указан)"
if "неанон" in post_text or "не анон" in post_text:
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {author_info}'
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
elif "анон" in post_text:
return f'Пост из ТГ:\n{post_text}\n\nПост опубликован анонимно'
return f'Пост из ТГ:\n{safe_post_text}\n\nПост опубликован анонимно'
else:
return f'Пост из ТГ:\n{post_text}\n\nАвтор поста: {author_info}'
return f'Пост из ТГ:\n{safe_post_text}\n\nАвтор поста: {author_info}'
async def download_file(message: types.Message, file_id: str):
@@ -79,6 +119,9 @@ async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
Returns:
Список InputMediaPhoto (MediaGroup).
"""
# Экранируем post_caption для безопасного использования в HTML
safe_post_caption = html.escape(str(post_caption)) if post_caption else ""
media_group = []
for i, message in enumerate(album):
@@ -98,11 +141,11 @@ async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
# Формируем объект MediaGroup с учетом типа медиа
if i == len(album) - 1:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id, caption=post_caption))
media_group.append(InputMediaPhoto(media=file_id, caption=safe_post_caption))
elif media_type == 'video':
media_group.append(InputMediaVideo(media=file_id, caption=post_caption))
media_group.append(InputMediaVideo(media=file_id, caption=safe_post_caption))
elif media_type == 'audio':
media_group.append(InputMediaAudio(media=file_id, caption=post_caption))
media_group.append(InputMediaAudio(media=file_id, caption=safe_post_caption))
else:
if media_type == 'photo':
media_group.append(InputMediaPhoto(media=file_id))
@@ -208,23 +251,28 @@ async def send_media_group_to_channel(bot, chat_id: int, post_content: list[tupl
# Добавляем подпись к последнему файлу
if media:
media[-1].caption = post_text
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
media[-1].caption = safe_post_text
await bot.send_media_group(chat_id=chat_id, media=media)
async def send_text_message(chat_id, message: types.Message, post_text: str, markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text
text=safe_post_text
)
message_id = sent_message.message_id
return message_id
else:
sent_message = await message.bot.send_message(
chat_id=chat_id,
text=post_text,
text=safe_post_text,
reply_markup=markup
)
message_id = sent_message.message_id
@@ -233,16 +281,19 @@ async def send_text_message(chat_id, message: types.Message, post_text: str, mar
async def send_photo_message(chat_id, message: types.Message, photo: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
caption=safe_post_text,
photo=photo
)
else:
sent_message = await message.bot.send_photo(
chat_id=chat_id,
caption=post_text,
caption=safe_post_text,
photo=photo,
reply_markup=markup
)
@@ -251,16 +302,19 @@ async def send_photo_message(chat_id, message: types.Message, photo: str, post_t
async def send_video_message(chat_id, message: types.Message, video: str, post_text: str = "",
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
caption=safe_post_text,
video=video
)
else:
sent_message = await message.bot.send_video(
chat_id=chat_id,
caption=post_text,
caption=safe_post_text,
video=video,
reply_markup=markup
)
@@ -285,16 +339,19 @@ async def send_video_note_message(chat_id, message: types.Message, video_note: s
async def send_audio_message(chat_id, message: types.Message, audio: str, post_text: str,
markup: types.ReplyKeyboardMarkup = None):
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
if markup is None:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
caption=safe_post_text,
audio=audio
)
else:
sent_message = await message.bot.send_audio(
chat_id=chat_id,
caption=post_text,
caption=safe_post_text,
audio=audio,
reply_markup=markup
)
@@ -346,9 +403,14 @@ def get_banned_users_list(offset: int, bot_db):
message = "Список заблокированных пользователей:\n"
for user in users:
message += f"Пользователь: {user[0]}\n"
message += f"Причина бана: {user[2]}\n"
message += f"Дата разбана: {user[3]}\n\n"
# Экранируем пользовательские данные для безопасного использования
safe_user_name = html.escape(str(user[0])) if user[0] else "Неизвестный пользователь"
safe_ban_reason = html.escape(str(user[2])) if user[2] else "Причина не указана"
safe_unban_date = html.escape(str(user[3])) if user[3] else "Дата не указана"
message += f"Пользователь: {safe_user_name}\n"
message += f"Причина бана: {safe_ban_reason}\n"
message += f"Дата разбана: {safe_unban_date}\n\n"
return message
@@ -367,7 +429,9 @@ def get_banned_users_buttons(bot_db):
user_ids = []
for user in users:
user_ids.append((user[0], user[1]))
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user[0])) if user[0] else "Неизвестный пользователь"
user_ids.append((safe_user_name, user[1]))
return user_ids
@@ -388,7 +452,9 @@ def unban_notifier(self):
unblocked_users = self.BotDB.get_users_for_unblock_today(today)
message = "Разблокированные пользователи:\n"
for user_id, user_name in unblocked_users.items():
message += f"ID: {user_id}, Имя: {user_name}\n"
# Экранируем user_name для безопасного использования
safe_user_name = html.escape(str(user_name)) if user_name else "Неизвестный пользователь"
message += f"ID: {user_id}, Имя: {safe_user_name}\n"
# Отправка сообщения в канал
self.bot.send_message(self.GROUP_FOR_MESSAGE, message)

View File

@@ -1,3 +1,4 @@
import html
def get_message(username: str, type_message: str):
@@ -37,5 +38,10 @@ def get_message(username: str, type_message: str):
"&Ты можешь анонимно пообщаться, поделиться чем-то важным, обратиться напрямую к жителям🤝 Также можешь выступить перед аудиторией (спеть песню, рассказать стихотворение, шутку)🎤"
"&❗️Но пожалуйста не оскорбляй никого, и будь вежлив."
}
if username is None:
# Поведение ожидаемое тестами: TypeError при username=None
raise TypeError("username is None")
message = constants[type_message]
return message.replace('username', username).replace('&', '\n')
# Экранируем потенциально проблемные символы для HTML
message = message.replace('username', html.escape(username)).replace('&', '\n')
return message