Skip to content

S3 Directory Support#3304

Open
jterapin wants to merge 78 commits intoversion-3from
s3-directory-support
Open

S3 Directory Support#3304
jterapin wants to merge 78 commits intoversion-3from
s3-directory-support

Conversation

@jterapin
Copy link
Contributor

@jterapin jterapin commented Oct 13, 2025

Adds directory upload/download to Transfer Manager.

upload_directory

  • Upload all files from a local directory to S3
  • Optional recursive traversal of subdirectories
  • Symlink handling with circular reference detection
  • S3 key prefix support
  • Filter callback to selectively upload files
  • Request callback to modify upload parameters per file
  • Progress callback for transfer monitoring
  • Configurable failure handling (fail-fast or continue on error)

download_directory

  • Download all objects from an S3 bucket/prefix to a local directory
  • S3 prefix stripping for clean local paths
  • Path traversal detection
  • Filter callback to selectively download objects
  • Request callback to modify download parameters per object
  • Progress callback for transfer monitoring
  • Configurable failure handling (fail-fast or continue on error)
  • Automatic directory creation for nested structures

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.

  1. To make sure we include your contribution in the release notes, please make sure to add description entry for your changes in the "unreleased changes" section of the CHANGELOG.md file (at corresponding gem). For the description entry, please make sure it lives in one line and starts with Feature or Issue in the correct format.

  2. For generated code changes, please checkout below instructions first:
    https://github.com/aws/aws-sdk-ruby/blob/version-3/CONTRIBUTING.md

Thank you for your contribution!

@github-actions
Copy link

github-actions bot commented Oct 13, 2025

Detected 1 possible performance regressions:

  • aws-sdk-kinesis.gem_size_kb - z-score regression: 81.5 -> 82.0. Z-score: Infinity

@jterapin jterapin marked this pull request as ready for review January 14, 2026 17:30
# * `:completed_downloads` - Number of objects successfully downloaded
# * `:failed_downloads` - Number of objects that failed to download
# * `:errors` - Array of errors for failed downloads (only present when failures occur)
def download_directory(destination, bucket:, **options)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to allow for setting a custom thread for executor here. Same goes for uploader.

request_abort unless opts[:ignore_failure]
end

def process_download_queue(producer, downloader, opts)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a situation where we need to raise, I decided against doing @queue_executor.kill since there might be work in-flight that hangs. It would be best to exit gracefully and the bubbles up to raise within #build_results

raise ArgumentError, 'Invalid directory' unless Dir.exist?(source_directory)

uploader = FileUploader.new(
multipart_threshold: opts.delete(:multipart_threshold),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Self-reminder to documentation this at TransferManager#upload_directory

I need to take a look at other params available on FileUploader and FileDownloader level but I'm concerned about overlapped params if it occurs.

Copy link
Contributor

@alextwoods alextwoods left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice - this is looking good! Great test coverage and documentation. Good thread safety.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - manger mispelled.

end

def validate_path(path, key)
segments = path.split('/')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be File::SEPERATOR rather than / here? I think the path at this point will come from File.join and so would have os seperator? I might be wrong about that though.

autoload :LegacySigner, 'aws-sdk-s3/legacy_signer'

# transfer manager + multipart upload/download utilities
autoload :DefaultExecutor, 'aws-sdk-s3/default_executor'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: it looks like default_executor is listed twice here.


attr_reader :client, :executor

def abort_requested
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be more clear with a ? suffix (ie abort_requested?) to make it clear that its a status check rather than an action.

@mutex.synchronize { @abort_requested }
end

def request_abort
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"request_abort" feels more verbose than just "abort". I think the internal @abort_requested makes sense, but the method's name I think could just be abort. I assume the request_abort is to make it clear that aborting is async, but I think thats implied with abort already. What do you think?

downloads, errors = process_download_queue(producer, downloader, download_opts)
build_result(downloads, errors)
ensure
@abort_requested = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we setting @abort_requested to false here?

def stream_objects(continuation_token: nil)
resp = @client.list_objects_v2(bucket: @bucket, prefix: @s3_prefix, continuation_token: continuation_token)
resp.contents&.each do |o|
break if @directory_downloader.abort_requested
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will block on the mutex - I'm not sure we always need to that here. It is definitely safe to do so, but likely has a performance hit. Ditto I think with the check on line 91:

        begin
          producer.each do |object|
              break if abort_requested

Since we're using a SizedQueue for communicating between threads - maybe we could use clear/close on it rather than constantly blocking on the mutex? If we used close It will cause threads waiting to raise ClosedQueueError which we could catch and handle to detect aborts? I haven't fully thought that out, but I think it might simplify the code and avoid locking as much.

@transferred_bytes += bytes_transferred
@transferred_files += 1

@progress_callback.call(@transferred_bytes, @transferred_files)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see why we're calling the progress_callback inside the synchronize block - it does ensure progress is linear for users. However, depending on what they're doing in the callback (things like IO like printing/writing to file, ect) - this could end up being a small bottleneck. I'm not sure how much that matters vs the fully ordered progress callbacks.

@mutex.synchronize { errors << e }
request_abort
end
upload_attempts.times { completion_queue.pop }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance that upload_attemps will ever be larger than the completion queue?

Copy link
Contributor

@richardwang1124 richardwang1124 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice! Looks pretty good overall. Left a few comments.

# you are responsible for shutting it down when finished.
def initialize(options = {})
@client = options[:client] || Client.new
@executor = options[:executor]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you added DefaultExecutor here? Like

@executor = options[:executor] || DefaultExecutor.new

downloads, errors = process_download_queue(producer, downloader, download_opts)
build_result(downloads, errors)
ensure
@abort_requested = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need @mutex.synchronize?

uploads, errors = process_upload_queue(producer, uploader, upload_opts)
build_result(uploads, errors)
ensure
@abort_requested = false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as my previous comment

@mutex.synchronize { errors << e }
request_abort
end
download_attempts.times { completion_queue.pop }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that download_attempts will be less than or equal to completion_queue size?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants