""" Сервис для работы с S3 хранилищем. """ import aioboto3 import os import tempfile from typing import Optional from pathlib import Path from logs.custom_logger import logger class S3StorageService: """Сервис для работы с S3 хранилищем.""" def __init__(self, endpoint_url: str, access_key: str, secret_key: str, bucket_name: str, region: str = "us-east-1"): self.endpoint_url = endpoint_url self.access_key = access_key self.secret_key = secret_key self.bucket_name = bucket_name self.region = region self.session = aioboto3.Session() async def upload_file(self, file_path: str, s3_key: str, content_type: Optional[str] = None) -> bool: """Загружает файл в S3.""" try: async with self.session.client( 's3', endpoint_url=self.endpoint_url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, region_name=self.region ) as s3: extra_args = {} if content_type: extra_args['ContentType'] = content_type await s3.upload_file( file_path, self.bucket_name, s3_key, ExtraArgs=extra_args ) logger.info(f"Файл загружен в S3: {s3_key}") return True except Exception as e: logger.error(f"Ошибка загрузки файла в S3 {s3_key}: {e}") return False async def upload_fileobj(self, file_obj, s3_key: str, content_type: Optional[str] = None) -> bool: """Загружает файл из объекта в S3.""" try: async with self.session.client( 's3', endpoint_url=self.endpoint_url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, region_name=self.region ) as s3: extra_args = {} if content_type: extra_args['ContentType'] = content_type await s3.upload_fileobj( file_obj, self.bucket_name, s3_key, ExtraArgs=extra_args ) logger.info(f"Файл загружен в S3 из объекта: {s3_key}") return True except Exception as e: logger.error(f"Ошибка загрузки файла в S3 из объекта {s3_key}: {e}") return False async def download_file(self, s3_key: str, local_path: str) -> bool: """Скачивает файл из S3 на локальный диск.""" try: async with self.session.client( 's3', endpoint_url=self.endpoint_url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, region_name=self.region ) as s3: # Создаем директорию если её нет os.makedirs(os.path.dirname(local_path), exist_ok=True) await s3.download_file( self.bucket_name, s3_key, local_path ) logger.info(f"Файл скачан из S3: {s3_key} -> {local_path}") return True except Exception as e: logger.error(f"Ошибка скачивания файла из S3 {s3_key}: {e}") return False async def download_to_temp(self, s3_key: str) -> Optional[str]: """Скачивает файл из S3 во временный файл. Возвращает путь к временному файлу.""" try: # Определяем расширение из ключа ext = Path(s3_key).suffix or '.bin' temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=ext) temp_path = temp_file.name temp_file.close() success = await self.download_file(s3_key, temp_path) if success: return temp_path else: # Удаляем временный файл при ошибке try: os.remove(temp_path) except: pass return None except Exception as e: logger.error(f"Ошибка скачивания файла из S3 во временный файл {s3_key}: {e}") return None async def file_exists(self, s3_key: str) -> bool: """Проверяет существование файла в S3.""" try: async with self.session.client( 's3', endpoint_url=self.endpoint_url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, region_name=self.region ) as s3: await s3.head_object(Bucket=self.bucket_name, Key=s3_key) return True except: return False async def delete_file(self, s3_key: str) -> bool: """Удаляет файл из S3.""" try: async with self.session.client( 's3', endpoint_url=self.endpoint_url, aws_access_key_id=self.access_key, aws_secret_access_key=self.secret_key, region_name=self.region ) as s3: await s3.delete_object(Bucket=self.bucket_name, Key=s3_key) logger.info(f"Файл удален из S3: {s3_key}") return True except Exception as e: logger.error(f"Ошибка удаления файла из S3 {s3_key}: {e}") return False def generate_s3_key(self, content_type: str, file_id: str) -> str: """Генерирует S3 ключ для файла. Один и тот же для всех постов с этим file_id.""" type_folders = { 'photo': 'photos', 'video': 'videos', 'audio': 'music', 'voice': 'voice', 'video_note': 'video_notes' } folder = type_folders.get(content_type, 'other') # Определяем расширение из file_id или используем дефолтное ext = '.jpg' if content_type == 'photo' else \ '.mp4' if content_type == 'video' else \ '.mp3' if content_type == 'audio' else \ '.ogg' if content_type == 'voice' else \ '.mp4' if content_type == 'video_note' else '.bin' return f"{folder}/{file_id}{ext}"