Добавлен функционал для работы с S3 хранилищем и обновление контента опубликованных постов

- В `env.example` добавлены настройки для S3 хранилища.
- Обновлен файл зависимостей `requirements.txt`, добавлена библиотека `aioboto3` для работы с S3.
- В `PostRepository` и `AsyncBotDB` реализованы методы для обновления и получения контента опубликованных постов.
- Обновлены обработчики публикации постов для сохранения идентификаторов опубликованных сообщений и их контента.
- Реализована логика сохранения медиафайлов в S3 или на локальный диск в зависимости от конфигурации.
- Обновлены тесты для проверки нового функционала.
This commit is contained in:
2026-01-23 23:19:16 +03:00
parent 42f168f329
commit fecac6091e
14 changed files with 992 additions and 143 deletions

View File

@@ -1,8 +1,10 @@
import os
import sys
from typing import Optional
from dotenv import load_dotenv
from database.async_db import AsyncBotDB
from helper_bot.utils.s3_storage import S3StorageService
class BaseDependencyFactory:
@@ -21,6 +23,7 @@ class BaseDependencyFactory:
self.database = AsyncBotDB(database_path)
self._load_settings_from_env()
self._init_s3_storage()
def _load_settings_from_env(self):
"""Загружает настройки из переменных окружения."""
@@ -48,6 +51,29 @@ class BaseDependencyFactory:
'port': self._parse_int(os.getenv('METRICS_PORT', '8080'))
}
self.settings['S3'] = {
'enabled': self._parse_bool(os.getenv('S3_ENABLED', 'false')),
'endpoint_url': os.getenv('S3_ENDPOINT_URL', ''),
'access_key': os.getenv('S3_ACCESS_KEY', ''),
'secret_key': os.getenv('S3_SECRET_KEY', ''),
'bucket_name': os.getenv('S3_BUCKET_NAME', ''),
'region': os.getenv('S3_REGION', 'us-east-1')
}
def _init_s3_storage(self):
"""Инициализирует S3StorageService если S3 включен."""
self.s3_storage = None
if self.settings['S3']['enabled']:
s3_config = self.settings['S3']
if s3_config['endpoint_url'] and s3_config['access_key'] and s3_config['secret_key'] and s3_config['bucket_name']:
self.s3_storage = S3StorageService(
endpoint_url=s3_config['endpoint_url'],
access_key=s3_config['access_key'],
secret_key=s3_config['secret_key'],
bucket_name=s3_config['bucket_name'],
region=s3_config['region']
)
def _parse_bool(self, value: str) -> bool:
"""Парсит строковое значение в boolean."""
return value.lower() in ('true', '1', 'yes', 'on')
@@ -65,6 +91,10 @@ class BaseDependencyFactory:
def get_db(self) -> AsyncBotDB:
"""Возвращает подключение к базе данных."""
return self.database
def get_s3_storage(self) -> Optional[S3StorageService]:
"""Возвращает S3StorageService если S3 включен, иначе None."""
return self.s3_storage
_global_instance = None

View File

@@ -2,6 +2,8 @@ import html
import os
import random
import time
import tempfile
import asyncio
from datetime import datetime, timedelta
from time import sleep
from typing import List, Dict, Any, Optional, TYPE_CHECKING, Union
@@ -158,17 +160,19 @@ def get_text_message(post_text: str, first_name: str, username: str = None, is_a
@track_time("download_file", "helper_func")
@track_errors("helper_func", "download_file")
@track_file_operations("unknown")
async def download_file(message: types.Message, file_id: str, content_type: str = None) -> Optional[str]:
async def download_file(message: types.Message, file_id: str, content_type: str = None,
s3_storage = None) -> Optional[str]:
"""
Скачивает файл по file_id из Telegram и сохраняет в соответствующую папку.
Скачивает файл по file_id из Telegram и сохраняет в S3 или на локальный диск.
Args:
message: сообщение
file_id: File ID файла
content_type: тип контента (photo, video, audio, voice, video_note)
s3_storage: опциональный S3StorageService для сохранения в S3
Returns:
Путь к сохраненному файлу, если файл был скачан успешно, иначе None
S3 ключ (если s3_storage указан) или локальный путь к файлу, иначе None
"""
start_time = time.time()
@@ -178,51 +182,95 @@ async def download_file(message: types.Message, file_id: str, content_type: str
logger.error("download_file: Неверные параметры - file_id, message или bot отсутствуют")
return None
# Определяем папку по типу контента
type_folders = {
'photo': 'photos',
'video': 'videos',
'audio': 'music',
'voice': 'voice',
'video_note': 'video_notes'
}
folder = type_folders.get(content_type, 'other')
base_path = "files"
full_folder_path = os.path.join(base_path, folder)
# Создаем необходимые папки
os.makedirs(base_path, exist_ok=True)
os.makedirs(full_folder_path, exist_ok=True)
logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}")
# Получаем информацию о файле
file = await message.bot.get_file(file_id)
if not file or not file.file_path:
logger.error(f"download_file: Не удалось получить информацию о файле {file_id}")
return None
# Генерируем уникальное имя файла
# Определяем расширение
original_filename = os.path.basename(file.file_path)
file_extension = os.path.splitext(original_filename)[1] or '.bin'
safe_filename = f"{file_id}{file_extension}"
file_path = os.path.join(full_folder_path, safe_filename)
# Скачиваем файл
await message.bot.download_file(file_path=file.file_path, destination=file_path)
# Проверяем, что файл действительно скачался
if not os.path.exists(file_path):
logger.error(f"download_file: Файл не был скачан - {file_path}")
return None
file_size = os.path.getsize(file_path)
download_time = time.time() - start_time
logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с")
return file_path
if s3_storage:
# Сохраняем в S3
# Скачиваем во временный файл
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension)
temp_path = temp_file.name
temp_file.close()
try:
# Скачиваем из Telegram
await message.bot.download_file(file_path=file.file_path, destination=temp_path)
# Генерируем S3 ключ
s3_key = s3_storage.generate_s3_key(content_type, file_id)
# Загружаем в S3
success = await s3_storage.upload_file(temp_path, s3_key)
# Удаляем временный файл
try:
os.remove(temp_path)
except:
pass
if success:
file_size = file.file_size if hasattr(file, 'file_size') else 0
download_time = time.time() - start_time
logger.info(f"download_file: Файл загружен в S3 - {s3_key}, размер: {file_size} байт, время: {download_time:.2f}с")
return s3_key
else:
logger.error(f"download_file: Не удалось загрузить файл в S3: {s3_key}")
return None
except Exception as e:
# Удаляем временный файл при ошибке
try:
os.remove(temp_path)
except:
pass
download_time = time.time() - start_time
logger.error(f"download_file: Ошибка загрузки файла в S3 {file_id}: {e}, время: {download_time:.2f}с")
return None
else:
# Старая логика - сохраняем на локальный диск
# Определяем папку по типу контента
type_folders = {
'photo': 'photos',
'video': 'videos',
'audio': 'music',
'voice': 'voice',
'video_note': 'video_notes'
}
folder = type_folders.get(content_type, 'other')
base_path = "files"
full_folder_path = os.path.join(base_path, folder)
# Создаем необходимые папки
os.makedirs(base_path, exist_ok=True)
os.makedirs(full_folder_path, exist_ok=True)
logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}")
# Генерируем уникальное имя файла
safe_filename = f"{file_id}{file_extension}"
file_path = os.path.join(full_folder_path, safe_filename)
# Скачиваем файл
await message.bot.download_file(file_path=file.file_path, destination=file_path)
# Проверяем, что файл действительно скачался
if not os.path.exists(file_path):
logger.error(f"download_file: Файл не был скачан - {file_path}")
return None
file_size = os.path.getsize(file_path)
download_time = time.time() - start_time
logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с")
return file_path
except Exception as e:
download_time = time.time() - start_time
@@ -283,11 +331,21 @@ async def prepare_media_group_from_middlewares(album, post_caption: str = ''):
return media_group
async def _save_media_group_background(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int], s3_storage) -> None:
"""Сохраняет медиагруппу в фоне, чтобы не блокировать ответ пользователю"""
try:
success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id, s3_storage)
if not success:
logger.warning(f"_save_media_group_background: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}")
except Exception as e:
logger.error(f"_save_media_group_background: Ошибка при сохранении медиа для медиагруппы {sent_message[-1].message_id}: {e}")
@track_time("add_in_db_media_mediagroup", "helper_func")
@track_errors("helper_func", "add_in_db_media_mediagroup")
@track_media_processing("media_group")
@db_query_time("add_in_db_media_mediagroup", "posts", "insert")
async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None) -> bool:
async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any,
main_post_id: Optional[int] = None, s3_storage = None) -> bool:
"""
Добавляет контент медиа-группы в базу данных
@@ -351,23 +409,31 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db:
logger.debug(f"add_in_db_media_mediagroup: Обрабатываю {content_type} в сообщении {i+1}/{len(sent_message)}")
# Скачиваем файл
file_path = await download_file(message, file_id=file_id, content_type=content_type)
# Получаем s3_storage если не передан
if s3_storage is None:
bdf = get_global_instance()
s3_storage = bdf.get_s3_storage()
# Скачиваем файл (в S3 или на локальный диск)
file_path = await download_file(message, file_id=file_id, content_type=content_type, s3_storage=s3_storage)
if not file_path:
logger.error(f"add_in_db_media_mediagroup: Не удалось скачать файл {file_id} в сообщении {i+1}/{len(sent_message)}")
failed_count += 1
continue
# Добавляем в базу данных
success = await bot_db.add_post_content(post_id, message.message_id, file_path, content_type)
# Для медиагруппы используем post_id (основной пост) как message_id для контента,
# так как FOREIGN KEY требует существования message_id в post_from_telegram_suggest
success = await bot_db.add_post_content(post_id, post_id, file_path, content_type)
if not success:
logger.error(f"add_in_db_media_mediagroup: Не удалось добавить контент в БД для сообщения {i+1}/{len(sent_message)}")
# Удаляем скачанный файл при ошибке БД
try:
os.remove(file_path)
logger.debug(f"add_in_db_media_mediagroup: Удален файл {file_path} после ошибки БД")
except Exception as e:
logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}")
# Удаляем скачанный файл при ошибке БД (только если это локальный файл, не S3)
if file_path.startswith('files/'):
try:
os.remove(file_path)
logger.debug(f"add_in_db_media_mediagroup: Удален файл {file_path} после ошибки БД")
except Exception as e:
logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}")
failed_count += 1
continue
@@ -402,7 +468,7 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db:
@track_media_processing("media_group")
@db_query_time("add_in_db_media", "posts", "insert")
@track_file_operations("media")
async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
async def add_in_db_media(sent_message: types.Message, bot_db: Any, s3_storage = None) -> bool:
"""
Добавляет контент одиночного сообщения в базу данных
@@ -451,8 +517,13 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
logger.debug(f"add_in_db_media: Обрабатываю {content_type} для сообщения {post_id}")
# Скачиваем файл
file_path = await download_file(sent_message, file_id=file_id, content_type=content_type)
# Получаем s3_storage если не передан
if s3_storage is None:
bdf = get_global_instance()
s3_storage = bdf.get_s3_storage()
# Скачиваем файл (в S3 или на локальный диск)
file_path = await download_file(sent_message, file_id=file_id, content_type=content_type, s3_storage=s3_storage)
if not file_path:
logger.error(f"add_in_db_media: Не удалось скачать файл {file_id} для сообщения {post_id}")
return False
@@ -461,12 +532,13 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
success = await bot_db.add_post_content(post_id, sent_message.message_id, file_path, content_type)
if not success:
logger.error(f"add_in_db_media: Не удалось добавить контент в БД для сообщения {post_id}")
# Удаляем скачанный файл при ошибке БД
try:
os.remove(file_path)
logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД")
except Exception as e:
logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}")
# Удаляем скачанный файл при ошибке БД (только если это локальный файл, не S3)
if file_path.startswith('files/'):
try:
os.remove(file_path)
logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД")
except Exception as e:
logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}")
return False
processing_time = time.time() - start_time
@@ -484,7 +556,7 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool:
@track_media_processing("media_group")
@db_query_time("send_media_group_message_to_private_chat", "posts", "insert")
async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message,
media_group: List, bot_db: Any, main_post_id: Optional[int] = None) -> int:
media_group: List, bot_db: Any, main_post_id: Optional[int] = None, s3_storage=None) -> int:
sent_message = await message.bot.send_media_group(
chat_id=chat_id,
media=media_group,
@@ -496,62 +568,93 @@ async def send_media_group_message_to_private_chat(chat_id: int, message: types.
created_at=int(datetime.now().timestamp())
)
await bot_db.add_post(post)
success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id)
if not success:
logger.warning(f"send_media_group_message_to_private_chat: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}")
# Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю
asyncio.create_task(_save_media_group_background(sent_message, bot_db, main_post_id, s3_storage))
message_id = sent_message[-1].message_id
return message_id
@track_time("send_media_group_to_channel", "helper_func")
@track_errors("helper_func", "send_media_group_to_channel")
@track_media_processing("media_group")
async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str):
async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str, s3_storage = None):
"""
Отправляет медиа-группу с подписью к последнему файлу.
Args:
bot: Экземпляр бота aiogram.
chat_id: ID чата для отправки.
post_content: Список кортежей с путями к файлам.
post_content: Список кортежей с путями к файлам (локальные пути или S3 ключи).
post_text: Текст подписи.
s3_storage: опциональный S3StorageService для работы с S3.
"""
logger.info(f"Начинаю отправку медиа-группы в чат {chat_id}, количество файлов: {len(post_content)}")
# Получаем s3_storage если не передан
if s3_storage is None:
bdf = get_global_instance()
s3_storage = bdf.get_s3_storage()
media = []
for i, file_path in enumerate(post_content):
try:
file = FSInputFile(path=file_path[0])
type = file_path[1]
logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path[0]} (тип: {type})")
if type == 'video':
media.append(types.InputMediaVideo(media=file))
elif type == 'photo':
media.append(types.InputMediaPhoto(media=file))
else:
logger.warning(f"Неизвестный тип файла: {type} для {file_path[0]}")
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path[0]}")
return
except Exception as e:
logger.error(f"Ошибка при обработке файла {file_path[0]}: {e}")
return
logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки")
# Добавляем подпись к последнему файлу
if media:
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
media[-1].caption = safe_post_text
logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}")
temp_files = [] # Для хранения путей к временным файлам
try:
await bot.send_media_group(chat_id=chat_id, media=media)
logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}")
except Exception as e:
logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}")
raise
for i, file_path_tuple in enumerate(post_content):
try:
file_path, content_type = file_path_tuple
logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path} (тип: {content_type})")
# Проверяем, это S3 ключ или локальный путь
actual_path = file_path
if s3_storage and not file_path.startswith('files/') and not os.path.exists(file_path):
# Это S3 ключ, скачиваем во временный файл
temp_path = await s3_storage.download_to_temp(file_path)
if not temp_path:
logger.error(f"Не удалось скачать файл из S3: {file_path}")
continue
temp_files.append(temp_path)
actual_path = temp_path
elif not os.path.exists(file_path):
logger.error(f"Файл не найден: {file_path}")
continue
file = FSInputFile(path=actual_path)
if content_type == 'video':
media.append(types.InputMediaVideo(media=file))
elif content_type == 'photo':
media.append(types.InputMediaPhoto(media=file))
else:
logger.warning(f"Неизвестный тип файла: {content_type} для {file_path}")
except FileNotFoundError:
logger.error(f"Файл не найден: {file_path_tuple[0]}")
continue
except Exception as e:
logger.error(f"Ошибка при обработке файла {file_path_tuple[0]}: {e}")
continue
logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки")
# Добавляем подпись к последнему файлу
if media:
# Экранируем post_text для безопасного использования в HTML
safe_post_text = html.escape(str(post_text)) if post_text else ""
media[-1].caption = safe_post_text
logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}")
try:
sent_messages = await bot.send_media_group(chat_id=chat_id, media=media)
logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}, количество сообщений: {len(sent_messages)}")
return sent_messages
except Exception as e:
logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}")
raise
finally:
# Удаляем временные файлы
for temp_file in temp_files:
try:
os.remove(temp_file)
except:
pass
@track_time("send_text_message", "helper_func")
@track_errors("helper_func", "send_text_message")
@@ -575,7 +678,7 @@ async def send_text_message(chat_id, message: types.Message, post_text: str, mar
)
sent_message = await send_with_rate_limit(_send_message, chat_id)
return sent_message.message_id
return sent_message
@track_time("send_photo_message", "helper_func")
@track_errors("helper_func", "send_photo_message")

View File

@@ -0,0 +1,175 @@
"""
Сервис для работы с S3 хранилищем.
"""
import aioboto3
import os
import tempfile
from typing import Optional
from pathlib import Path
from logs.custom_logger import logger
class S3StorageService:
"""Сервис для работы с S3 хранилищем."""
def __init__(self, endpoint_url: str, access_key: str, secret_key: str,
bucket_name: str, region: str = "us-east-1"):
self.endpoint_url = endpoint_url
self.access_key = access_key
self.secret_key = secret_key
self.bucket_name = bucket_name
self.region = region
self.session = aioboto3.Session()
async def upload_file(self, file_path: str, s3_key: str,
content_type: Optional[str] = None) -> bool:
"""Загружает файл в S3."""
try:
async with self.session.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region
) as s3:
extra_args = {}
if content_type:
extra_args['ContentType'] = content_type
await s3.upload_file(
file_path,
self.bucket_name,
s3_key,
ExtraArgs=extra_args
)
logger.info(f"Файл загружен в S3: {s3_key}")
return True
except Exception as e:
logger.error(f"Ошибка загрузки файла в S3 {s3_key}: {e}")
return False
async def upload_fileobj(self, file_obj, s3_key: str,
content_type: Optional[str] = None) -> bool:
"""Загружает файл из объекта в S3."""
try:
async with self.session.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region
) as s3:
extra_args = {}
if content_type:
extra_args['ContentType'] = content_type
await s3.upload_fileobj(
file_obj,
self.bucket_name,
s3_key,
ExtraArgs=extra_args
)
logger.info(f"Файл загружен в S3 из объекта: {s3_key}")
return True
except Exception as e:
logger.error(f"Ошибка загрузки файла в S3 из объекта {s3_key}: {e}")
return False
async def download_file(self, s3_key: str, local_path: str) -> bool:
"""Скачивает файл из S3 на локальный диск."""
try:
async with self.session.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region
) as s3:
# Создаем директорию если её нет
os.makedirs(os.path.dirname(local_path), exist_ok=True)
await s3.download_file(
self.bucket_name,
s3_key,
local_path
)
logger.info(f"Файл скачан из S3: {s3_key} -> {local_path}")
return True
except Exception as e:
logger.error(f"Ошибка скачивания файла из S3 {s3_key}: {e}")
return False
async def download_to_temp(self, s3_key: str) -> Optional[str]:
"""Скачивает файл из S3 во временный файл. Возвращает путь к временному файлу."""
try:
# Определяем расширение из ключа
ext = Path(s3_key).suffix or '.bin'
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext)
temp_path = temp_file.name
temp_file.close()
success = await self.download_file(s3_key, temp_path)
if success:
return temp_path
else:
# Удаляем временный файл при ошибке
try:
os.remove(temp_path)
except:
pass
return None
except Exception as e:
logger.error(f"Ошибка скачивания файла из S3 во временный файл {s3_key}: {e}")
return None
async def file_exists(self, s3_key: str) -> bool:
"""Проверяет существование файла в S3."""
try:
async with self.session.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region
) as s3:
await s3.head_object(Bucket=self.bucket_name, Key=s3_key)
return True
except:
return False
async def delete_file(self, s3_key: str) -> bool:
"""Удаляет файл из S3."""
try:
async with self.session.client(
's3',
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
region_name=self.region
) as s3:
await s3.delete_object(Bucket=self.bucket_name, Key=s3_key)
logger.info(f"Файл удален из S3: {s3_key}")
return True
except Exception as e:
logger.error(f"Ошибка удаления файла из S3 {s3_key}: {e}")
return False
def generate_s3_key(self, content_type: str, file_id: str) -> str:
"""Генерирует S3 ключ для файла. Один и тот же для всех постов с этим file_id."""
type_folders = {
'photo': 'photos',
'video': 'videos',
'audio': 'music',
'voice': 'voice',
'video_note': 'video_notes'
}
folder = type_folders.get(content_type, 'other')
# Определяем расширение из file_id или используем дефолтное
ext = '.jpg' if content_type == 'photo' else \
'.mp4' if content_type == 'video' else \
'.mp3' if content_type == 'audio' else \
'.ogg' if content_type == 'voice' else \
'.mp4' if content_type == 'video_note' else '.bin'
return f"{folder}/{file_id}{ext}"