Source code for fedn.network.storage.s3.boto3repository

"""Module implementing Repository for Amazon S3 using boto3."""

import io
import os
from typing import IO, List

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from fedn.common.log_config import logger
from fedn.network.storage.s3.base import RepositoryBase


[docs] class Boto3Repository(RepositoryBase): """Class implementing Repository for Amazon S3 using boto3."""
[docs] def __init__(self, config: dict) -> None: """Initialize object.""" super().__init__() self.name = "Boto3Repository" self.region = os.environ.get("AWS_REGION") or os.environ.get("AWS_DEFAULT_REGION") or config.get("storage_region") or "eu-west-1" common_config = { "use_ssl": config.get("storage_secure_mode", True), "verify": config.get("storage_verify_ssl", True), } access_key = config.get("storage_access_key") secret_key = config.get("storage_secret_key") if access_key and secret_key: self.s3_client = boto3.client( "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=self.region, endpoint_url=config.get("storage_endpoint", "http://minio:9000"), **common_config, ) else: # Use default credentials (IAM role via service account, environment variables, etc.) self.s3_client = boto3.client("s3", region_name=self.region, **common_config) logger.info(f"Using {self.name} for S3 storage.")
[docs] def set_artifact(self, instance_name: str, instance: IO, bucket: str, is_file: bool = False) -> bool: """Set object with name instance_name. :param instance_name: The name of the object :type instance_name: str :param instance: The object :type instance: Any :param bucket: The bucket name :type bucket: str :param is_file: Whether the instance is a file, defaults to False :type is_file: bool, optional :return: True if the artifact was set successfully :rtype: bool """ try: if is_file: logger.info(f"Uploading file to bucket: {bucket} with key: {instance_name}") self.s3_client.upload_file(Filename=instance, Bucket=bucket, Key=instance_name) else: logger.info(f"Uploading object to bucket: {bucket} with key: {instance_name}") self.s3_client.put_object(Bucket=bucket, Key=instance_name, Body=instance) return True except (BotoCoreError, ClientError) as e: logger.error(f"Failed to upload artifact: {instance_name} to bucket: {bucket}. Error: {e}") raise Exception(f"Could not upload artifact: {e}") from e
[docs] def get_artifact(self, instance_name: str, bucket: str) -> bytes: """Retrieve object with name instance_name. :param instance_name: The name of the object to retrieve :type instance_name: str :param bucket: The bucket name :type bucket: str :return: The retrieved object :rtype: bytes """ try: response = self.s3_client.get_object(Bucket=bucket, Key=instance_name) return response["Body"].read() except (BotoCoreError, ClientError) as e: logger.error(f"Failed to fetch artifact: {instance_name} from bucket: {bucket}. Error: {e}") raise Exception(f"Could not fetch artifact: {e}") from e
[docs] def get_artifact_stream(self, instance_name: str, bucket: str) -> io.BytesIO: """Return a stream handler for object with name instance_name. :param instance_name: The name of the object :type instance_name: str :param bucket: The bucket name :type bucket: str :return: Stream handler for object instance_name :rtype: io.BytesIO """ try: response = self.s3_client.get_object(Bucket=bucket, Key=instance_name) return io.BytesIO(response["Body"].read()) except (BotoCoreError, ClientError) as e: logger.error(f"Failed to fetch artifact stream: {instance_name} from bucket: {bucket}. Error: {e}") raise Exception(f"Could not fetch artifact stream: {e}") from e
[docs] def list_artifacts(self, bucket: str) -> List[str]: """List all objects in bucket. :param bucket: Name of the bucket :type bucket: str :return: A list of object names :rtype: List[str] """ try: response = self.s3_client.list_objects_v2(Bucket=bucket) return [obj["Key"] for obj in response.get("Contents", [])] except (BotoCoreError, ClientError) as e: logger.error(f"Failed to list artifacts in bucket: {bucket}. Error: {e}") raise Exception(f"Could not list artifacts: {e}") from e
[docs] def delete_artifact(self, instance_name: str, bucket: str) -> None: """Delete object with name instance_name from bucket. :param instance_name: The object name :type instance_name: str :param bucket: Bucket to delete from :type bucket: str """ try: self.s3_client.delete_object(Bucket=bucket, Key=instance_name) except (BotoCoreError, ClientError) as e: logger.error(f"Failed to delete artifact: {instance_name} from bucket: {bucket}. Error: {e}") raise Exception(f"Could not delete artifact: {e}") from e
[docs] def create_bucket(self, bucket_name: str) -> None: """Create a new bucket. If bucket exists, do nothing. :param bucket_name: The name of the bucket :type bucket_name: str """ try: if self.region == "us-east-1": self.s3_client.create_bucket(Bucket=bucket_name) else: self.s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={"LocationConstraint": self.region}) logger.info(f"Bucket {bucket_name} created successfully.") except self.s3_client.exceptions.BucketAlreadyExists: logger.info(f"Bucket {bucket_name} already exists. No new bucket was created.") except self.s3_client.exceptions.BucketAlreadyOwnedByYou: logger.info(f"Bucket {bucket_name} already owned by you. No new bucket was created.") except (BotoCoreError, ClientError) as e: logger.error(f"Failed to create bucket: {bucket_name}. Error: {e}") raise Exception(f"Could not create bucket: {e}") from e