Copy files from one bucket to another

PHOTO EMBED

Wed May 29 2024 13:10:04 GMT+0000 (Coordinated Universal Time)

Saved by @krishna01 ##copy_files_from_s3 ##copy_bucket

import boto3
from botocore.exceptions import ClientError
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ObjectWrapper:
    """Encapsulates S3 object actions."""

    def __init__(self, s3_object):
        """
        :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3
                          that wraps object actions in a class-like structure.
        """
        self.object = s3_object
        self.key = self.object.key

    def copy(self, dest_object):
        """
        Copies the object to another bucket.

        :param dest_object: The destination object initialized with a bucket and key.
                            This is a Boto3 Object resource.
        """
        try:
            dest_object.copy_from(
                CopySource={"Bucket": self.object.bucket_name, "Key": self.object.key}
            )
            dest_object.wait_until_exists()
            logger.info(
                "Copied object from %s:%s to %s:%s.",
                self.object.bucket_name,
                self.object.key,
                dest_object.bucket_name,
                dest_object.key,
            )
        except ClientError:
            logger.exception(
                "Couldn't copy object from %s/%s to %s/%s.",
                self.object.bucket_name,
                self.object.key,
                dest_object.bucket_name,
                dest_object.key,
            )
            raise

def copy_all_objects_between_buckets(source_bucket, dest_bucket):
    """
    Copies all objects from the source bucket to the destination bucket.

    :param source_bucket: The name of the source S3 bucket.
    :param dest_bucket: The name of the destination S3 bucket.
    """
    s3_resource = boto3.resource('s3')
    source_bucket_obj = s3_resource.Bucket(source_bucket)
    
    for s3_object in source_bucket_obj.objects.all():
        source_obj = s3_resource.Object(source_bucket, s3_object.key)
        dest_obj = s3_resource.Object(dest_bucket, s3_object.key)

        wrapper = ObjectWrapper(source_obj)
        wrapper.copy(dest_obj)

if __name__ == "__main__":
    # Example usage
    source_bucket_name = 'your-source-bucket'
    dest_bucket_name = 'your-destination-bucket'

    copy_all_objects_between_buckets(source_bucket_name, dest_bucket_name)
content_copyCOPY