Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,52 +39,32 @@
@SuppressWarnings("checkstyle:membername")
@SuppressFBWarnings("URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD")
public class FFICommonConfig extends Struct {
/** Specifies AWS default configuration been overriden. */
public final Struct.Boolean override_aws_config = new Struct.Boolean();
/** Optional AWS configuration. */
/** Optional AWS configuration. Set to NULL if not used. */
public final Struct.StructRef<FFIAwsConfig> aws_config = new Struct.StructRef<>(FFIAwsConfig.class);
/** Array of input files to compact. */
public final FFIArray<java.lang.String> input_files = new FFIArray<>(this);
/** Whether the input files are individually sorted by the row and sort key fields. */
public final Struct.Boolean input_files_sorted = new Struct.Boolean();
/** Whether we should use readahead when reading from S3. */
public final Struct.Boolean use_readahead_store = new Struct.Boolean();
/** Whether Parquet page indexes should be read. */
public final Struct.Boolean read_page_indexes = new Struct.Boolean();
/** Output file name. */
public final Struct.UTF8StringRef output_file = new Struct.UTF8StringRef();
/** Specifies if sketch output is enabled. Can only be used with file output. */
public final Struct.Boolean write_sketch_file = new Struct.Boolean();
/** Whether we should use readahead when reading from S3. */
public final Struct.Boolean use_readahead_store = new Struct.Boolean();
/** Names of Sleeper row key fields from schema. */
public final FFIArray<java.lang.String> row_key_cols = new FFIArray<>(this);
/** Types for region schema 1 = Int, 2 = Long, 3 = String, 4 = Byte array. */
public final FFIArray<java.lang.Integer> row_key_schema = new FFIArray<>(this);
/** Names of Sleeper sort key fields from schema. */
public final FFIArray<java.lang.String> sort_key_cols = new FFIArray<>(this);
/** Maximum size of output Parquet row group in rows. */
public final Struct.size_t max_row_group_size = new Struct.size_t();
/** Maximum size of output Parquet page size in bytes. */
public final Struct.size_t max_page_size = new Struct.size_t();
/** Output Parquet compression codec. */
public final Struct.UTF8StringRef compression = new Struct.UTF8StringRef();
/** Output Parquet writer version. Must be 1.0 or 2.0 */
public final Struct.UTF8StringRef writer_version = new Struct.UTF8StringRef();
/** Column min/max values truncation length in output Parquet. */
public final Struct.size_t column_truncate_length = new Struct.size_t();
/** Max sizeof statistics block in output Parquet. */
public final Struct.size_t stats_truncate_length = new Struct.size_t();
/** Should row key fields use dictionary encoding in output Parquet. */
public final Struct.Boolean dict_enc_row_keys = new Struct.Boolean();
/** Should sort key fields use dictionary encoding in output Parquet. */
public final Struct.Boolean dict_enc_sort_keys = new Struct.Boolean();
/** Should value fields use dictionary encoding in output Parquet. */
public final Struct.Boolean dict_enc_values = new Struct.Boolean();
/** The Sleeper compaction region. */
public final Struct.StructRef<FFISleeperRegion> region = new StructRef<>(FFISleeperRegion.class);
/** Compaction aggregation configuration. This is optional. */
public final Struct.UTF8StringRef aggregation_config = new Struct.UTF8StringRef();
/** Compaction filtering configuration. This is optional. */
public final Struct.UTF8StringRef filtering_config = new Struct.UTF8StringRef();
/** Sleeper options. Set to NULL if defaults are suitable. */
public final Struct.StructRef<FFIParquetOptions> sleeper_options = new Struct.StructRef<>(FFIParquetOptions.class);

public FFICommonConfig(jnr.ffi.Runtime runtime) {
this(runtime, null);
Expand All @@ -93,16 +73,16 @@ public FFICommonConfig(jnr.ffi.Runtime runtime) {
public FFICommonConfig(jnr.ffi.Runtime runtime, DataFusionAwsConfig awsConfig) {
super(runtime);
if (awsConfig != null) {
this.override_aws_config.set(true);
this.aws_config.set(awsConfig.toFfi(runtime));
aws_config.set(awsConfig.toFfi(runtime));
} else {
this.override_aws_config.set(false);
// Null will use default AWS credentials
aws_config.set(0);
}
// Set to sensible defaults all members that don't have them.
// Primitives will all default to false/zero, FFIArrays also have safe defaults.
output_file.set("");
compression.set("");
writer_version.set("");
// Null here tells Rust to use defaults.
sleeper_options.set(0);
}

/**
Expand All @@ -120,8 +100,6 @@ public void validate() {
}
// Check strings non null
Objects.requireNonNull(output_file.get(), "Output file is null");
Objects.requireNonNull(writer_version.get(), "Parquet writer is null");
Objects.requireNonNull(compression.get(), "Parquet compression codec is null");
Objects.requireNonNull(aggregation_config.get(), "Aggregation configuration is null");
Objects.requireNonNull(filtering_config.get(), "Filtering configuration is null");
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2022-2026 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.foreign.datafusion;

import jnr.ffi.Struct;

import java.util.Objects;

import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_COMPRESSION_CODEC;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_DICTIONARY_ENCODING_FOR_VALUE_FIELDS;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_PAGE_SIZE;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_PARQUET_ROWGROUP_ROWS;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_PARQUET_WRITER_VERSION;
import static sleeper.core.properties.instance.TableDefaultProperty.DEFAULT_STATISTICS_TRUNCATE_LENGTH;

/**
 * Contains all the Sleeper options for DataFusion operation. These may not be needed for every DataFusion usage,
 * so come with reasonable defaults. All defaults are documented in Rust code.
 *
 * <strong>THIS IS A C COMPATIBLE FFI STRUCT!</strong> If you updated this struct (field ordering, types, etc.),
 * you MUST update the corresponding Rust definition in rust/sleeper_df/src/objects/ffi_sleeper_options.rs. The
 * order and types of the fields must match exactly.
 */
@SuppressWarnings("checkstyle:membername")
public class FFIParquetOptions extends Struct {
    /** Whether Parquet page indexes should be read. */
    public final Struct.Boolean read_page_indexes = new Struct.Boolean();
    /** Maximum size of output Parquet row group in rows. */
    public final Struct.size_t max_row_group_size = new Struct.size_t();
    /** Maximum size of output Parquet page size in bytes. */
    public final Struct.size_t max_page_size = new Struct.size_t();
    /** Output Parquet compression codec. */
    public final Struct.UTF8StringRef compression = new Struct.UTF8StringRef();
    /** Output Parquet writer version. Must be v1 or v2. */
    public final Struct.UTF8StringRef writer_version = new Struct.UTF8StringRef();
    /** Column min/max values truncation length in output Parquet. */
    public final Struct.size_t column_truncate_length = new Struct.size_t();
    /** Maximum size of statistics block in output Parquet. */
    public final Struct.size_t stats_truncate_length = new Struct.size_t();
    /** Should row key fields use dictionary encoding in output Parquet. */
    public final Struct.Boolean dict_enc_row_keys = new Struct.Boolean();
    /** Should sort key fields use dictionary encoding in output Parquet. */
    public final Struct.Boolean dict_enc_sort_keys = new Struct.Boolean();
    /** Should value fields use dictionary encoding in output Parquet. */
    public final Struct.Boolean dict_enc_values = new Struct.Boolean();

    /**
     * Creates the struct with every field initialised to the Sleeper table default value,
     * so callers only need to override the options they care about.
     *
     * @param runtime the JNR runtime used to allocate the struct's native memory
     */
    public FFIParquetOptions(jnr.ffi.Runtime runtime) {
        super(runtime);
        // Page index reading is disabled by default; enable it per use case.
        read_page_indexes.set(false);
        // Remaining defaults come from the Sleeper table default properties.
        max_row_group_size.set(java.lang.Long.parseLong(DEFAULT_PARQUET_ROWGROUP_ROWS.getDefaultValue()));
        max_page_size.set(java.lang.Long.parseLong(DEFAULT_PAGE_SIZE.getDefaultValue()));
        compression.set(DEFAULT_COMPRESSION_CODEC.getDefaultValue());
        writer_version.set(DEFAULT_PARQUET_WRITER_VERSION.getDefaultValue());
        column_truncate_length.set(java.lang.Long.parseLong(DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH.getDefaultValue()));
        stats_truncate_length.set(java.lang.Long.parseLong(DEFAULT_STATISTICS_TRUNCATE_LENGTH.getDefaultValue()));
        dict_enc_row_keys.set(java.lang.Boolean.parseBoolean(DEFAULT_DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS.getDefaultValue()));
        dict_enc_sort_keys.set(java.lang.Boolean.parseBoolean(DEFAULT_DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS.getDefaultValue()));
        dict_enc_values.set(java.lang.Boolean.parseBoolean(DEFAULT_DICTIONARY_ENCODING_FOR_VALUE_FIELDS.getDefaultValue()));
    }

    /**
     * Validates the state of this struct. Checks that all size fields are at least 1
     * and that string fields are non-null.
     *
     * @throws IllegalStateException when an invariant fails
     * @throws NullPointerException  when the compression codec or writer version is null
     */
    public void validate() {
        if (max_row_group_size.get() < 1) {
            throw new IllegalStateException("max row group size < 1");
        }
        if (max_page_size.get() < 1) {
            throw new IllegalStateException("max page size < 1");
        }
        if (column_truncate_length.get() < 1) {
            throw new IllegalStateException("column truncate length < 1");
        }
        if (stats_truncate_length.get() < 1) {
            throw new IllegalStateException("stats truncate length < 1");
        }
        Objects.requireNonNull(compression.get(), "Parquet compression codec is null");
        Objects.requireNonNull(writer_version.get(), "Parquet writer is null");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2022-2026 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.foreign.datafusion;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;

public class FFISleeperOptionsTest {
    public static final jnr.ffi.Runtime RUNTIME = jnr.ffi.Runtime.getSystemRuntime();

    /** Creates an options struct populated with the default values. */
    private static FFIParquetOptions defaultOptions() {
        return new FFIParquetOptions(RUNTIME);
    }

    @Test
    void shouldFailOnZeroRowGroupSize() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.max_row_group_size.set(0);

        // Then
        assertThatIllegalStateException().isThrownBy(opts::validate).withMessage("max row group size < 1");
    }

    @Test
    void shouldFailOnZeroPageSize() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.max_page_size.set(0);

        // Then
        assertThatIllegalStateException().isThrownBy(opts::validate).withMessage("max page size < 1");
    }

    @Test
    void shouldFailOnZeroColumnTruncateLength() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.column_truncate_length.set(0);

        // Then
        assertThatIllegalStateException().isThrownBy(opts::validate).withMessage("column truncate length < 1");
    }

    @Test
    void shouldFailOnZeroStatsTruncateLength() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.stats_truncate_length.set(0);

        // Then
        assertThatIllegalStateException().isThrownBy(opts::validate).withMessage("stats truncate length < 1");
    }

    @Test
    void shouldFailOnNullCompressionCodec() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.compression.set(null);

        // Then
        assertThatNullPointerException().isThrownBy(opts::validate).withMessage("Parquet compression codec is null");
    }

    @Test
    void shouldFailOnNullWriterVersion() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.writer_version.set(null);

        // Then
        assertThatNullPointerException().isThrownBy(opts::validate).withMessage("Parquet writer is null");
    }

    @Test
    void shouldValidate() {
        // Given
        FFIParquetOptions opts = defaultOptions();
        opts.max_row_group_size.set(10);
        opts.max_page_size.set(10);
        opts.column_truncate_length.set(10);
        opts.stats_truncate_length.set(10);
        opts.compression.set("zstd");
        opts.writer_version.set("v2");

        // When
        opts.validate();

        // Then - pass
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import sleeper.foreign.bridge.FFIContext;
import sleeper.foreign.datafusion.DataFusionAwsConfig;
import sleeper.foreign.datafusion.FFICommonConfig;
import sleeper.foreign.datafusion.FFIParquetOptions;
import sleeper.parquet.row.ParquetRowWriterFactory;

import java.io.IOException;
Expand Down Expand Up @@ -107,27 +108,31 @@ public RowsProcessed compact(CompactionJob job, TableProperties tableProperties,
private static FFICommonConfig createCompactionParams(CompactionJob job, TableProperties tableProperties,
Region region, DataFusionAwsConfig awsConfig, jnr.ffi.Runtime runtime) {
Schema schema = tableProperties.getSchema();
FFIParquetOptions sleeperOptions = new FFIParquetOptions(runtime);
sleeperOptions.read_page_indexes.set(false);
sleeperOptions.max_row_group_size.set(tableProperties.getInt(PARQUET_ROW_GROUP_SIZE_ROWS));
sleeperOptions.max_page_size.set(tableProperties.getInt(PAGE_SIZE));
sleeperOptions.compression.set(tableProperties.get(COMPRESSION_CODEC));
sleeperOptions.writer_version.set(tableProperties.get(PARQUET_WRITER_VERSION));
sleeperOptions.column_truncate_length.set(tableProperties.getInt(COLUMN_INDEX_TRUNCATE_LENGTH));
sleeperOptions.stats_truncate_length.set(tableProperties.getInt(STATISTICS_TRUNCATE_LENGTH));
sleeperOptions.dict_enc_row_keys.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS));
sleeperOptions.dict_enc_sort_keys.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS));
sleeperOptions.dict_enc_values.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_VALUE_FIELDS));

FFICommonConfig params = new FFICommonConfig(runtime, awsConfig);
params.sleeper_options.set(sleeperOptions);
params.input_files.populate(job.getInputFiles().toArray(String[]::new), false);
// Files are always sorted for compactions
params.input_files_sorted.set(true);
params.use_readahead_store.set(tableProperties.getBoolean(DATAFUSION_S3_READAHEAD_ENABLED));
// Reading page indexes are not useful for compactions
params.read_page_indexes.set(false);
params.output_file.set(job.getOutputFile());
params.write_sketch_file.set(true);
params.use_readahead_store.set(tableProperties.getBoolean(DATAFUSION_S3_READAHEAD_ENABLED));
params.row_key_cols.populate(schema.getRowKeyFieldNames().toArray(String[]::new), false);
params.row_key_schema.populate(FFICommonConfig.getKeyTypes(schema.getRowKeyTypes()), false);
params.sort_key_cols.populate(schema.getSortKeyFieldNames().toArray(String[]::new), false);
params.max_row_group_size.set(tableProperties.getInt(PARQUET_ROW_GROUP_SIZE_ROWS));
params.max_page_size.set(tableProperties.getInt(PAGE_SIZE));
params.compression.set(tableProperties.get(COMPRESSION_CODEC));
params.writer_version.set(tableProperties.get(PARQUET_WRITER_VERSION));
params.column_truncate_length.set(tableProperties.getInt(COLUMN_INDEX_TRUNCATE_LENGTH));
params.stats_truncate_length.set(tableProperties.getInt(STATISTICS_TRUNCATE_LENGTH));
params.dict_enc_row_keys.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_ROW_KEY_FIELDS));
params.dict_enc_sort_keys.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_SORT_KEY_FIELDS));
params.dict_enc_values.set(tableProperties.getBoolean(DICTIONARY_ENCODING_FOR_VALUE_FIELDS));

params.aggregation_config.set(job.getAggregationConfig() == null ? "" : job.getAggregationConfig());
params.filtering_config.set(job.getFilterConfig() == null ? "" : job.getFilterConfig());
params.region.set(FFISleeperRegion.from(region, schema, runtime));
Expand Down
Loading
Loading