Skip to content

Commit b891bf2

Browse files
jonvex, nsivabalan, Vamsi, yihua, linliu-code
committed
fix(ingest): Repair affected logical timestamp milli tables (#14161)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: sivabalan <n.siva.b@gmail.com>
Co-authored-by: Vamsi <vamsi@onehouse.ai>
Co-authored-by: Y Ethan Guo <ethan.guoyihua@gmail.com>
Co-authored-by: Lin Liu <linliu.code@gmail.com>
1 parent 1f02a88 commit b891bf2

File tree

66 files changed

+5603
-98
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+5603
-98
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import java.util.stream.Collectors;
109109
import java.util.stream.Stream;
110110

111+
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
111112
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
112113
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
113114
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -890,6 +891,70 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertExceptio
890891
}
891892
}
892893

894+
/**
895+
* Validates that columns with secondary indexes are not evolved in an incompatible way.
896+
*
897+
* @param tableSchema the current table schema
898+
* @param writerSchema the new writer schema
899+
* @param indexMetadata the index metadata containing all index definitions
900+
* @throws SchemaCompatibilityException if a secondary index column has incompatible evolution
901+
*/
902+
static void validateSecondaryIndexSchemaEvolution(
903+
Schema tableSchema,
904+
Schema writerSchema,
905+
HoodieIndexMetadata indexMetadata) throws SchemaCompatibilityException {
906+
907+
// Filter for secondary index definitions
908+
List<HoodieIndexDefinition> secondaryIndexDefs = indexMetadata.getIndexDefinitions().values().stream()
909+
.filter(indexDef -> MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX))
910+
.collect(Collectors.toList());
911+
912+
if (secondaryIndexDefs.isEmpty()) {
913+
return;
914+
}
915+
916+
// Create a map from source field to index name for efficient lookup
917+
Map<String, String> columnToIndexName = new HashMap<>();
918+
for (HoodieIndexDefinition indexDef : secondaryIndexDefs) {
919+
String indexName = indexDef.getIndexName();
920+
for (String sourceField : indexDef.getSourceFields()) {
921+
// Note: If a column is part of multiple indexes, this will use the last one
922+
// This is fine since we just need any index name for error reporting
923+
columnToIndexName.put(sourceField, indexName);
924+
}
925+
}
926+
927+
// Check each indexed column for schema evolution
928+
for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
929+
String columnName = entry.getKey();
930+
String indexName = entry.getValue();
931+
932+
Schema.Field tableField = tableSchema.getField(columnName);
933+
934+
if (tableField == null) {
935+
// This shouldn't happen as indexed columns should exist in table schema
936+
LOG.warn("Secondary index '{}' references non-existent column: {}", indexName, columnName);
937+
continue;
938+
}
939+
940+
// Use AvroSchemaCompatibility's field lookup logic to handle aliases
941+
Schema.Field writerField = AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
942+
943+
if (writerField != null && !tableField.schema().equals(writerField.schema())) {
944+
// Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
945+
if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
946+
continue;
947+
}
948+
949+
String errorMessage = String.format(
950+
"Column '%s' has secondary index '%s' and cannot evolve from schema '%s' to '%s'. "
951+
+ "Please drop the secondary index before changing the column type.",
952+
columnName, indexName, tableField.schema(), writerField.schema());
953+
throw new SchemaCompatibilityException(errorMessage);
954+
}
955+
}
956+
}
957+
893958
public void validateUpsertSchema() throws HoodieUpsertException {
894959
if (isMetadataTable) {
895960
return;

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hudi.common.model.HoodieRecord;
2626
import org.apache.hudi.common.model.HoodieSparkRecord;
2727
import org.apache.hudi.common.util.FileFormatUtils;
28+
import org.apache.hudi.common.util.Option;
2829
import org.apache.hudi.common.util.ParquetReaderIterator;
2930
import org.apache.hudi.common.util.ParquetUtils;
3031
import org.apache.hudi.common.util.StringUtils;
@@ -58,10 +59,14 @@
5859

5960
public class HoodieSparkParquetReader implements HoodieSparkFileReader {
6061

62+
public static final String ENABLE_LOGICAL_TIMESTAMP_REPAIR = "spark.hudi.logicalTimestampField.repair.enable";
6163
private final StoragePath path;
6264
private final HoodieStorage storage;
6365
private final FileFormatUtils parquetUtils;
64-
private List<ParquetReaderIterator> readerIterators = new ArrayList<>();
66+
private final List<ClosableIterator> readerIterators = new ArrayList<>();
67+
private Option<MessageType> fileSchemaOption = Option.empty();
68+
private Option<StructType> structTypeOption = Option.empty();
69+
private Option<Schema> schemaOption = Option.empty();
6570

6671
public HoodieSparkParquetReader(HoodieStorage storage, StoragePath path) {
6772
this.path = path;
@@ -135,16 +140,39 @@ private ClosableIterator<InternalRow> getInternalRowIterator(Schema readerSchema
135140
return parquetReaderIterator;
136141
}
137142

143+
private MessageType getFileSchema() {
144+
if (fileSchemaOption.isEmpty()) {
145+
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
146+
fileSchemaOption = Option.of(messageType);
147+
}
148+
return fileSchemaOption.get();
149+
}
150+
138151
@Override
139152
public Schema getSchema() {
140-
// Some types in avro are not compatible with parquet.
141-
// Avro only supports representing Decimals as fixed byte array
142-
// and therefore if we convert to Avro directly we'll lose logical type-info.
143-
MessageType messageType = ((ParquetUtils) parquetUtils).readSchema(storage, path);
144-
StructType structType = new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
145-
return SparkAdapterSupport$.MODULE$.sparkAdapter()
146-
.getAvroSchemaConverters()
147-
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING);
153+
if (schemaOption.isEmpty()) {
154+
// Some types in avro are not compatible with parquet.
155+
// Avro only supports representing Decimals as fixed byte array
156+
// and therefore if we convert to Avro directly we'll lose logical type-info.
157+
MessageType messageType = getFileSchema();
158+
StructType structType = getStructSchema();
159+
schemaOption = Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter()
160+
.getAvroSchemaConverters()
161+
.toAvroType(structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
162+
}
163+
return schemaOption.get();
164+
}
165+
166+
protected StructType getStructSchema() {
167+
if (structTypeOption.isEmpty()) {
168+
MessageType messageType = getFileSchema();
169+
structTypeOption = Option.of(convertToStruct(messageType));
170+
}
171+
return structTypeOption.get();
172+
}
173+
174+
private StructType convertToStruct(MessageType messageType) {
175+
return new ParquetToSparkSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(messageType);
148176
}
149177

150178
@Override

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@
1818

1919
package org.apache.hudi.io.storage.row;
2020

21+
import org.apache.avro.LogicalTypes;
22+
import org.apache.avro.Schema;
2123
import org.apache.hadoop.conf.Configuration;
24+
25+
import org.apache.hudi.SparkAdapterSupport$;
2226
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
2327
import org.apache.hudi.common.bloom.BloomFilter;
2428
import org.apache.hudi.common.config.HoodieConfig;
@@ -27,10 +31,19 @@
2731
import org.apache.hudi.common.util.ReflectionUtils;
2832

2933
import org.apache.parquet.hadoop.api.WriteSupport;
34+
import org.apache.parquet.schema.GroupType;
35+
import org.apache.parquet.schema.LogicalTypeAnnotation;
36+
import org.apache.parquet.schema.Type;
37+
import org.apache.parquet.schema.Types;
38+
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
3039
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
40+
import org.apache.spark.sql.types.DataTypes;
41+
import org.apache.spark.sql.types.Decimal;
42+
import org.apache.spark.sql.types.Metadata;
3143
import org.apache.spark.sql.types.StructType;
3244
import org.apache.spark.unsafe.types.UTF8String;
3345

46+
import java.util.Arrays;
3447
import java.util.Collections;
3548
import java.util.Map;
3649

@@ -99,5 +112,4 @@ public static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Confi
99112
new Class<?>[] {Configuration.class, StructType.class, Option.class, HoodieConfig.class},
100113
conf, structType, bloomFilterOpt, config);
101114
}
102-
103115
}

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ object AvroConversionUtils {
9292
recordNamespace: String): Row => GenericRecord = {
9393
val serde = sparkAdapter.createSparkRowSerDe(sourceSqlType)
9494
val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
95-
val nullable = AvroSchemaUtils.resolveNullableSchema(avroSchema) != avroSchema
95+
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema
9696

9797
val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)
9898

hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
9797
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
9898
// (and back)
9999
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
100-
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
100+
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema
101101

102102
// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
103103
// serializer is not able to digest it
@@ -166,7 +166,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
166166
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
167167
// (and back)
168168
val sameSchema = writerAvroSchema.equals(readerAvroSchema)
169-
val nullable = AvroSchemaUtils.resolveNullableSchema(writerAvroSchema) != writerAvroSchema
169+
val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema
170170

171171
// NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
172172
// serializer is not able to digest it

hudi-common/src/main/java/org/apache/hudi/BaseHoodieTableFileIndex.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,6 +483,11 @@ protected boolean shouldReadAsPartitionedTable() {
483483
return (partitionColumns.length > 0 && canParsePartitionValues()) || HoodieTableMetadata.isMetadataTable(basePath);
484484
}
485485

486+
protected PartitionPath convertToPartitionPath(String partitionPath) {
487+
Object[] partitionColumnValues = parsePartitionColumnValues(partitionColumns, partitionPath);
488+
return new PartitionPath(partitionPath, partitionColumnValues);
489+
}
490+
486491
private static long fileSliceSize(FileSlice fileSlice) {
487492
long logFileSize = fileSlice.getLogFiles().map(HoodieLogFile::getFileSize)
488493
.filter(s -> s > 0)

hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,13 +207,13 @@ public static Option<Schema> findNestedFieldSchema(Schema schema, String fieldNa
207207
}
208208
String[] parts = fieldName.split("\\.");
209209
for (String part : parts) {
210-
Schema.Field foundField = resolveNullableSchema(schema).getField(part);
210+
Schema.Field foundField = getNonNullTypeFromUnion(schema).getField(part);
211211
if (foundField == null) {
212212
throw new HoodieAvroSchemaException(fieldName + " not a field in " + schema);
213213
}
214214
schema = foundField.schema();
215215
}
216-
return Option.of(resolveNullableSchema(schema));
216+
return Option.of(getNonNullTypeFromUnion(schema));
217217
}
218218

219219
public static Option<Schema.Type> findNestedFieldType(Schema schema, String fieldName) {
@@ -253,7 +253,7 @@ public static Schema resolveUnionSchema(Schema schema, String fieldSchemaFullNam
253253
List<Schema> innerTypes = schema.getTypes();
254254
if (innerTypes.size() == 2 && isNullable(schema)) {
255255
// this is a basic nullable field so handle it more efficiently
256-
return resolveNullableSchema(schema);
256+
return getNonNullTypeFromUnion(schema);
257257
}
258258

259259
Schema nonNullType =
@@ -287,7 +287,7 @@ public static boolean isNullable(Schema schema) {
287287
* Resolves typical Avro's nullable schema definition: {@code Union(Schema.Type.NULL, <NonNullType>)},
288288
* decomposing union and returning the target non-null type
289289
*/
290-
public static Schema resolveNullableSchema(Schema schema) {
290+
public static Schema getNonNullTypeFromUnion(Schema schema) {
291291
if (schema.getType() != Schema.Type.UNION) {
292292
return schema;
293293
}

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
110110
import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
111111
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
112+
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
112113
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
113114
import static org.apache.hudi.common.util.ValidationUtils.checkState;
114115
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.tryUpcastDecimal;
@@ -383,7 +384,7 @@ public static Schema makeFieldNonNull(Schema schema, String fieldName, Object fi
383384
.stream()
384385
.map(field -> {
385386
if (Objects.equals(field.name(), fieldName)) {
386-
return new Schema.Field(field.name(), AvroSchemaUtils.resolveNullableSchema(field.schema()), field.doc(), fieldDefaultValue);
387+
return createNewSchemaField(field.name(), AvroSchemaUtils.getNonNullTypeFromUnion(field.schema()), field.doc(), fieldDefaultValue);
387388
} else {
388389
return new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
389390
}
@@ -717,7 +718,7 @@ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String
717718
Object val = valueNode.get(part);
718719

719720
if (i == parts.length - 1) {
720-
return resolveNullableSchema(valueNode.getSchema().getField(part).schema());
721+
return getNonNullTypeFromUnion(valueNode.getSchema().getField(part).schema());
721722
} else {
722723
if (!(val instanceof GenericRecord)) {
723724
throw new HoodieException("Cannot find a record at part value :" + part);
@@ -741,10 +742,16 @@ public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, Str
741742
int i = 0;
742743
for (; i < parts.length; i++) {
743744
String part = parts[i];
744-
Schema schema = writeSchema.getField(part).schema();
745+
try {
746+
// Resolve nullable/union schema to the actual schema
747+
currentSchema = getNonNullTypeFromUnion(currentSchema.getField(part).schema());
745748

746-
if (i == parts.length - 1) {
747-
return resolveNullableSchema(schema);
749+
if (i == parts.length - 1) {
750+
// Return the schema for the final part
751+
return getNonNullTypeFromUnion(currentSchema);
752+
}
753+
} catch (Exception e) {
754+
throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
748755
}
749756
}
750757
throw new HoodieException("Failed to get schema. Not a valid field name: " + fieldName);
@@ -782,7 +789,7 @@ public static Object convertValueForSpecificDataTypes(Schema fieldSchema,
782789
return null;
783790
}
784791

785-
return convertValueForAvroLogicalTypes(resolveNullableSchema(fieldSchema), fieldValue, consistentLogicalTimestampEnabled);
792+
return convertValueForAvroLogicalTypes(getNonNullTypeFromUnion(fieldSchema), fieldValue, consistentLogicalTimestampEnabled);
786793
}
787794

788795
/**
@@ -1049,7 +1056,9 @@ private static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Sche
10491056
return oldValue;
10501057
case LONG:
10511058
if (oldSchema.getLogicalType() != newSchema.getLogicalType()) {
1052-
if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
1059+
if (oldSchema.getLogicalType() == null || newSchema.getLogicalType() == null) {
1060+
return oldValue;
1061+
} else if (oldSchema.getLogicalType() instanceof LogicalTypes.TimestampMillis) {
10531062
if (newSchema.getLogicalType() instanceof LogicalTypes.TimestampMicros) {
10541063
return DateTimeUtils.millisToMicros((Long) oldValue);
10551064
}

hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.apache.avro.io.DecoderFactory;
4242
import org.apache.avro.io.Encoder;
4343
import org.apache.avro.io.EncoderFactory;
44+
import org.apache.parquet.schema.AvroSchemaRepair;
4445

4546
import javax.annotation.Nonnull;
4647

@@ -143,7 +144,7 @@ protected <T> ClosableIterator<HoodieRecord<T>> deserializeRecords(byte[] conten
143144
checkState(this.readerSchema != null, "Reader's schema has to be non-null");
144145
checkArgument(type != HoodieRecordType.SPARK, "Not support read avro to spark record");
145146
// TODO AvroSparkReader need
146-
RecordIterator iterator = RecordIterator.getInstance(this, content);
147+
RecordIterator iterator = RecordIterator.getInstance(this, content, true);
147148
return new CloseableMappingIterator<>(iterator, data -> (HoodieRecord<T>) new HoodieAvroIndexedRecord(data));
148149
}
149150

@@ -156,7 +157,7 @@ private static class RecordIterator implements ClosableIterator<IndexedRecord> {
156157
private int totalRecords = 0;
157158
private int readRecords = 0;
158159

159-
private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content) throws IOException {
160+
private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content, boolean enableLogicalTimestampFieldRepair) throws IOException {
160161
this.content = content;
161162

162163
this.dis = new SizeAwareDataInputStream(new DataInputStream(new ByteArrayInputStream(this.content)));
@@ -167,16 +168,21 @@ private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content)
167168
this.totalRecords = this.dis.readInt();
168169
}
169170

170-
if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {
171-
this.reader = new GenericDatumReader<>(writerSchema, writerSchema);
171+
// writer schema could refer to table schema.
172+
// avoid this for MDT for sure.
173+
// and for tables having no logical ts column.
174+
Schema repairedWriterSchema = enableLogicalTimestampFieldRepair
175+
? AvroSchemaRepair.repairLogicalTypes(writerSchema, readerSchema) : writerSchema;
176+
if (recordNeedsRewriteForExtendedAvroTypePromotion(repairedWriterSchema, readerSchema)) {
177+
this.reader = new GenericDatumReader<>(repairedWriterSchema, repairedWriterSchema);
172178
this.promotedSchema = Option.of(readerSchema);
173179
} else {
174-
this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
180+
this.reader = new GenericDatumReader<>(repairedWriterSchema, readerSchema);
175181
}
176182
}
177183

178-
public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content) throws IOException {
179-
return new RecordIterator(dataBlock.readerSchema, dataBlock.getSchemaFromHeader(), content);
184+
public static RecordIterator getInstance(HoodieAvroDataBlock dataBlock, byte[] content, boolean enableLogicalTimestampFieldRepair) throws IOException {
185+
return new RecordIterator(dataBlock.readerSchema, dataBlock.getSchemaFromHeader(), content, enableLogicalTimestampFieldRepair);
180186
}
181187

182188
@Override

0 commit comments

Comments
 (0)