diff --git a/database/async_db.py b/database/async_db.py index 07973eb..b48f689 100644 --- a/database/async_db.py +++ b/database/async_db.py @@ -147,6 +147,22 @@ class AsyncBotDB: """Алиас для get_post_content_from_telegram_by_last_id (используется callback-сервисом).""" return await self.get_post_content_from_telegram_by_last_id(helper_message_id) + async def get_post_content_by_message_id(self, message_id: int) -> List[Tuple[str, str]]: + """Получает контент одиночного поста по message_id.""" + return await self.factory.posts.get_post_content_by_message_id(message_id) + + async def update_published_message_id(self, original_message_id: int, published_message_id: int): + """Обновляет published_message_id для опубликованного поста.""" + await self.factory.posts.update_published_message_id(original_message_id, published_message_id) + + async def add_published_post_content(self, published_message_id: int, content_path: str, content_type: str): + """Добавляет контент опубликованного поста.""" + return await self.factory.posts.add_published_post_content(published_message_id, content_path, content_type) + + async def get_published_post_content(self, published_message_id: int) -> List[Tuple[str, str]]: + """Получает контент опубликованного поста.""" + return await self.factory.posts.get_published_post_content(published_message_id) + async def get_post_text_from_telegram_by_last_id(self, last_post_id: int) -> Optional[str]: """Получает текст поста по helper_text_message_id.""" return await self.factory.posts.get_post_text_by_helper_id(last_post_id) diff --git a/database/repositories/post_repository.py b/database/repositories/post_repository.py index 9c03c3a..fe183ae 100644 --- a/database/repositories/post_repository.py +++ b/database/repositories/post_repository.py @@ -19,11 +19,19 @@ class PostRepository(DatabaseConnection): created_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'suggest', is_anonymous INTEGER, + published_message_id INTEGER, FOREIGN KEY (author_id) REFERENCES our_users (user_id) ON DELETE CASCADE ) ''' await self._execute_query(post_query) + # Добавляем поле published_message_id если его нет (для существующих БД) + try: + await self._execute_query('ALTER TABLE post_from_telegram_suggest ADD COLUMN published_message_id INTEGER') + except Exception: + # Поле уже существует, игнорируем ошибку + pass + # Таблица контента постов content_query = ''' CREATE TABLE IF NOT EXISTS content_post_from_telegram ( @@ -47,6 +55,26 @@ class PostRepository(DatabaseConnection): ''' await self._execute_query(link_query) + # Таблица контента опубликованных постов + published_content_query = ''' + CREATE TABLE IF NOT EXISTS published_post_content ( + published_message_id INTEGER NOT NULL, + content_name TEXT NOT NULL, + content_type TEXT, + published_at INTEGER NOT NULL, + PRIMARY KEY (published_message_id, content_name) + ) + ''' + await self._execute_query(published_content_query) + + # Создаем индексы + try: + await self._execute_query('CREATE INDEX IF NOT EXISTS idx_published_post_content_message_id ON published_post_content(published_message_id)') + await self._execute_query('CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_published ON post_from_telegram_suggest(published_message_id)') + except Exception: + # Индексы уже существуют, игнорируем ошибку + pass + self.logger.info("Таблицы для постов созданы") async def add_post(self, post: TelegramPost) -> None: @@ -174,6 +202,20 @@ class PostRepository(DatabaseConnection): self.logger.info(f"Получен контент поста: {len(post_content)} элементов") return post_content + async def get_post_content_by_message_id(self, message_id: int) -> List[Tuple[str, str]]: + """Получает контент одиночного поста по message_id.""" + query = """ + SELECT cpft.content_name, cpft.content_type + FROM post_from_telegram_suggest pft + JOIN message_link_to_content mltc ON pft.message_id = mltc.post_id + JOIN content_post_from_telegram cpft ON cpft.message_id = mltc.message_id + WHERE pft.message_id = ? AND pft.helper_text_message_id IS NULL + """ + post_content = await self._execute_query_with_result(query, (message_id,)) + + self.logger.info(f"Получен контент одиночного поста: {len(post_content)} элементов для message_id={message_id}") + return post_content + async def get_post_text_by_helper_id(self, helper_message_id: int) -> Optional[str]: """Получает текст поста по helper_text_message_id.""" query = "SELECT text FROM post_from_telegram_suggest WHERE helper_text_message_id = ?" @@ -252,3 +294,40 @@ class PostRepository(DatabaseConnection): self.logger.info(f"Получены текст и is_anonymous для helper_message_id={helper_message_id}") return text, is_anonymous return None, None + + async def update_published_message_id(self, original_message_id: int, published_message_id: int) -> None: + """Обновляет published_message_id для опубликованного поста.""" + query = "UPDATE post_from_telegram_suggest SET published_message_id = ? WHERE message_id = ?" + await self._execute_query(query, (published_message_id, original_message_id)) + self.logger.info(f"Обновлен published_message_id: {original_message_id} -> {published_message_id}") + + async def add_published_post_content( + self, published_message_id: int, content_path: str, content_type: str + ) -> bool: + """Добавляет контент опубликованного поста.""" + try: + from datetime import datetime + published_at = int(datetime.now().timestamp()) + + query = """ + INSERT OR IGNORE INTO published_post_content + (published_message_id, content_name, content_type, published_at) + VALUES (?, ?, ?, ?) + """ + await self._execute_query(query, (published_message_id, content_path, content_type, published_at)) + self.logger.info(f"Добавлен контент опубликованного поста: published_message_id={published_message_id}, type={content_type}") + return True + except Exception as e: + self.logger.error(f"Ошибка при добавлении контента опубликованного поста: {e}") + return False + + async def get_published_post_content(self, published_message_id: int) -> List[Tuple[str, str]]: + """Получает контент опубликованного поста.""" + query = """ + SELECT content_name, content_type + FROM published_post_content + WHERE published_message_id = ? + """ + post_content = await self._execute_query_with_result(query, (published_message_id,)) + self.logger.info(f"Получен контент опубликованного поста: {len(post_content)} элементов для published_message_id={published_message_id}") + return post_content diff --git a/database/schema.sql b/database/schema.sql index da9e9aa..6fd16c5 100644 --- a/database/schema.sql +++ b/database/schema.sql @@ -73,6 +73,7 @@ CREATE TABLE IF NOT EXISTS post_from_telegram_suggest ( created_at INTEGER NOT NULL, status TEXT NOT NULL DEFAULT 'suggest', is_anonymous INTEGER, + published_message_id INTEGER, FOREIGN KEY (author_id) REFERENCES our_users(user_id) ON DELETE CASCADE ); @@ -93,6 +94,15 @@ CREATE TABLE IF NOT EXISTS content_post_from_telegram ( FOREIGN KEY (message_id) REFERENCES post_from_telegram_suggest(message_id) ON DELETE CASCADE ); +-- Content of published posts +CREATE TABLE IF NOT EXISTS published_post_content ( + published_message_id INTEGER NOT NULL, + content_name TEXT NOT NULL, + content_type TEXT, + published_at INTEGER NOT NULL, + PRIMARY KEY (published_message_id, content_name) +); + -- Bot users information (user_id is now PRIMARY KEY) CREATE TABLE IF NOT EXISTS our_users ( user_id INTEGER NOT NULL PRIMARY KEY, @@ -130,3 +140,5 @@ CREATE INDEX IF NOT EXISTS idx_user_messages_date ON user_messages(date); CREATE INDEX IF NOT EXISTS idx_audio_message_reference_date ON audio_message_reference(date_added); CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_date ON post_from_telegram_suggest(created_at); CREATE INDEX IF NOT EXISTS idx_our_users_date_changed ON our_users(date_changed); +CREATE INDEX IF NOT EXISTS idx_published_post_content_message_id ON published_post_content(published_message_id); +CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_published ON post_from_telegram_suggest(published_message_id); diff --git a/env.example b/env.example index bb48ef3..dbab9a9 100644 --- a/env.example +++ b/env.example @@ -12,6 +12,14 @@ IMPORTANT_LOGS=-1001234567890 ARCHIVE=-1001234567890 TEST_GROUP=-1001234567890 +# S3 Storage (для хранения медиафайлов опубликованных постов) +S3_ENABLED=false +S3_ENDPOINT_URL=https://api.s3.ru +S3_ACCESS_KEY=your_s3_access_key_here +S3_SECRET_KEY=your_s3_secret_key_here +S3_BUCKET_NAME=your_s3_bucket_name +S3_REGION=us-east-1 + # Bot Settings PREVIEW_LINK=false LOGS=false diff --git a/helper_bot/handlers/callback/dependency_factory.py b/helper_bot/handlers/callback/dependency_factory.py index ade17fc..d175608 100644 --- a/helper_bot/handlers/callback/dependency_factory.py +++ b/helper_bot/handlers/callback/dependency_factory.py @@ -13,7 +13,8 @@ def get_post_publish_service() -> PostPublishService: db = bdf.get_db() settings = bdf.settings - return PostPublishService(None, db, settings) + s3_storage = bdf.get_s3_storage() + return PostPublishService(None, db, settings, s3_storage) def get_ban_service() -> BanService: diff --git a/helper_bot/handlers/callback/services.py b/helper_bot/handlers/callback/services.py index c8ce08f..1184ab9 100644 --- a/helper_bot/handlers/callback/services.py +++ b/helper_bot/handlers/callback/services.py @@ -3,6 +3,7 @@ import html from typing import Dict, Any from aiogram import Bot +from aiogram import types from aiogram.types import CallbackQuery from helper_bot.utils.helper_func import ( @@ -33,11 +34,12 @@ from helper_bot.utils.metrics import ( class PostPublishService: - def __init__(self, bot: Bot, db, settings: Dict[str, Any]): + def __init__(self, bot: Bot, db, settings: Dict[str, Any], s3_storage=None): # bot может быть None - в этом случае используем бота из контекста сообщения self.bot = bot self.db = db self.settings = settings + self.s3_storage = s3_storage self.group_for_posts = settings['Telegram']['group_for_posts'] self.main_public = settings['Telegram']['main_public'] self.important_logs = settings['Telegram']['important_logs'] @@ -98,9 +100,16 @@ class PostPublishService: # Формируем финальный текст с учетом is_anonymous formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) - await send_text_message(self.main_public, call.message, formatted_text) + sent_message = await send_text_message(self.main_public, call.message, formatted_text) + + # Сохраняем published_message_id + await self.db.update_published_message_id( + original_message_id=call.message.message_id, + published_message_id=sent_message.message_id + ) + await self._delete_post_and_notify_author(call, author_id) - logger.info(f'Текст сообщения опубликован в канале {self.main_public}.') + logger.info(f'Текст сообщение опубликован в канале {self.main_public}, published_message_id={sent_message.message_id}.') @track_time("_publish_photo_post", "post_publish_service") @track_errors("post_publish_service", "_publish_photo_post") @@ -126,9 +135,19 @@ class PostPublishService: # Формируем финальный текст с учетом is_anonymous formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) - await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, formatted_text) + sent_message = await send_photo_message(self.main_public, call.message, call.message.photo[-1].file_id, formatted_text) + + # Сохраняем published_message_id + await self.db.update_published_message_id( + original_message_id=call.message.message_id, + published_message_id=sent_message.message_id + ) + + # Сохраняем медиафайл из опубликованного поста (используем уже сохраненный файл) + await self._save_published_post_content(sent_message, sent_message.message_id, call.message.message_id) + await self._delete_post_and_notify_author(call, author_id) - logger.info(f'Пост с фото опубликован в канале {self.main_public}.') + logger.info(f'Пост с фото опубликован в канале {self.main_public}, published_message_id={sent_message.message_id}.') @track_time("_publish_video_post", "post_publish_service") @track_errors("post_publish_service", "_publish_video_post") @@ -154,9 +173,19 @@ class PostPublishService: # Формируем финальный текст с учетом is_anonymous formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) - await send_video_message(self.main_public, call.message, call.message.video.file_id, formatted_text) + sent_message = await send_video_message(self.main_public, call.message, call.message.video.file_id, formatted_text) + + # Сохраняем published_message_id + await self.db.update_published_message_id( + original_message_id=call.message.message_id, + published_message_id=sent_message.message_id + ) + + # Сохраняем медиафайл из опубликованного поста (используем уже сохраненный файл) + await self._save_published_post_content(sent_message, sent_message.message_id, call.message.message_id) + await self._delete_post_and_notify_author(call, author_id) - logger.info(f'Пост с видео опубликован в канале {self.main_public}.') + logger.info(f'Пост с видео опубликован в канале {self.main_public}, published_message_id={sent_message.message_id}.') @track_time("_publish_video_note_post", "post_publish_service") @track_errors("post_publish_service", "_publish_video_note_post") @@ -169,9 +198,19 @@ class PostPublishService: logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'") raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных") - await send_video_note_message(self.main_public, call.message, call.message.video_note.file_id) + sent_message = await send_video_note_message(self.main_public, call.message, call.message.video_note.file_id) + + # Сохраняем published_message_id + await self.db.update_published_message_id( + original_message_id=call.message.message_id, + published_message_id=sent_message.message_id + ) + + # Сохраняем медиафайл из опубликованного поста (используем уже сохраненный файл) + await self._save_published_post_content(sent_message, sent_message.message_id, call.message.message_id) + await self._delete_post_and_notify_author(call, author_id) - logger.info(f'Пост с кружком опубликован в канале {self.main_public}.') + logger.info(f'Пост с кружком опубликован в канале {self.main_public}, published_message_id={sent_message.message_id}.') @track_time("_publish_audio_post", "post_publish_service") @track_errors("post_publish_service", "_publish_audio_post") @@ -197,9 +236,19 @@ class PostPublishService: # Формируем финальный текст с учетом is_anonymous formatted_text = get_text_message(raw_text, user.first_name, user.username, is_anonymous) - await send_audio_message(self.main_public, call.message, call.message.audio.file_id, formatted_text) + sent_message = await send_audio_message(self.main_public, call.message, call.message.audio.file_id, formatted_text) + + # Сохраняем published_message_id + await self.db.update_published_message_id( + original_message_id=call.message.message_id, + published_message_id=sent_message.message_id + ) + + # Сохраняем медиафайл из опубликованного поста (используем уже сохраненный файл) + await self._save_published_post_content(sent_message, sent_message.message_id, call.message.message_id) + await self._delete_post_and_notify_author(call, author_id) - logger.info(f'Пост с аудио опубликован в канале {self.main_public}.') + logger.info(f'Пост с аудио опубликован в канале {self.main_public}, published_message_id={sent_message.message_id}.') @track_time("_publish_voice_post", "post_publish_service") @track_errors("post_publish_service", "_publish_voice_post") @@ -212,9 +261,19 @@ class PostPublishService: logger.error(f"Не удалось обновить статус поста message_id={call.message.message_id} на 'approved'") raise PostNotFoundError(f"Пост с message_id={call.message.message_id} не найден в базе данных") - await send_voice_message(self.main_public, call.message, call.message.voice.file_id) + sent_message = await send_voice_message(self.main_public, call.message, call.message.voice.file_id) + + # Сохраняем published_message_id + await self.db.update_published_message_id( + original_message_id=call.message.message_id, + published_message_id=sent_message.message_id + ) + + # Сохраняем медиафайл из опубликованного поста (используем уже сохраненный файл) + await self._save_published_post_content(sent_message, sent_message.message_id, call.message.message_id) + await self._delete_post_and_notify_author(call, author_id) - logger.info(f'Пост с войсом опубликован в канале {self.main_public}.') + logger.info(f'Пост с войсом опубликован в канале {self.main_public}, published_message_id={sent_message.message_id}.') @track_time("_publish_media_group", "post_publish_service") @track_errors("post_publish_service", "_publish_media_group") @@ -259,17 +318,36 @@ class PostPublishService: # Отправляем медиагруппу в канал logger.info(f"Отправляю медиагруппу в канал {self.main_public}") - await send_media_group_to_channel( + sent_messages = await send_media_group_to_channel( bot=self._get_bot(call.message), chat_id=self.main_public, post_content=post_content, - post_text=formatted_text + post_text=formatted_text, + s3_storage=self.s3_storage ) + + # Получаем оригинальные message_id из медиагруппы + original_message_ids = await self.db.get_post_ids_from_telegram_by_last_id(helper_message_id) + logger.debug(f"Получены оригинальные message_id медиагруппы: {original_message_ids}") + + # Сохраняем published_message_id для каждого сообщения медиагруппы + if len(sent_messages) == len(original_message_ids): + for i, original_message_id in enumerate(original_message_ids): + published_message_id = sent_messages[i].message_id + await self.db.update_published_message_id( + original_message_id=original_message_id, + published_message_id=published_message_id + ) + # Сохраняем медиафайл из опубликованного сообщения (используем уже сохраненный файл) + await self._save_published_post_content(sent_messages[i], published_message_id, original_message_id) + logger.debug(f"Сохранен published_message_id: {original_message_id} -> {published_message_id}") + else: + logger.warning(f"Количество опубликованных сообщений ({len(sent_messages)}) не совпадает с количеством оригинальных ({len(original_message_ids)})") await self.db.update_status_for_media_group_by_helper_id(helper_message_id, "approved") logger.debug(f"Удаляю медиагруппу и уведомляю автора {author_id}") await self._delete_media_group_and_notify_author(call, author_id) - logger.info(f'Медиагруппа опубликована в канале {self.main_public}.') + logger.info(f'Медиагруппа опубликована в канале {self.main_public}, опубликовано сообщений: {len(sent_messages)}.') except Exception as e: logger.error(f"Ошибка при публикации медиагруппы: {e}") @@ -412,6 +490,34 @@ class PostPublishService: raise UserBlockedBotError("Пользователь заблокировал бота") raise + @track_time("_save_published_post_content", "post_publish_service") + @track_errors("post_publish_service", "_save_published_post_content") + async def _save_published_post_content(self, published_message: types.Message, published_message_id: int, original_message_id: int) -> None: + """Сохраняет ссылку на медиафайл из опубликованного поста (файл уже в S3 или на диске).""" + try: + # Получаем уже сохраненный путь/S3 ключ из оригинального поста + saved_content = await self.db.get_post_content_by_message_id(original_message_id) + + if saved_content and len(saved_content) > 0: + # Копируем тот же путь/S3 ключ + file_path, content_type = saved_content[0] + logger.debug(f"Копируем путь/S3 ключ для опубликованного поста: {file_path}") + + success = await self.db.add_published_post_content( + published_message_id=published_message_id, + content_path=file_path, # Тот же путь/S3 ключ + content_type=content_type + ) + if success: + logger.info(f"Ссылка на файл сохранена для опубликованного поста: published_message_id={published_message_id}, path={file_path}") + else: + logger.warning(f"Не удалось сохранить ссылку на файл: published_message_id={published_message_id}") + else: + logger.warning(f"Контент не найден для оригинального поста message_id={original_message_id}") + except Exception as e: + logger.error(f"Ошибка при сохранении ссылки на контент опубликованного поста {published_message_id}: {e}") + # Не прерываем публикацию, если сохранение контента не удалось + class BanService: def __init__(self, bot: Bot, db, settings: Dict[str, Any]): diff --git a/helper_bot/handlers/private/private_handlers.py b/helper_bot/handlers/private/private_handlers.py index c9ad68d..05a21b9 100644 --- a/helper_bot/handlers/private/private_handlers.py +++ b/helper_bot/handlers/private/private_handlers.py @@ -44,11 +44,11 @@ sleep = asyncio.sleep class PrivateHandlers: """Main handler class for private messages""" - def __init__(self, db: AsyncBotDB, settings: BotSettings): + def __init__(self, db: AsyncBotDB, settings: BotSettings, s3_storage=None): self.db = db self.settings = settings self.user_service = UserService(db, settings) - self.post_service = PostService(db, settings) + self.post_service = PostService(db, settings, s3_storage) self.sticker_service = StickerService(settings) # Create router @@ -224,9 +224,9 @@ class PrivateHandlers: # Factory function to create handlers with dependencies -def create_private_handlers(db: AsyncBotDB, settings: BotSettings) -> PrivateHandlers: +def create_private_handlers(db: AsyncBotDB, settings: BotSettings, s3_storage=None) -> PrivateHandlers: """Create private handlers instance with dependencies""" - return PrivateHandlers(db, settings) + return PrivateHandlers(db, settings, s3_storage) # Legacy router for backward compatibility @@ -252,7 +252,8 @@ def init_legacy_router(): ) db = bdf.get_db() - handlers = create_private_handlers(db, settings) + s3_storage = bdf.get_s3_storage() + handlers = create_private_handlers(db, settings, s3_storage) # Instead of trying to copy handlers, we'll use the new router directly # This maintains backward compatibility while using the new architecture diff --git a/helper_bot/handlers/private/services.py b/helper_bot/handlers/private/services.py index 27bad6b..4341f13 100644 --- a/helper_bot/handlers/private/services.py +++ b/helper_bot/handlers/private/services.py @@ -143,9 +143,19 @@ class UserService: class PostService: """Service for post-related operations""" - def __init__(self, db: DatabaseProtocol, settings: BotSettings) -> None: + def __init__(self, db: DatabaseProtocol, settings: BotSettings, s3_storage=None) -> None: self.db = db self.settings = settings + self.s3_storage = s3_storage + + async def _save_media_background(self, sent_message: types.Message, bot_db: Any, s3_storage) -> None: + """Сохраняет медиа в фоне, чтобы не блокировать ответ пользователю""" + try: + success = await add_in_db_media(sent_message, bot_db, s3_storage) + if not success: + logger.warning(f"_save_media_background: Не удалось сохранить медиа для поста {sent_message.message_id}") + except Exception as e: + logger.error(f"_save_media_background: Ошибка при сохранении медиа для поста {sent_message.message_id}: {e}") @track_time("handle_text_post", "post_service") @track_errors("post_service", "handle_text_post") @@ -155,14 +165,14 @@ class PostService: post_text = get_text_message(message.text.lower(), first_name, message.from_user.username) markup = get_reply_keyboard_for_post() - sent_message_id = await send_text_message(self.settings.group_for_posts, message, post_text, markup) + sent_message = await send_text_message(self.settings.group_for_posts, message, post_text, markup) # Сохраняем сырой текст и определяем анонимность raw_text = message.text or "" is_anonymous = determine_anonymity(raw_text) post = TelegramPost( - message_id=sent_message_id, + message_id=sent_message.message_id, text=raw_text, author_id=message.from_user.id, created_at=int(datetime.now().timestamp()), @@ -196,9 +206,8 @@ class PostService: is_anonymous=is_anonymous ) await self.db.add_post(post) - success = await add_in_db_media(sent_message, self.db) - if not success: - logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}") + # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю + asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage)) @track_time("handle_video_post", "post_service") @track_errors("post_service", "handle_video_post") @@ -226,9 +235,8 @@ class PostService: is_anonymous=is_anonymous ) await self.db.add_post(post) - success = await add_in_db_media(sent_message, self.db) - if not success: - logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}") + # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю + asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage)) @track_time("handle_video_note_post", "post_service") @track_errors("post_service", "handle_video_note_post") @@ -252,9 +260,8 @@ class PostService: is_anonymous=is_anonymous ) await self.db.add_post(post) - success = await add_in_db_media(sent_message, self.db) - if not success: - logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}") + # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю + asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage)) @track_time("handle_audio_post", "post_service") @track_errors("post_service", "handle_audio_post") @@ -282,9 +289,8 @@ class PostService: is_anonymous=is_anonymous ) await self.db.add_post(post) - success = await add_in_db_media(sent_message, self.db) - if not success: - logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}") + # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю + asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage)) @track_time("handle_voice_post", "post_service") @track_errors("post_service", "handle_voice_post") @@ -308,9 +314,8 @@ class PostService: is_anonymous=is_anonymous ) await self.db.add_post(post) - success = await add_in_db_media(sent_message, self.db) - if not success: - logger.warning(f"handle_photo_post: Не удалось сохранить медиа для поста {sent_message.message_id}") + # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю + asyncio.create_task(self._save_media_background(sent_message, self.db, self.s3_storage)) @track_time("handle_media_group_post", "post_service") @track_errors("post_service", "handle_media_group_post") @@ -342,18 +347,18 @@ class PostService: # Отправляем медиагруппу в группу для модерации media_group = await prepare_media_group_from_middlewares(album, post_caption) media_group_message_id = await send_media_group_message_to_private_chat( - self.settings.group_for_posts, message, media_group, self.db, main_post.message_id + self.settings.group_for_posts, message, media_group, self.db, main_post.message_id, self.s3_storage ) await asyncio.sleep(0.2) # Создаем helper сообщение с кнопками markup = get_reply_keyboard_for_post() - help_message_id = await send_text_message(self.settings.group_for_posts, message, "ВРУЧНУЮ ВЫКЛАДЫВАТЬ, ПОСЛЕ ВЫКЛАДКИ УДАЛИТЬ ОБА ПОСТА") + help_message = await send_text_message(self.settings.group_for_posts, message, "ВРУЧНУЮ ВЫКЛАДЫВАТЬ, ПОСЛЕ ВЫКЛАДКИ УДАЛИТЬ ОБА ПОСТА") # Создаем helper пост и связываем его с основным helper_post = TelegramPost( - message_id=help_message_id, # ID helper сообщения + message_id=help_message.message_id, # ID helper сообщения text="^", # Специальный маркер для медиагруппы author_id=message.from_user.id, helper_text_message_id=main_post.message_id, # Ссылка на основной пост @@ -364,7 +369,7 @@ class PostService: # Обновляем основной пост, чтобы он ссылался на helper await self.db.update_helper_message( message_id=main_post.message_id, - helper_message_id=help_message_id + helper_message_id=help_message.message_id ) @track_time("process_post", "post_service") diff --git a/helper_bot/utils/base_dependency_factory.py b/helper_bot/utils/base_dependency_factory.py index 2da61a0..7959b34 100644 --- a/helper_bot/utils/base_dependency_factory.py +++ b/helper_bot/utils/base_dependency_factory.py @@ -1,8 +1,10 @@ import os import sys +from typing import Optional from dotenv import load_dotenv from database.async_db import AsyncBotDB +from helper_bot.utils.s3_storage import S3StorageService class BaseDependencyFactory: @@ -21,6 +23,7 @@ class BaseDependencyFactory: self.database = AsyncBotDB(database_path) self._load_settings_from_env() + self._init_s3_storage() def _load_settings_from_env(self): """Загружает настройки из переменных окружения.""" @@ -48,6 +51,29 @@ class BaseDependencyFactory: 'port': self._parse_int(os.getenv('METRICS_PORT', '8080')) } + self.settings['S3'] = { + 'enabled': self._parse_bool(os.getenv('S3_ENABLED', 'false')), + 'endpoint_url': os.getenv('S3_ENDPOINT_URL', ''), + 'access_key': os.getenv('S3_ACCESS_KEY', ''), + 'secret_key': os.getenv('S3_SECRET_KEY', ''), + 'bucket_name': os.getenv('S3_BUCKET_NAME', ''), + 'region': os.getenv('S3_REGION', 'us-east-1') + } + + def _init_s3_storage(self): + """Инициализирует S3StorageService если S3 включен.""" + self.s3_storage = None + if self.settings['S3']['enabled']: + s3_config = self.settings['S3'] + if s3_config['endpoint_url'] and s3_config['access_key'] and s3_config['secret_key'] and s3_config['bucket_name']: + self.s3_storage = S3StorageService( + endpoint_url=s3_config['endpoint_url'], + access_key=s3_config['access_key'], + secret_key=s3_config['secret_key'], + bucket_name=s3_config['bucket_name'], + region=s3_config['region'] + ) + def _parse_bool(self, value: str) -> bool: """Парсит строковое значение в boolean.""" return value.lower() in ('true', '1', 'yes', 'on') @@ -65,6 +91,10 @@ class BaseDependencyFactory: def get_db(self) -> AsyncBotDB: """Возвращает подключение к базе данных.""" return self.database + + def get_s3_storage(self) -> Optional[S3StorageService]: + """Возвращает S3StorageService если S3 включен, иначе None.""" + return self.s3_storage _global_instance = None diff --git a/helper_bot/utils/helper_func.py b/helper_bot/utils/helper_func.py index 8e3dad7..360c1b7 100644 --- a/helper_bot/utils/helper_func.py +++ b/helper_bot/utils/helper_func.py @@ -2,6 +2,8 @@ import html import os import random import time +import tempfile +import asyncio from datetime import datetime, timedelta from time import sleep from typing import List, Dict, Any, Optional, TYPE_CHECKING, Union @@ -158,17 +160,19 @@ def get_text_message(post_text: str, first_name: str, username: str = None, is_a @track_time("download_file", "helper_func") @track_errors("helper_func", "download_file") @track_file_operations("unknown") -async def download_file(message: types.Message, file_id: str, content_type: str = None) -> Optional[str]: +async def download_file(message: types.Message, file_id: str, content_type: str = None, + s3_storage = None) -> Optional[str]: """ - Скачивает файл по file_id из Telegram и сохраняет в соответствующую папку. + Скачивает файл по file_id из Telegram и сохраняет в S3 или на локальный диск. Args: message: сообщение file_id: File ID файла content_type: тип контента (photo, video, audio, voice, video_note) + s3_storage: опциональный S3StorageService для сохранения в S3 Returns: - Путь к сохраненному файлу, если файл был скачан успешно, иначе None + S3 ключ (если s3_storage указан) или локальный путь к файлу, иначе None """ start_time = time.time() @@ -178,51 +182,95 @@ async def download_file(message: types.Message, file_id: str, content_type: str logger.error("download_file: Неверные параметры - file_id, message или bot отсутствуют") return None - # Определяем папку по типу контента - type_folders = { - 'photo': 'photos', - 'video': 'videos', - 'audio': 'music', - 'voice': 'voice', - 'video_note': 'video_notes' - } - - folder = type_folders.get(content_type, 'other') - base_path = "files" - full_folder_path = os.path.join(base_path, folder) - - # Создаем необходимые папки - os.makedirs(base_path, exist_ok=True) - os.makedirs(full_folder_path, exist_ok=True) - - logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}") - # Получаем информацию о файле file = await message.bot.get_file(file_id) if not file or not file.file_path: logger.error(f"download_file: Не удалось получить информацию о файле {file_id}") return None - # Генерируем уникальное имя файла + # Определяем расширение original_filename = os.path.basename(file.file_path) file_extension = os.path.splitext(original_filename)[1] or '.bin' - safe_filename = f"{file_id}{file_extension}" - file_path = os.path.join(full_folder_path, safe_filename) - # Скачиваем файл - await message.bot.download_file(file_path=file.file_path, destination=file_path) - - # Проверяем, что файл действительно скачался - if not os.path.exists(file_path): - logger.error(f"download_file: Файл не был скачан - {file_path}") - return None - - file_size = os.path.getsize(file_path) - download_time = time.time() - start_time - - logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с") - - return file_path + if s3_storage: + # Сохраняем в S3 + # Скачиваем во временный файл + temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=file_extension) + temp_path = temp_file.name + temp_file.close() + + try: + # Скачиваем из Telegram + await message.bot.download_file(file_path=file.file_path, destination=temp_path) + + # Генерируем S3 ключ + s3_key = s3_storage.generate_s3_key(content_type, file_id) + + # Загружаем в S3 + success = await s3_storage.upload_file(temp_path, s3_key) + + # Удаляем временный файл + try: + os.remove(temp_path) + except: + pass + + if success: + file_size = file.file_size if hasattr(file, 'file_size') else 0 + download_time = time.time() - start_time + logger.info(f"download_file: Файл загружен в S3 - {s3_key}, размер: {file_size} байт, время: {download_time:.2f}с") + return s3_key + else: + logger.error(f"download_file: Не удалось загрузить файл в S3: {s3_key}") + return None + except Exception as e: + # Удаляем временный файл при ошибке + try: + os.remove(temp_path) + except: + pass + download_time = time.time() - start_time + logger.error(f"download_file: Ошибка загрузки файла в S3 {file_id}: {e}, время: {download_time:.2f}с") + return None + else: + # Старая логика - сохраняем на локальный диск + # Определяем папку по типу контента + type_folders = { + 'photo': 'photos', + 'video': 'videos', + 'audio': 'music', + 'voice': 'voice', + 'video_note': 'video_notes' + } + + folder = type_folders.get(content_type, 'other') + base_path = "files" + full_folder_path = os.path.join(base_path, folder) + + # Создаем необходимые папки + os.makedirs(base_path, exist_ok=True) + os.makedirs(full_folder_path, exist_ok=True) + + logger.debug(f"download_file: Начинаю скачивание файла {file_id} типа {content_type} в папку {folder}") + + # Генерируем уникальное имя файла + safe_filename = f"{file_id}{file_extension}" + file_path = os.path.join(full_folder_path, safe_filename) + + # Скачиваем файл + await message.bot.download_file(file_path=file.file_path, destination=file_path) + + # Проверяем, что файл действительно скачался + if not os.path.exists(file_path): + logger.error(f"download_file: Файл не был скачан - {file_path}") + return None + + file_size = os.path.getsize(file_path) + download_time = time.time() - start_time + + logger.info(f"download_file: Файл успешно скачан - {file_path}, размер: {file_size} байт, время: {download_time:.2f}с") + + return file_path except Exception as e: download_time = time.time() - start_time @@ -283,11 +331,21 @@ async def prepare_media_group_from_middlewares(album, post_caption: str = ''): return media_group +async def _save_media_group_background(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int], s3_storage) -> None: + """Сохраняет медиагруппу в фоне, чтобы не блокировать ответ пользователю""" + try: + success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id, s3_storage) + if not success: + logger.warning(f"_save_media_group_background: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}") + except Exception as e: + logger.error(f"_save_media_group_background: Ошибка при сохранении медиа для медиагруппы {sent_message[-1].message_id}: {e}") + @track_time("add_in_db_media_mediagroup", "helper_func") @track_errors("helper_func", "add_in_db_media_mediagroup") @track_media_processing("media_group") @db_query_time("add_in_db_media_mediagroup", "posts", "insert") -async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, main_post_id: Optional[int] = None) -> bool: +async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: Any, + main_post_id: Optional[int] = None, s3_storage = None) -> bool: """ Добавляет контент медиа-группы в базу данных @@ -351,23 +409,31 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: logger.debug(f"add_in_db_media_mediagroup: Обрабатываю {content_type} в сообщении {i+1}/{len(sent_message)}") - # Скачиваем файл - file_path = await download_file(message, file_id=file_id, content_type=content_type) + # Получаем s3_storage если не передан + if s3_storage is None: + bdf = get_global_instance() + s3_storage = bdf.get_s3_storage() + + # Скачиваем файл (в S3 или на локальный диск) + file_path = await download_file(message, file_id=file_id, content_type=content_type, s3_storage=s3_storage) if not file_path: logger.error(f"add_in_db_media_mediagroup: Не удалось скачать файл {file_id} в сообщении {i+1}/{len(sent_message)}") failed_count += 1 continue # Добавляем в базу данных - success = await bot_db.add_post_content(post_id, message.message_id, file_path, content_type) + # Для медиагруппы используем post_id (основной пост) как message_id для контента, + # так как FOREIGN KEY требует существования message_id в post_from_telegram_suggest + success = await bot_db.add_post_content(post_id, post_id, file_path, content_type) if not success: logger.error(f"add_in_db_media_mediagroup: Не удалось добавить контент в БД для сообщения {i+1}/{len(sent_message)}") - # Удаляем скачанный файл при ошибке БД - try: - os.remove(file_path) - logger.debug(f"add_in_db_media_mediagroup: Удален файл {file_path} после ошибки БД") - except Exception as e: - logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}") + # Удаляем скачанный файл при ошибке БД (только если это локальный файл, не S3) + if file_path.startswith('files/'): + try: + os.remove(file_path) + logger.debug(f"add_in_db_media_mediagroup: Удален файл {file_path} после ошибки БД") + except Exception as e: + logger.warning(f"add_in_db_media_mediagroup: Не удалось удалить файл {file_path}: {e}") failed_count += 1 continue @@ -402,7 +468,7 @@ async def add_in_db_media_mediagroup(sent_message: List[types.Message], bot_db: @track_media_processing("media_group") @db_query_time("add_in_db_media", "posts", "insert") @track_file_operations("media") -async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool: +async def add_in_db_media(sent_message: types.Message, bot_db: Any, s3_storage = None) -> bool: """ Добавляет контент одиночного сообщения в базу данных @@ -451,8 +517,13 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool: logger.debug(f"add_in_db_media: Обрабатываю {content_type} для сообщения {post_id}") - # Скачиваем файл - file_path = await download_file(sent_message, file_id=file_id, content_type=content_type) + # Получаем s3_storage если не передан + if s3_storage is None: + bdf = get_global_instance() + s3_storage = bdf.get_s3_storage() + + # Скачиваем файл (в S3 или на локальный диск) + file_path = await download_file(sent_message, file_id=file_id, content_type=content_type, s3_storage=s3_storage) if not file_path: logger.error(f"add_in_db_media: Не удалось скачать файл {file_id} для сообщения {post_id}") return False @@ -461,12 +532,13 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool: success = await bot_db.add_post_content(post_id, sent_message.message_id, file_path, content_type) if not success: logger.error(f"add_in_db_media: Не удалось добавить контент в БД для сообщения {post_id}") - # Удаляем скачанный файл при ошибке БД - try: - os.remove(file_path) - logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД") - except Exception as e: - logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}") + # Удаляем скачанный файл при ошибке БД (только если это локальный файл, не S3) + if file_path.startswith('files/'): + try: + os.remove(file_path) + logger.debug(f"add_in_db_media: Удален файл {file_path} после ошибки БД") + except Exception as e: + logger.warning(f"add_in_db_media: Не удалось удалить файл {file_path}: {e}") return False processing_time = time.time() - start_time @@ -484,7 +556,7 @@ async def add_in_db_media(sent_message: types.Message, bot_db: Any) -> bool: @track_media_processing("media_group") @db_query_time("send_media_group_message_to_private_chat", "posts", "insert") async def send_media_group_message_to_private_chat(chat_id: int, message: types.Message, - media_group: List, bot_db: Any, main_post_id: Optional[int] = None) -> int: + media_group: List, bot_db: Any, main_post_id: Optional[int] = None, s3_storage=None) -> int: sent_message = await message.bot.send_media_group( chat_id=chat_id, media=media_group, @@ -496,62 +568,93 @@ async def send_media_group_message_to_private_chat(chat_id: int, message: types. created_at=int(datetime.now().timestamp()) ) await bot_db.add_post(post) - success = await add_in_db_media_mediagroup(sent_message, bot_db, main_post_id) - if not success: - logger.warning(f"send_media_group_message_to_private_chat: Не удалось сохранить медиа для медиагруппы {sent_message[-1].message_id}") + # Сохраняем медиа в фоне, чтобы не блокировать ответ пользователю + asyncio.create_task(_save_media_group_background(sent_message, bot_db, main_post_id, s3_storage)) message_id = sent_message[-1].message_id return message_id @track_time("send_media_group_to_channel", "helper_func") @track_errors("helper_func", "send_media_group_to_channel") @track_media_processing("media_group") -async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str): +async def send_media_group_to_channel(bot, chat_id: int, post_content: List, post_text: str, s3_storage = None): """ Отправляет медиа-группу с подписью к последнему файлу. Args: bot: Экземпляр бота aiogram. chat_id: ID чата для отправки. - post_content: Список кортежей с путями к файлам. + post_content: Список кортежей с путями к файлам (локальные пути или S3 ключи). post_text: Текст подписи. + s3_storage: опциональный S3StorageService для работы с S3. """ logger.info(f"Начинаю отправку медиа-группы в чат {chat_id}, количество файлов: {len(post_content)}") + # Получаем s3_storage если не передан + if s3_storage is None: + bdf = get_global_instance() + s3_storage = bdf.get_s3_storage() + media = [] - for i, file_path in enumerate(post_content): - try: - file = FSInputFile(path=file_path[0]) - type = file_path[1] - logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path[0]} (тип: {type})") - - if type == 'video': - media.append(types.InputMediaVideo(media=file)) - elif type == 'photo': - media.append(types.InputMediaPhoto(media=file)) - else: - logger.warning(f"Неизвестный тип файла: {type} для {file_path[0]}") - except FileNotFoundError: - logger.error(f"Файл не найден: {file_path[0]}") - return - except Exception as e: - logger.error(f"Ошибка при обработке файла {file_path[0]}: {e}") - return - - logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки") - - # Добавляем подпись к последнему файлу - if media: - # Экранируем post_text для безопасного использования в HTML - safe_post_text = html.escape(str(post_text)) if post_text else "" - media[-1].caption = safe_post_text - logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}") - + temp_files = [] # Для хранения путей к временным файлам + try: - await bot.send_media_group(chat_id=chat_id, media=media) - logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}") - except Exception as e: - logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}") - raise + for i, file_path_tuple in enumerate(post_content): + try: + file_path, content_type = file_path_tuple + logger.debug(f"Обрабатываю файл {i+1}/{len(post_content)}: {file_path} (тип: {content_type})") + + # Проверяем, это S3 ключ или локальный путь + actual_path = file_path + if s3_storage and not file_path.startswith('files/') and not os.path.exists(file_path): + # Это S3 ключ, скачиваем во временный файл + temp_path = await s3_storage.download_to_temp(file_path) + if not temp_path: + logger.error(f"Не удалось скачать файл из S3: {file_path}") + continue + temp_files.append(temp_path) + actual_path = temp_path + elif not os.path.exists(file_path): + logger.error(f"Файл не найден: {file_path}") + continue + + file = FSInputFile(path=actual_path) + + if content_type == 'video': + media.append(types.InputMediaVideo(media=file)) + elif content_type == 'photo': + media.append(types.InputMediaPhoto(media=file)) + else: + logger.warning(f"Неизвестный тип файла: {content_type} для {file_path}") + except FileNotFoundError: + logger.error(f"Файл не найден: {file_path_tuple[0]}") + continue + except Exception as e: + logger.error(f"Ошибка при обработке файла {file_path_tuple[0]}: {e}") + continue + + logger.info(f"Подготовлено {len(media)} медиа-файлов для отправки") + + # Добавляем подпись к последнему файлу + if media: + # Экранируем post_text для безопасного использования в HTML + safe_post_text = html.escape(str(post_text)) if post_text else "" + media[-1].caption = safe_post_text + logger.debug(f"Добавлена подпись к последнему файлу: {safe_post_text[:50]}{'...' if len(safe_post_text) > 50 else ''}") + + try: + sent_messages = await bot.send_media_group(chat_id=chat_id, media=media) + logger.info(f"Медиа-группа успешно отправлена в чат {chat_id}, количество сообщений: {len(sent_messages)}") + return sent_messages + except Exception as e: + logger.error(f"Ошибка при отправке медиа-группы в чат {chat_id}: {e}") + raise + finally: + # Удаляем временные файлы + for temp_file in temp_files: + try: + os.remove(temp_file) + except: + pass @track_time("send_text_message", "helper_func") @track_errors("helper_func", "send_text_message") @@ -575,7 +678,7 @@ async def send_text_message(chat_id, message: types.Message, post_text: str, mar ) sent_message = await send_with_rate_limit(_send_message, chat_id) - return sent_message.message_id + return sent_message @track_time("send_photo_message", "helper_func") @track_errors("helper_func", "send_photo_message") diff --git a/helper_bot/utils/s3_storage.py b/helper_bot/utils/s3_storage.py new file mode 100644 index 0000000..090fa61 --- /dev/null +++ b/helper_bot/utils/s3_storage.py @@ -0,0 +1,175 @@ +""" +Сервис для работы с S3 хранилищем. +""" +import aioboto3 +import os +import tempfile +from typing import Optional +from pathlib import Path +from logs.custom_logger import logger + + +class S3StorageService: + """Сервис для работы с S3 хранилищем.""" + + def __init__(self, endpoint_url: str, access_key: str, secret_key: str, + bucket_name: str, region: str = "us-east-1"): + self.endpoint_url = endpoint_url + self.access_key = access_key + self.secret_key = secret_key + self.bucket_name = bucket_name + self.region = region + self.session = aioboto3.Session() + + async def upload_file(self, file_path: str, s3_key: str, + content_type: Optional[str] = None) -> bool: + """Загружает файл в S3.""" + try: + async with self.session.client( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + region_name=self.region + ) as s3: + extra_args = {} + if content_type: + extra_args['ContentType'] = content_type + + await s3.upload_file( + file_path, + self.bucket_name, + s3_key, + ExtraArgs=extra_args + ) + logger.info(f"Файл загружен в S3: {s3_key}") + return True + except Exception as e: + logger.error(f"Ошибка загрузки файла в S3 {s3_key}: {e}") + return False + + async def upload_fileobj(self, file_obj, s3_key: str, + content_type: Optional[str] = None) -> bool: + """Загружает файл из объекта в S3.""" + try: + async with self.session.client( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + region_name=self.region + ) as s3: + extra_args = {} + if content_type: + extra_args['ContentType'] = content_type + + await s3.upload_fileobj( + file_obj, + self.bucket_name, + s3_key, + ExtraArgs=extra_args + ) + logger.info(f"Файл загружен в S3 из объекта: {s3_key}") + return True + except Exception as e: + logger.error(f"Ошибка загрузки файла в S3 из объекта {s3_key}: {e}") + return False + + async def download_file(self, s3_key: str, local_path: str) -> bool: + """Скачивает файл из S3 на локальный диск.""" + try: + async with self.session.client( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + region_name=self.region + ) as s3: + # Создаем директорию если её нет + os.makedirs(os.path.dirname(local_path), exist_ok=True) + + await s3.download_file( + self.bucket_name, + s3_key, + local_path + ) + logger.info(f"Файл скачан из S3: {s3_key} -> {local_path}") + return True + except Exception as e: + logger.error(f"Ошибка скачивания файла из S3 {s3_key}: {e}") + return False + + async def download_to_temp(self, s3_key: str) -> Optional[str]: + """Скачивает файл из S3 во временный файл. Возвращает путь к временному файлу.""" + try: + # Определяем расширение из ключа + ext = Path(s3_key).suffix or '.bin' + temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext) + temp_path = temp_file.name + temp_file.close() + + success = await self.download_file(s3_key, temp_path) + if success: + return temp_path + else: + # Удаляем временный файл при ошибке + try: + os.remove(temp_path) + except: + pass + return None + except Exception as e: + logger.error(f"Ошибка скачивания файла из S3 во временный файл {s3_key}: {e}") + return None + + async def file_exists(self, s3_key: str) -> bool: + """Проверяет существование файла в S3.""" + try: + async with self.session.client( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + region_name=self.region + ) as s3: + await s3.head_object(Bucket=self.bucket_name, Key=s3_key) + return True + except: + return False + + async def delete_file(self, s3_key: str) -> bool: + """Удаляет файл из S3.""" + try: + async with self.session.client( + 's3', + endpoint_url=self.endpoint_url, + aws_access_key_id=self.access_key, + aws_secret_access_key=self.secret_key, + region_name=self.region + ) as s3: + await s3.delete_object(Bucket=self.bucket_name, Key=s3_key) + logger.info(f"Файл удален из S3: {s3_key}") + return True + except Exception as e: + logger.error(f"Ошибка удаления файла из S3 {s3_key}: {e}") + return False + + def generate_s3_key(self, content_type: str, file_id: str) -> str: + """Генерирует S3 ключ для файла. Один и тот же для всех постов с этим file_id.""" + type_folders = { + 'photo': 'photos', + 'video': 'videos', + 'audio': 'music', + 'voice': 'voice', + 'video_note': 'video_notes' + } + + folder = type_folders.get(content_type, 'other') + # Определяем расширение из file_id или используем дефолтное + ext = '.jpg' if content_type == 'photo' else \ + '.mp4' if content_type == 'video' else \ + '.mp3' if content_type == 'audio' else \ + '.ogg' if content_type == 'voice' else \ + '.mp4' if content_type == 'video_note' else '.bin' + + return f"{folder}/{file_id}{ext}" diff --git a/requirements.txt b/requirements.txt index 505c65c..4efef4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,4 +27,7 @@ charset-normalizer>=3.0.0 pluggy==1.5.0 attrs~=23.2.0 typing_extensions~=4.12.2 -emoji~=2.8.0 \ No newline at end of file +emoji~=2.8.0 + +# S3 Storage (для хранения медиафайлов опубликованных постов) +aioboto3>=12.0.0 \ No newline at end of file diff --git a/scripts/add_published_posts_support.py b/scripts/add_published_posts_support.py new file mode 100755 index 0000000..341c1e6 --- /dev/null +++ b/scripts/add_published_posts_support.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +""" +Скрипт миграции для добавления поддержки опубликованных постов: +1. Добавляет колонку published_message_id в таблицу post_from_telegram_suggest +2. Создает таблицу published_post_content для хранения медиафайлов опубликованных постов +3. Создает индексы для производительности +""" +import argparse +import asyncio +import os +import sys +from pathlib import Path + +project_root = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(project_root)) + +import aiosqlite + +from logs.custom_logger import logger + +DEFAULT_DB_PATH = "database/tg-bot-database.db" + + +def _column_exists(rows: list, name: str) -> bool: + """Проверяет существование колонки в таблице. + PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk).""" + for row in rows: + if row[1] == name: + return True + return False + + + + +async def main(db_path: str, dry_run: bool = False) -> None: + """Выполняет миграцию БД для поддержки опубликованных постов.""" + db_path = os.path.abspath(db_path) + if not os.path.exists(db_path): + logger.error("База данных не найдена: %s", db_path) + print(f"Ошибка: база данных не найдена: {db_path}") + return + + async with aiosqlite.connect(db_path) as conn: + await conn.execute("PRAGMA foreign_keys = ON") + + changes_made = [] + + # 1. Проверяем и добавляем колонку published_message_id + cursor = await conn.execute( + "PRAGMA table_info(post_from_telegram_suggest)" + ) + rows = await cursor.fetchall() + await cursor.close() + + if not _column_exists(rows, "published_message_id"): + if dry_run: + print("DRY RUN: Будет добавлена колонка published_message_id в post_from_telegram_suggest") + changes_made.append("Добавление колонки published_message_id") + else: + logger.info("Добавление колонки published_message_id в post_from_telegram_suggest") + await conn.execute( + "ALTER TABLE post_from_telegram_suggest " + "ADD COLUMN published_message_id INTEGER" + ) + await conn.commit() + print("✓ Колонка published_message_id добавлена в post_from_telegram_suggest") + changes_made.append("Добавлена колонка published_message_id") + else: + print("✓ Колонка published_message_id уже существует в post_from_telegram_suggest") + + # 2. Проверяем и создаем таблицу published_post_content + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='published_post_content'" + ) + table_exists = await cursor.fetchone() + await cursor.close() + + if not table_exists: + if dry_run: + print("DRY RUN: Будет создана таблица published_post_content") + changes_made.append("Создание таблицы published_post_content") + else: + logger.info("Создание таблицы published_post_content") + await conn.execute(""" + CREATE TABLE IF NOT EXISTS published_post_content ( + published_message_id INTEGER NOT NULL, + content_name TEXT NOT NULL, + content_type TEXT, + published_at INTEGER NOT NULL, + PRIMARY KEY (published_message_id, content_name) + ) + """) + await conn.commit() + print("✓ Таблица published_post_content создана") + changes_made.append("Создана таблица published_post_content") + else: + print("✓ Таблица published_post_content уже существует") + + # 3. Проверяем и создаем индексы + indexes = [ + ("idx_published_post_content_message_id", + "CREATE INDEX IF NOT EXISTS idx_published_post_content_message_id " + "ON published_post_content(published_message_id)"), + ("idx_post_from_telegram_suggest_published", + "CREATE INDEX IF NOT EXISTS idx_post_from_telegram_suggest_published " + "ON post_from_telegram_suggest(published_message_id)") + ] + + for index_name, index_sql in indexes: + cursor = await conn.execute( + "SELECT name FROM sqlite_master WHERE type='index' AND name=?", + (index_name,) + ) + index_exists = await cursor.fetchone() + await cursor.close() + + if not index_exists: + if dry_run: + print(f"DRY RUN: Будет создан индекс {index_name}") + changes_made.append(f"Создание индекса {index_name}") + else: + logger.info(f"Создание индекса {index_name}") + await conn.execute(index_sql) + await conn.commit() + print(f"✓ Индекс {index_name} создан") + changes_made.append(f"Создан индекс {index_name}") + else: + print(f"✓ Индекс {index_name} уже существует") + + # Финальная статистика + if dry_run: + if changes_made: + print("\n" + "="*60) + print("DRY RUN: Следующие изменения будут выполнены:") + for change in changes_made: + print(f" - {change}") + print("="*60) + else: + print("\n✓ Все необходимые изменения уже применены. Ничего делать не нужно.") + else: + if changes_made: + logger.info(f"Миграция завершена. Выполнено изменений: {len(changes_made)}") + print(f"\n✓ Миграция завершена успешно!") + print(f"Выполнено изменений: {len(changes_made)}") + for change in changes_made: + print(f" - {change}") + else: + print("\n✓ Все необходимые изменения уже применены. Ничего делать не нужно.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Добавление поддержки опубликованных постов в БД" + ) + parser.add_argument( + "--db", + default=os.environ.get("DB_PATH", DEFAULT_DB_PATH), + help="Путь к БД (или переменная окружения DB_PATH)", + ) + parser.add_argument( + "--dry-run", + action="store_true", + help="Показать что будет сделано без выполнения изменений", + ) + args = parser.parse_args() + asyncio.run(main(args.db, dry_run=args.dry_run)) diff --git a/scripts/test_s3_connection.py b/scripts/test_s3_connection.py new file mode 100755 index 0000000..fe37fc7 --- /dev/null +++ b/scripts/test_s3_connection.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python3 +""" +Скрипт для проверки подключения к S3 хранилищу. +Читает настройки из .env файла или переменных окружения. +""" +import asyncio +import os +import sys +from pathlib import Path + +project_root = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(project_root)) + +# Загружаем .env файл +from dotenv import load_dotenv +env_path = os.path.join(project_root, '.env') +if os.path.exists(env_path): + load_dotenv(env_path) + +try: + import aioboto3 +except ImportError: + print("❌ Библиотека aioboto3 не установлена.") + print("Установите её командой: pip install aioboto3") + sys.exit(1) + +# Данные для подключения из .env или переменных окружения +S3_ACCESS_KEY = os.getenv('S3_ACCESS_KEY', 'j3tears100@gmail.com') +S3_SECRET_KEY = os.getenv('S3_SECRET_KEY', 'wQ1-6sZEPs92sbZTSf96') +S3_ENDPOINT_URL = os.getenv('S3_ENDPOINT_URL', 'https://api.s3.miran.ru:443') +S3_BUCKET_NAME = os.getenv('S3_BUCKET_NAME', 'telegram-helper-bot') +S3_REGION = os.getenv('S3_REGION', 'us-east-1') + +async def test_s3_connection(): + """Тестирует подключение к S3 хранилищу.""" + print("🔍 Тестирование подключения к S3 хранилищу...") + print(f"Endpoint: {S3_ENDPOINT_URL}") + print(f"Bucket: {S3_BUCKET_NAME}") + print(f"Region: {S3_REGION}") + print(f"Access Key: {S3_ACCESS_KEY}") + print() + + session = aioboto3.Session() + + try: + async with session.client( + 's3', + endpoint_url=S3_ENDPOINT_URL, + aws_access_key_id=S3_ACCESS_KEY, + aws_secret_access_key=S3_SECRET_KEY, + region_name=S3_REGION + ) as s3: + # Пытаемся получить список бакетов (может не иметь прав, пропускаем если ошибка) + print("📦 Получение списка бакетов...") + try: + response = await s3.list_buckets() + buckets = response.get('Buckets', []) + print(f"✅ Подключение успешно! Найдено бакетов: {len(buckets)}") + + if buckets: + print("\n📋 Список бакетов:") + for bucket in buckets: + print(f" - {bucket['Name']} (создан: {bucket.get('CreationDate', 'неизвестно')})") + else: + print("\n⚠️ Бакеты не найдены.") + except Exception as list_error: + print(f"⚠️ Не удалось получить список бакетов: {list_error}") + print(" Это нормально, если нет прав на list_buckets") + print(" Продолжаем тестирование с указанным бакетом...") + + # Пытаемся создать тестовый файл в указанном бакете + print("\n🧪 Тестирование записи файла...") + # Используем первый найденный бакет, если указанный не найден + test_bucket = S3_BUCKET_NAME + if buckets: + # Проверяем, есть ли указанный бакет в списке + bucket_names = [b['Name'] for b in buckets] + if test_bucket not in bucket_names: + print(f"⚠️ Бакет '{test_bucket}' не найден в списке.") + print(f" Используем первый найденный бакет: '{buckets[0]['Name']}'") + test_bucket = buckets[0]['Name'] + + test_key = 'test-connection.txt' + test_content = b'Test connection to S3 storage' + + try: + # Проверяем существование бакета + try: + await s3.head_bucket(Bucket=test_bucket) + print(f"✅ Бакет '{test_bucket}' существует и доступен") + except Exception as head_error: + print(f"❌ Бакет '{test_bucket}' недоступен: {head_error}") + print(" Проверьте права доступа к бакету") + return False + + await s3.put_object( + Bucket=test_bucket, + Key=test_key, + Body=test_content + ) + print(f"✅ Файл успешно записан в бакет '{test_bucket}' с ключом '{test_key}'") + + # Пытаемся прочитать файл + print("🧪 Тестирование чтения файла...") + response = await s3.get_object(Bucket=test_bucket, Key=test_key) + content = await response['Body'].read() + + if content == test_content: + print("✅ Файл успешно прочитан, содержимое совпадает") + else: + print("⚠️ Файл прочитан, но содержимое не совпадает") + + # Удаляем тестовый файл + print("🧹 Удаление тестового файла...") + await s3.delete_object(Bucket=test_bucket, Key=test_key) + print("✅ Тестовый файл удален") + + except Exception as e: + print(f"❌ Ошибка при тестировании записи/чтения: {e}") + print(f" Тип ошибки: {type(e).__name__}") + import traceback + print(f" Полный traceback:") + traceback.print_exc() + print("\nВозможные причины:") + print(" 1. Неверное имя бакета") + print(" 2. Нет прав на запись в бакет") + print(" 3. Неверный endpoint URL или регион") + print(" 4. Проблемы с форматом endpoint (попробуйте без :443)") + + return True + + except Exception as e: + print(f"❌ Ошибка подключения к S3: {e}") + print("\nВозможные причины:") + print(" 1. Неверные credentials (Access Key / Secret Key)") + print(" 2. Неверный endpoint URL") + print(" 3. Проблемы с сетью") + print(" 4. Неверный регион (попробуйте изменить region_name)") + return False + + +if __name__ == "__main__": + result = asyncio.run(test_s3_connection()) + sys.exit(0 if result else 1)