import boto3
from botocore.exceptions import ClientError
import logging
# Configure logging.
# NOTE: basicConfig runs at import time, so importing this module configures
# the root logger at INFO level (module-level side effect).
logging.basicConfig(level=logging.INFO)
# Module-scoped logger used by ObjectWrapper to report copy results.
logger = logging.getLogger(__name__)
class ObjectWrapper:
    """Encapsulates S3 object actions."""

    def __init__(self, s3_object):
        """
        :param s3_object: A Boto3 Object resource. This is a high-level resource in Boto3
                          that wraps object actions in a class-like structure.
        """
        self.object = s3_object
        # Cache the key so callers can read it without going through the resource.
        self.key = self.object.key

    def copy(self, dest_object):
        """
        Copies the object to another bucket.

        Issues a copy request on the destination object, then blocks until the
        destination's waiter reports that the object exists. Any failure is
        logged with its traceback and re-raised to the caller.

        :param dest_object: The destination object initialized with a bucket and key.
                            This is a Boto3 Object resource.
        :raises ClientError: If the copy request fails.
        :raises WaiterError: If waiting for the destination object fails or times out.
        """
        # Imported here so this method does not depend on a change to the
        # module's top-level import block.
        from botocore.exceptions import WaiterError

        try:
            dest_object.copy_from(
                CopySource={"Bucket": self.object.bucket_name, "Key": self.object.key}
            )
            dest_object.wait_until_exists()
        except (ClientError, WaiterError):
            # WaiterError is not a ClientError subclass; catching only
            # ClientError would let a failed wait escape without being logged.
            logger.exception(
                "Couldn't copy object from %s/%s to %s/%s.",
                self.object.bucket_name,
                self.object.key,
                dest_object.bucket_name,
                dest_object.key,
            )
            raise
        else:
            logger.info(
                "Copied object from %s:%s to %s:%s.",
                self.object.bucket_name,
                self.object.key,
                dest_object.bucket_name,
                dest_object.key,
            )
def copy_all_objects_between_buckets(source_bucket, dest_bucket):
    """
    Copies every object in the source bucket to the same key in the
    destination bucket.

    Objects are copied one at a time in listing order. There is no error
    handling here, so the first exception raised by a copy stops the loop
    and propagates to the caller.

    :param source_bucket: The name of the source S3 bucket.
    :param dest_bucket: The name of the destination S3 bucket.
    """
    s3 = boto3.resource('s3')
    for summary in s3.Bucket(source_bucket).objects.all():
        key = summary.key
        # Build full Object resources for both ends; the key is preserved.
        origin = s3.Object(source_bucket, key)
        target = s3.Object(dest_bucket, key)
        ObjectWrapper(origin).copy(target)
if __name__ == "__main__":
    # Example usage: both bucket names are placeholders and must be replaced
    # with real bucket names before running this script.
    source_bucket_name = 'your-source-bucket'
    dest_bucket_name = 'your-destination-bucket'
    copy_all_objects_between_buckets(source_bucket_name, dest_bucket_name)