Skip to content

[FLINK-39113][s3] Fix s3.sse.kms.encryption-context config in native s3 connector#28070

Open
gaborgsomogyi wants to merge 5 commits intoapache:masterfrom
gaborgsomogyi:FLINK-39579
Open

[FLINK-39113][s3] Fix s3.sse.kms.encryption-context config in native s3 connector#28070
gaborgsomogyi wants to merge 5 commits intoapache:masterfrom
gaborgsomogyi:FLINK-39579

Conversation

@gaborgsomogyi
Copy link
Copy Markdown
Contributor

What is the purpose of the change

s3.sse.kms.encryption-context was listed in the README as a supported configuration option for SSE-KMS, but the corresponding ConfigOption never existed in NativeS3FileSystemFactory. As a result, the encryption context was silently ignored regardless of what users configured, and the sseKms(keyId, context) code path in S3EncryptionConfig was dead code unreachable from production.

This PR closes the gap by adding the missing config option and wiring it end-to-end.

Brief change log

  • Fix: Add SSE_KMS_ENCRYPTION_CONTEXT (s3.sse.kms.encryption-context) ConfigOption to NativeS3FileSystemFactory and wire it through S3EncryptionConfig.fromConfig() — the feature was documented but never implemented
  • Fix: Update docs (README, s3.md EN/ZH) with correct aws:s3:arn encryption context example reflecting real AWS usage
  • Tests: Add unit tests for S3EncryptionConfig, S3ExceptionUtils, S3FileStatus, S3BlockLocation

Verifying this change

Existing and new unit tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: yes

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? docs

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude code

@gaborgsomogyi
Copy link
Copy Markdown
Contributor Author

cc @Samrat002

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Apr 29, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@gaborgsomogyi gaborgsomogyi changed the title [FLINK-39579][s3] Fix s3.sse.kms.encryption-context config in native s3 connector [FLINK-39113][s3] Fix s3.sse.kms.encryption-context config in native s3 connector Apr 29, 2026
Copy link
Copy Markdown
Contributor

@Samrat002 Samrat002 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NativeS3OutputStream.uploadToS3() is a separate write path for small files. It builds a PutObjectRequest but currently doesn't call ssekmsEncryptionContext(). The context wired through NativeS3ObjectOperations.applyEncryption() won't apply to writes that go through NativeS3OutputStream. Could you extend that code path as well?

Comment thread docs/content.zh/docs/deployment/filesystems/s3.md Outdated
@gaborgsomogyi gaborgsomogyi force-pushed the FLINK-39579 branch 2 times, most recently from 69ec88c to cdd8670 Compare April 30, 2026 16:02
@github-actions github-actions Bot added the community-reviewed PR has been reviewed by the community. label Apr 30, 2026
@gaborgsomogyi gaborgsomogyi requested a review from Samrat002 April 30, 2026 16:44
Copy link
Copy Markdown
Contributor

@Samrat002 Samrat002 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left few comments.

Cheers,
Samrat

}

@VisibleForTesting
@Nullable
Copy link
Copy Markdown
Contributor

@Samrat002 Samrat002 May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@Nullable

Keep the field non-nullable. It must fail fast.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This alone is not enough because there are nullable marker scattered around and we should make that consistent... We need to check if we remove all then what will happen

Comment on lines +111 to +117
* Creates a config for SSE-KMS encryption with the default KMS key and an encryption context.
*
* @param encryptionContext The encryption context key-value pairs
*/
public static S3EncryptionConfig sseKms(Map<String, String> encryptionContext) {
return new S3EncryptionConfig(EncryptionType.SSE_KMS, null, encryptionContext);
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* Creates a config for SSE-KMS encryption with the default KMS key and an encryption context.
*
* @param encryptionContext The encryption context key-value pairs
*/
public static S3EncryptionConfig sseKms(Map<String, String> encryptionContext) {
return new S3EncryptionConfig(EncryptionType.SSE_KMS, null, encryptionContext);
}

Comment on lines +177 to +179
return kmsKeyId != null && !kmsKeyId.isEmpty()
? sseKms(kmsKeyId, encryptionContext)
: sseKms(encryptionContext);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return kmsKeyId != null && !kmsKeyId.isEmpty()
? sseKms(kmsKeyId, encryptionContext)
: sseKms(encryptionContext);
return sseKms(kmsKeyId, encryptionContext);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be abstracted into a single method. There is no requirement for an overloaded method for sseKms

     public static S3EncryptionConfig sseKms(
             String kmsKeyId, Map<String, String> encryptionContext) {
        return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId, encryptionContext);
        if (kmsKeyId != null && !kmsKeyId.isEmpty()) {
            return new S3EncryptionConfig(EncryptionType.SSE_KMS, kmsKeyId, encryptionContext);
        } else {
            return new S3EncryptionConfig(EncryptionType.SSE_KMS, null, encryptionContext);
        }

@MethodSource
void sseKms_contextOnlyFactory_absentContext_hasEncryptionContextFalse(
Map<String, String> context) {
S3EncryptionConfig c = S3EncryptionConfig.sseKms(context);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
S3EncryptionConfig c = S3EncryptionConfig.sseKms(context);
S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, context);

@Test
void sseKms_contextOnlyFactory_contextMutatedAfterCreation_contextUnchanged() {
Map<String, String> ctx = new HashMap<>(Map.of("dept", "finance"));
S3EncryptionConfig c = S3EncryptionConfig.sseKms(ctx);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
S3EncryptionConfig c = S3EncryptionConfig.sseKms(ctx);
S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, ctx);

@MethodSource
void serializeEncryptionContext_jsonSpecialChars_escapedCorrectly(
String key, String value, String expectedFragment) {
S3EncryptionConfig c = S3EncryptionConfig.sseKms(Map.of(key, value));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
S3EncryptionConfig c = S3EncryptionConfig.sseKms(Map.of(key, value));
S3EncryptionConfig c = S3EncryptionConfig.sseKms(null, Map.of(key, value));

Comment on lines +139 to +143
private static S3Exception s3ExceptionStatusOnly(int statusCode) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
return (S3Exception) b.build();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static S3Exception s3ExceptionStatusOnly(int statusCode) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
return (S3Exception) b.build();
}
private static AwsServiceException s3ExceptionStatusOnly(int statusCode) {
return S3Exception.builder().statusCode(statusCode).build();
}

Comment on lines +130 to +137
private static S3Exception s3ExceptionWithMessageAndDetails(
int statusCode, String message, AwsErrorDetails details) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
b.message(message);
b.awsErrorDetails(details);
return (S3Exception) b.build();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static S3Exception s3ExceptionWithMessageAndDetails(
int statusCode, String message, AwsErrorDetails details) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
b.message(message);
b.awsErrorDetails(details);
return (S3Exception) b.build();
}
private static AwsServiceException s3ExceptionWithMessageAndDetails(
int statusCode, String message, AwsErrorDetails details) {
return S3Exception.builder()
.statusCode(statusCode)
.message(message)
.awsErrorDetails(details)
.build();
}

Comment on lines +123 to +128
private static S3Exception s3Exception(int statusCode, String message) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
b.message(message);
return (S3Exception) b.build();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static S3Exception s3Exception(int statusCode, String message) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
b.message(message);
return (S3Exception) b.build();
}
private static AwsServiceException s3Exception(int statusCode, String message) {
return S3Exception.builder().statusCode(statusCode).message(message).build();
}

Comment on lines +116 to +121
private static S3Exception s3Exception(int statusCode, AwsErrorDetails details) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
b.awsErrorDetails(details);
return (S3Exception) b.build();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private static S3Exception s3Exception(int statusCode, AwsErrorDetails details) {
S3Exception.Builder b = S3Exception.builder();
b.statusCode(statusCode);
b.awsErrorDetails(details);
return (S3Exception) b.build();
}
private static AwsServiceException s3Exception(int statusCode, AwsErrorDetails details) {
return S3Exception.builder().statusCode(statusCode).awsErrorDetails(details).build();
}

…ryption context

- Switch fromConfig() normalization to toLowerCase(Locale.ROOT), removing
  the SSE_KMS placeholder case
- Add sseKms(Map) factory overload so encryption context is preserved when
  using the default AWS-managed key
- Revert Map.copyOf() to null-tolerant unmodifiableMap(new HashMap<>())
…eam write path

Move serializeEncryptionContext to S3EncryptionConfig and apply it in
NativeS3OutputStream.uploadToS3(), which previously dropped the KMS
encryption context for small-file (non-multipart) writes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants