
KAFKA-20249: Optimize raw value extraction in headers-aware deserializers #21706

Open
zheguang wants to merge 13 commits into apache:trunk from zheguang:zheguang-KAFKA-20249

Conversation

@zheguang
Contributor

@zheguang zheguang commented Mar 11, 2026

This patch implements two optimizations, adds JMH benchmarks for them,
and includes some opportunistic refactoring.

  1. Skipping
    Previously, raw value extraction in headers-aware deserializers
    deserialized and/or copied the headers, although it only needs to
    skip over them. This happened for both empty and non-empty headers.

  2. Empty headers copying
    Empty headers have a constant metadata footprint: the headers size is
    a single varint-encoded byte with value 0, and the headers themselves
    consume no bytes. Based on this invariant, the ByteBuffer-based
    extraction can be replaced with a direct System.arraycopy, a native
    Java method that is optimized for specific platforms.
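
The two optimizations can be sketched roughly as follows. This is a minimal, self-contained illustration rather than the code in this patch: the class and method names are hypothetical, the layout [headers size varint][headers][8-byte timestamp][value] is assumed from the description above, and readVarint is a local stand-in for Kafka's ByteUtils.readVarint (zigzag varint decoding).

```java
import java.nio.ByteBuffer;

/** Illustrative sketch only; names and layout are assumptions, not the patch's code. */
final class RawValueExtractionSketch {

    // Stand-in for StateSerdes.TIMESTAMP_SIZE.
    static final int TIMESTAMP_SIZE = Long.BYTES;

    /** Reads a zigzag-encoded varint; local stand-in for ByteUtils.readVarint. */
    static int readVarint(final ByteBuffer buffer) {
        int raw = 0;
        int shift = 0;
        byte b;
        do {
            if (shift > 28) {
                throw new IllegalArgumentException("varint is too long");
            }
            b = buffer.get();
            raw |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return (raw >>> 1) ^ -(raw & 1); // undo the zigzag encoding
    }

    /** Optimization 1: move the position past headers and timestamp; nothing is deserialized. */
    static byte[] rawValueBySkipping(final byte[] data) {
        final ByteBuffer buffer = ByteBuffer.wrap(data);
        final int headersSize = readVarint(buffer);
        buffer.position(buffer.position() + headersSize + TIMESTAMP_SIZE);
        final byte[] value = new byte[buffer.remaining()];
        buffer.get(value);
        return value;
    }

    /** Optimization 2: empty headers are a single 0x00 byte, so the value starts at a fixed offset. */
    static byte[] rawValueEmptyHeadersFastPath(final byte[] data) {
        final int offset = 1 + TIMESTAMP_SIZE;
        if (data.length < offset || data[0] != 0x00) {
            throw new IllegalArgumentException("expected empty headers followed by a timestamp");
        }
        final byte[] value = new byte[data.length - offset];
        System.arraycopy(data, offset, value, 0, value.length);
        return value;
    }
}
```

For an input with empty headers, both methods return the same bytes; the fast path avoids the ByteBuffer wrapper and the varint decoding.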

The optimized headers-aware extraction methods:

  • rawAggregation
  • rawTimestampedValue
  • rawValue / rawPlainValue

Benchmark:
This patch also includes JMH benchmarks to measure the speedup. On my
local machine, Optimization 1 gives a 2-6x speedup and Optimization 2
gives 1.2-1.3x.

Below is the throughput comparison of a recorded JMH run (higher score
is better):

Benchmark                                                              Mode  Cnt      Score      Error  Units
RawBytesExtractionBenchmark.testHeadersWithoutHeaders                 thrpt   15  10158.764 ±   85.564  ops/s
RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt              thrpt   15  14824.176 ± 1244.455  ops/s

RawBytesExtractionBenchmark.testRawAggregationWithHeaders             thrpt   15   1473.459 ±    7.170  ops/s
RawBytesExtractionBenchmark.testRawAggregationWithHeadersOpt          thrpt   15  11618.187 ±  235.385  ops/s

RawBytesExtractionBenchmark.testRawAggregationWithoutHeaders          thrpt   15   8337.728 ±  199.919  ops/s
RawBytesExtractionBenchmark.testRawAggregationWithoutHeadersOpt       thrpt   15  14564.899 ±  186.405  ops/s

RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeaders     thrpt   15  10217.292 ±  108.552  ops/s
RawBytesExtractionBenchmark.testRawTimestampedValueWithoutHeadersOpt  thrpt   15  12121.074 ±  201.235  ops/s

RawBytesExtractionBenchmark.testRawValueWithoutHeaders                thrpt   15  11632.484 ±  138.505  ops/s
RawBytesExtractionBenchmark.testRawValueWithoutHeadersOpt             thrpt   15  14669.563 ±   43.458  ops/s

RawBytesExtractionBenchmark.testTimestampWithoutHeaders               thrpt   15  14858.778 ±   39.301  ops/s
RawBytesExtractionBenchmark.testTimestampWithoutHeadersOpt            thrpt   15  19832.718 ±  916.980  ops/s
JMH benchmarks done
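
When reading the table, the speedup of each pair is the optimized score divided by the baseline score. The snippet below is only an illustrative sketch (SpeedupSketch and speedup are hypothetical names, not part of this patch) that computes the ratio for four pairs from the recorded run; it ignores the error margins.

```java
/** Illustrative sketch: throughput speedup computed from the JMH scores in the recorded run. */
final class SpeedupSketch {

    /** Speedup = optimized ops/s divided by baseline ops/s. */
    static double speedup(final double baselineOpsPerSec, final double optimizedOpsPerSec) {
        return optimizedOpsPerSec / baselineOpsPerSec;
    }

    public static void main(final String[] args) {
        // Scores copied from the recorded run; error margins are not taken into account.
        System.out.printf("rawAggregation, with headers:       %.2fx%n", speedup(1473.459, 11618.187));
        System.out.printf("rawAggregation, empty headers:      %.2fx%n", speedup(8337.728, 14564.899));
        System.out.printf("rawTimestampedValue, empty headers: %.2fx%n", speedup(10217.292, 12121.074));
        System.out.printf("rawValue, empty headers:            %.2fx%n", speedup(11632.484, 14669.563));
    }
}
```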

Test:

  • AggregationWithHeadersDeserializer.rawAggregate
    • empty headers:
      SessionToHeadersStoreAdapterTest.shouldStripHeadersFromRawAggregationValue
  • Utils.rawPlainValue
    • empty headers: UtilsTest.shouldExtractRawValueWithEmptyHeaders
    • empty headers, no timestamp:
      UtilsTest.testRawPlainValueWithEmptyHeadersAndInvalidTimestamp
  • Utils.rawTimestampedValue
    • empty headers: UtilsTest.testRawTimestampedValueWithEmptyHeaders
    • empty headers, no timestamp:
      UtilsTest.testRawTimestampedValueWithEmptyHeadersAndInvalidTimestamp

Refactor

  • Point all call sites that extract a raw value (with timestamp and
    headers) to the common implementation in Utils.

Benchmark                                                     Mode  Cnt      Score     Error  Units
RawBytesExtraction.testRawAggregationWithoutHeaders          thrpt   15   7850.891 ± 307.428  ops/s
RawBytesExtraction.testRawAggregationWithoutHeadersFastPath  thrpt   15  14957.556 ± 517.450  ops/s

Benchmark                                                     Mode  Cnt      Score     Error  Units
RawBytesExtraction.testRawAggregationWithHeaders             thrpt   15   1411.338 ± 110.527  ops/s
RawBytesExtraction.testRawAggregationWithHeadersFastPath     thrpt   15   6106.665 ± 218.032  ops/s
RawBytesExtraction.testRawAggregationWithoutHeaders          thrpt   15   7734.538 ± 525.487  ops/s
RawBytesExtraction.testRawAggregationWithoutHeadersFastPath  thrpt   15  14300.408 ± 212.519  ops/s

Benchmark                                                Mode  Cnt      Score     Error  Units
RawBytesExtraction.testRawAggregationWithHeaders        thrpt   15   1481.854 ±  31.448  ops/s
RawBytesExtraction.testRawAggregationWithHeadersOpt     thrpt   15  11797.165 ± 103.432  ops/s

RawBytesExtraction.testRawAggregationWithoutHeaders     thrpt   15   8359.080 ±  47.918  ops/s
RawBytesExtraction.testRawAggregationWithoutHeadersOpt  thrpt   15  15298.827 ± 452.741  ops/s

RawBytesExtraction.testRawValueWithoutHeaders           thrpt   15  11329.997 ± 260.399  ops/s
RawBytesExtraction.testRawValueWithoutHeadersOpt        thrpt   15  15372.816 ± 184.651  ops/s

@github-actions github-actions bot added the triage (PRs from the community), streams, and performance labels on Mar 11, 2026
@zheguang
Contributor Author

zheguang commented Mar 12, 2026

Hi @aliehsaeedii, could you please take a quick look and let me know whether this approach is headed in the right direction? Thanks!

Contributor

@aliehsaeedii aliehsaeedii left a comment

Thanks @zheguang for the PR. Could we apply this optimization (if it makes sense) in the HeadersDeserializer, TimestampedToHeadersWindowStoreAdapter, and HeadersBytesStore classes as well?
Please add more unit tests so that all changes are covered.

* This is used by KIP-1271 to deserialize aggregations with headers from session state stores.
*/
-class AggregationWithHeadersDeserializer<AGG> implements WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
+public class AggregationWithHeadersDeserializer<AGG> implements WrappingNullableDeserializer<AggregationWithHeaders<AGG>, Void, AGG> {
Contributor

I assume you made these public so that they can be used in the JMH benchmarks?

Contributor Author

That's right. To be honest, it's a bit awkward to widen this deserializer's visibility. One option is to move the benchmarked methods out of this deserializer into Utils, and widen only the visibility of Utils instead. Would that be preferred? Let me know.

Contributor

@aliehsaeedii aliehsaeedii Mar 14, 2026

I'm not sure we need to keep the JMH benchmarks!


final ByteBuffer buffer = ByteBuffer.wrap(aggregationWithHeaders);
readHeaders(buffer);
// Skip the headers bytes without deserizization or copying
Contributor

Suggested change
-// Skip the headers bytes without deserizization or copying
+// Skip the headers bytes without deserialization or copying

return result;
}

private static boolean hasEmptyHeadersAndTimestamp(final byte[] rawValueTimestampHeaders) {
Contributor

nit: Should this be named just hasEmptyHeaders? We don't check for an empty timestamp.

Contributor Author

Actually... it does check a little something about the timestamp -- that the input is at least long enough to hold the timestamp. Line 64 is the relevant bit:

if (rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE < 0)

final ByteBuffer buf = ByteBuffer.wrap(data);
buf.put((byte) 0x00); // header size
buf.putLong(TIMESTAMP);
buf.put(VALUE); // non-header payload
Contributor

Suggested change
-buf.put(VALUE); // non-header payload
+buf.put(VALUE); // plain value

}

-private static Headers readHeaders(final ByteBuffer buffer) {
+public static Headers readHeaders(final ByteBuffer buffer) {
Contributor

Can we optimize readHeaders as well?

Contributor Author

Yes, a fast path for empty headers gives a 1.5x speedup on my local machine:

Benchmark                                                              Mode  Cnt      Score     Error  Units
RawBytesExtractionBenchmark.testHeadersWithoutHeaders                 thrpt   15  10198.854 ±  62.216  ops/s
RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt              thrpt   15  15852.852 ±  47.469  ops/s

I'll add this change to this PR.

Contributor Author

@zheguang zheguang Mar 14, 2026

You mean headers() (which calls readHeaders)? Yes, I can see a speedup there too:

RawBytesExtractionBenchmark.testHeadersWithoutHeaders                 thrpt   15  10158.764 ±   85.564  ops/s
RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt              thrpt   15  14824.176 ± 1244.455  ops/s

I will make this change in this PR.

public static Headers readHeaders(final ByteBuffer buffer) {
final int headersSize = ByteUtils.readVarint(buffer);
final byte[] rawHeaders = readBytes(buffer, headersSize);
return HeadersDeserializer.deserialize(rawHeaders);
Contributor

I'm wondering whether it makes sense to add the check if (rawAggregationWithHeaders.length > 0 && rawAggregationWithHeaders[0] == 0x00) to static Headers headers(final byte[] rawAggregationWithHeaders) as well.

Contributor Author

@zheguang zheguang Mar 14, 2026

Hm... not sure -- for empty headers there is a fast (enough?) path in HeadersDeserializer.deserialize:

// in HeadersDeserializer
    public static Headers deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return new RecordHeaders();
        }

Contributor Author

I'll find out in the context of KAFKA-20303 to be sure though.

final int headersSize = ByteUtils.readVarint(buffer);
buffer.position(buffer.position() + headersSize + Long.BYTES);
return readBytes(buffer, buffer.remaining());
}
Contributor

Does it make sense to apply the same optimization to other methods of this class, such as headers(), value(), and deserialize()?

Contributor Author

@zheguang zheguang Mar 14, 2026

Great pointer. value() can indeed just call rawPlainValue(), so I added that change to this PR.

For the others, let me find out in the context of KAFKA-20303.

I made the change in this PR anyway to find out, and yes, there is a speedup for headers and timestamp with empty headers:

RawBytesExtractionBenchmark.testHeadersWithoutHeaders                 thrpt   15  10158.764 ±   85.564  ops/s
RawBytesExtractionBenchmark.testHeadersWithoutHeadersOpt              thrpt   15  14824.176 ± 1244.455  ops/s

RawBytesExtractionBenchmark.testTimestampWithoutHeaders               thrpt   15  14858.778 ±   39.301  ops/s
RawBytesExtractionBenchmark.testTimestampWithoutHeadersOpt            thrpt   15  19832.718 ±  916.980  ops/s

if (hasEmptyHeadersAndTimestamp(rawValueTimestampHeaders)) {
// Strip header size (varint 1 byte), empty headers (no bytes), and timestamp
final byte[] res = new byte[rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE];
System.arraycopy(rawValueTimestampHeaders, 1 + StateSerdes.TIMESTAMP_SIZE, res, 0, res.length);
Contributor

@aliehsaeedii aliehsaeedii Mar 13, 2026

Should validate rawValueTimestampHeaders.length >= 1 + StateSerdes.TIMESTAMP_SIZE

Contributor Author

Yep, this is validated within the hasEmptyHeadersAndTimestamp call above, at line 64:

if (rawValueTimestampHeaders.length - 1 - StateSerdes.TIMESTAMP_SIZE < 0) {
   // throw serialization exception
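
For context, a guard along these lines might look like the sketch below. It is an assumption-laden illustration, not the method in this PR: the class name is hypothetical, the order of the two checks and the handling of an empty array are guesses, and IllegalArgumentException stands in for the serialization exception mentioned above.

```java
/** Illustrative sketch of the guard discussed in this thread; not the PR's code. */
final class EmptyHeadersGuardSketch {

    // Stand-in for StateSerdes.TIMESTAMP_SIZE.
    static final int TIMESTAMP_SIZE = Long.BYTES;

    /**
     * Returns true when the input starts with the single 0x00 byte that encodes
     * empty headers. Throws when such an input is too short to also hold a timestamp,
     * so a later array allocation of length (length - 1 - TIMESTAMP_SIZE) cannot be negative.
     */
    static boolean hasEmptyHeadersAndTimestamp(final byte[] rawValueTimestampHeaders) {
        // Assumption: anything not starting with 0x00 is left to the general (skipping) path.
        if (rawValueTimestampHeaders.length == 0 || rawValueTimestampHeaders[0] != 0x00) {
            return false;
        }
        if (rawValueTimestampHeaders.length - 1 - TIMESTAMP_SIZE < 0) {
            // Stand-in for the serialization exception thrown in the PR.
            throw new IllegalArgumentException("input too short to hold a timestamp");
        }
        return true;
    }
}
```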

@github-actions github-actions bot removed the triage (PRs from the community) label on Mar 14, 2026
3 participants