Skip to content

Commit 5ef5773

Browse files
committed
Fix cherry-pick error
1 parent b891bf2 commit 5ef5773

File tree

21 files changed

+433
-501
lines changed

21 files changed

+433
-501
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java

Lines changed: 1 addition & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
4747
import org.apache.hudi.common.model.HoodieFileFormat;
4848
import org.apache.hudi.common.model.HoodieKey;
49+
import org.apache.hudi.common.model.HoodieRecordLocation;
4950
import org.apache.hudi.common.model.HoodieWriteStat;
5051
import org.apache.hudi.common.table.HoodieTableConfig;
5152
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -108,7 +109,6 @@
108109
import java.util.stream.Collectors;
109110
import java.util.stream.Stream;
110111

111-
import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
112112
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
113113
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
114114
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -891,70 +891,6 @@ private void validateSchema() throws HoodieUpsertException, HoodieInsertExceptio
891891
}
892892
}
893893

894-
/**
895-
* Validates that columns with secondary indexes are not evolved in an incompatible way.
896-
*
897-
* @param tableSchema the current table schema
898-
* @param writerSchema the new writer schema
899-
* @param indexMetadata the index metadata containing all index definitions
900-
* @throws SchemaCompatibilityException if a secondary index column has incompatible evolution
901-
*/
902-
static void validateSecondaryIndexSchemaEvolution(
903-
Schema tableSchema,
904-
Schema writerSchema,
905-
HoodieIndexMetadata indexMetadata) throws SchemaCompatibilityException {
906-
907-
// Filter for secondary index definitions
908-
List<HoodieIndexDefinition> secondaryIndexDefs = indexMetadata.getIndexDefinitions().values().stream()
909-
.filter(indexDef -> MetadataPartitionType.fromPartitionPath(indexDef.getIndexName()).equals(MetadataPartitionType.SECONDARY_INDEX))
910-
.collect(Collectors.toList());
911-
912-
if (secondaryIndexDefs.isEmpty()) {
913-
return;
914-
}
915-
916-
// Create a map from source field to index name for efficient lookup
917-
Map<String, String> columnToIndexName = new HashMap<>();
918-
for (HoodieIndexDefinition indexDef : secondaryIndexDefs) {
919-
String indexName = indexDef.getIndexName();
920-
for (String sourceField : indexDef.getSourceFields()) {
921-
// Note: If a column is part of multiple indexes, this will use the last one
922-
// This is fine since we just need any index name for error reporting
923-
columnToIndexName.put(sourceField, indexName);
924-
}
925-
}
926-
927-
// Check each indexed column for schema evolution
928-
for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
929-
String columnName = entry.getKey();
930-
String indexName = entry.getValue();
931-
932-
Schema.Field tableField = tableSchema.getField(columnName);
933-
934-
if (tableField == null) {
935-
// This shouldn't happen as indexed columns should exist in table schema
936-
LOG.warn("Secondary index '{}' references non-existent column: {}", indexName, columnName);
937-
continue;
938-
}
939-
940-
// Use AvroSchemaCompatibility's field lookup logic to handle aliases
941-
Schema.Field writerField = AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
942-
943-
if (writerField != null && !tableField.schema().equals(writerField.schema())) {
944-
// Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
945-
if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
946-
continue;
947-
}
948-
949-
String errorMessage = String.format(
950-
"Column '%s' has secondary index '%s' and cannot evolve from schema '%s' to '%s'. "
951-
+ "Please drop the secondary index before changing the column type.",
952-
columnName, indexName, tableField.schema(), writerField.schema());
953-
throw new SchemaCompatibilityException(errorMessage);
954-
}
955-
}
956-
}
957-
958894
public void validateUpsertSchema() throws HoodieUpsertException {
959895
if (isMetadataTable) {
960896
return;

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private StructType convertToStruct(MessageType messageType) {
177177

178178
@Override
179179
public void close() {
180-
readerIterators.forEach(ParquetReaderIterator::close);
180+
readerIterators.forEach(it -> it.close());
181181
}
182182

183183
@Override

hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,19 @@
1818

1919
package org.apache.hudi.io.storage.row;
2020

21-
import org.apache.avro.LogicalTypes;
22-
import org.apache.avro.Schema;
23-
import org.apache.hadoop.conf.Configuration;
24-
25-
import org.apache.hudi.SparkAdapterSupport$;
2621
import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
2722
import org.apache.hudi.common.bloom.BloomFilter;
2823
import org.apache.hudi.common.config.HoodieConfig;
2924
import org.apache.hudi.common.config.HoodieStorageConfig;
3025
import org.apache.hudi.common.util.Option;
3126
import org.apache.hudi.common.util.ReflectionUtils;
3227

28+
import org.apache.hadoop.conf.Configuration;
3329
import org.apache.parquet.hadoop.api.WriteSupport;
34-
import org.apache.parquet.schema.GroupType;
35-
import org.apache.parquet.schema.LogicalTypeAnnotation;
36-
import org.apache.parquet.schema.Type;
37-
import org.apache.parquet.schema.Types;
38-
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
3930
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
40-
import org.apache.spark.sql.types.DataTypes;
41-
import org.apache.spark.sql.types.Decimal;
42-
import org.apache.spark.sql.types.Metadata;
4331
import org.apache.spark.sql.types.StructType;
4432
import org.apache.spark.unsafe.types.UTF8String;
4533

46-
import java.util.Arrays;
4734
import java.util.Collections;
4835
import java.util.Map;
4936

hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,16 @@
1919
package org.apache.spark.sql.hudi
2020

2121
import org.apache.avro.Schema
22-
import org.apache.hadoop.fs.{FileStatus, Path}
2322
import org.apache.hudi.client.utils.SparkRowSerDe
2423
import org.apache.hudi.common.table.HoodieTableMetaClient
2524
import org.apache.hudi.storage.StoragePath
2625

27-
import org.apache.avro.Schema
28-
import org.apache.hadoop.conf.Configuration
2926
import org.apache.spark.sql._
3027
import org.apache.spark.sql.avro.{HoodieAvroDeserializer, HoodieAvroSchemaConverters, HoodieAvroSerializer}
28+
import org.apache.spark.sql.HoodieUnsafeUtils
3129
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
3230
import org.apache.spark.sql.catalyst.catalog.CatalogTable
33-
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
34-
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, InterpretedPredicate}
31+
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedPredicate}
3532
import org.apache.spark.sql.catalyst.parser.ParserInterface
3633
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
3734
import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
@@ -53,7 +50,7 @@ import java.util.{Locale, TimeZone}
5350
trait SparkAdapter extends Serializable {
5451

5552
/**
56-
* Checks whether provided instance of [[InternalRow]] is actually an instance of [[ColumnarBatchRow]]
53+
* Checks whether provided instance of [[InternalRow]] is actually an instance of [[org.apache.spark.sql.vectorized.ColumnarBatchRow]]
5754
*/
5855
def isColumnarBatchRow(r: InternalRow): Boolean
5956

@@ -72,7 +69,7 @@ trait SparkAdapter extends Serializable {
7269

7370
/**
7471
* Returns an instance of [[HoodieCatalogUtils]] providing for common utils operating on Spark's
75-
* [[TableCatalog]]s
72+
* [[org.apache.spark.sql.connector.catalog.TableCatalog]]s
7673
*/
7774
def getCatalogUtils: HoodieCatalogUtils
7875

@@ -207,7 +204,7 @@ trait SparkAdapter extends Serializable {
207204
metadataColumns: Seq[AttributeReference] = Seq.empty): FileScanRDD
208205

209206
/**
210-
* Extract condition in [[DeleteFromTable]]
207+
* Extract condition in [[org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable]]
211208
* SPARK-38626 condition is no longer Option in Spark 3.3
212209
*/
213210
def extractDeleteCondition(deleteFromTable: Command): Expression
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.avro;
20+
21+
import com.github.benmanes.caffeine.cache.Caffeine;
22+
import com.github.benmanes.caffeine.cache.LoadingCache;
23+
import org.apache.avro.Schema;
24+
25+
/**
26+
* An avro schema cache implementation for reusing avro schema instantces in JVM/process scope.
27+
* This is a global cache which works for a JVM lifecycle.
28+
* A collection of schema instants are maintained.
29+
*
30+
* <p> NOTE: The schema which be used frequently should be cached through this cache.
31+
*/
32+
public class AvroSchemaCache {
33+
34+
35+
// Ensure that there is only one variable instance of the same schema within an entire JVM lifetime
36+
private static final LoadingCache<Schema, Schema> SCHEMA_CACHE = Caffeine.newBuilder().weakValues().maximumSize(1024).build(k -> k);
37+
38+
/**
39+
* Get schema variable from global cache. If not found, put it into the cache and then return it.
40+
* @param schema schema to get
41+
* @return if found, return the exist schema variable, otherwise return the param itself.
42+
*/
43+
public static Schema intern(Schema schema) {
44+
return SCHEMA_CACHE.get(schema);
45+
}
46+
47+
}

hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.avro.LogicalTypes.Decimal;
5555
import org.apache.avro.Schema;
5656
import org.apache.avro.Schema.Field;
57+
import org.apache.avro.Schema.Field.Order;
5758
import org.apache.avro.generic.GenericData;
5859
import org.apache.avro.generic.GenericData.Record;
5960
import org.apache.avro.generic.GenericDatumReader;
@@ -78,6 +79,7 @@
7879
import java.math.BigInteger;
7980
import java.math.RoundingMode;
8081
import java.nio.ByteBuffer;
82+
import java.nio.charset.StandardCharsets;
8183
import java.sql.Date;
8284
import java.sql.Timestamp;
8385
import java.time.Instant;
@@ -105,7 +107,6 @@
105107
import static org.apache.avro.Schema.Type.UNION;
106108
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
107109
import static org.apache.hudi.avro.AvroSchemaUtils.isNullable;
108-
import static org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema;
109110
import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
110111
import static org.apache.hudi.common.util.DateTimeUtils.instantToMicros;
111112
import static org.apache.hudi.common.util.DateTimeUtils.microsToInstant;
@@ -739,6 +740,7 @@ public static Schema getNestedFieldSchemaFromRecord(GenericRecord record, String
739740
*/
740741
public static Schema getNestedFieldSchemaFromWriteSchema(Schema writeSchema, String fieldName) {
741742
String[] parts = fieldName.split("\\.");
743+
Schema currentSchema = writeSchema;
742744
int i = 0;
743745
for (; i < parts.length; i++) {
744746
String part = parts[i];
@@ -1375,6 +1377,10 @@ public static boolean gteqAvro1_10() {
13751377
return StringUtils.compareVersions(AVRO_VERSION, "1.10") >= 0;
13761378
}
13771379

1380+
static boolean gteqAvro1_12() {
1381+
return StringUtils.compareVersions(AVRO_VERSION, "1.12") >= 0;
1382+
}
1383+
13781384
/**
13791385
* Wraps a value into Avro type wrapper.
13801386
*
@@ -1502,4 +1508,73 @@ private static boolean isLocalTimestampMicros(LogicalType logicalType) {
15021508
}
15031509
}
15041510

1511+
private static Object convertDefaultValueForAvroCompatibility(Object defaultValue) {
1512+
if (gteqAvro1_12() && defaultValue instanceof byte[]) {
1513+
// For Avro 1.12.0 compatibility, we need to convert the default value in byte array
1514+
// to String so that correct JsonNode is used for the default value for validation,
1515+
// instead of directly relying on Avro's JacksonUtils.toJsonNode which is called
1516+
// by `Schema.Field` constructor
1517+
// The logic of getting the String value is copied from JacksonUtils.toJsonNode in Avro 1.11.4
1518+
return new String((byte[]) defaultValue, StandardCharsets.ISO_8859_1);
1519+
}
1520+
return defaultValue;
1521+
}
1522+
1523+
/**
1524+
* Creates a new Avro Schema.Field from an existing field, with special handling for
1525+
* default values to ensure compatibility with Avro 1.12.0 and later versions.
1526+
*
1527+
* @param field the original Schema.Field to create a new field from
1528+
* @return a new Schema.Field with the same properties but properly formatted default value
1529+
*/
1530+
public static Schema.Field createNewSchemaField(Schema.Field field) {
1531+
return createNewSchemaField(field.name(), field.schema(), field.doc(), field.defaultVal());
1532+
}
1533+
1534+
/**
1535+
* Creates a new Avro Schema.Field with special handling for default values to ensure
1536+
* compatibility with Avro 1.12.0 and later versions.
1537+
*
1538+
* <p>In Avro 1.12.0+, the validation of default values for bytes fields is stricter.
1539+
* When the default value is a byte array, it needs to be converted to a String using
1540+
* ISO-8859-1 encoding so that the correct JsonNode type (TextNode) is used for validation,
1541+
* rather than BinaryNode which would fail validation. Changes in Avro 1.12.0 that
1542+
* lead to this behavior: [AVRO-3876] https://github.com/apache/avro/pull/2529
1543+
*
1544+
* <p>This conversion ensures that schemas with bytes fields having default values
1545+
* can be properly constructed without AvroTypeException in Avro 1.12.0+.
1546+
*
1547+
* @param name the name of the field
1548+
* @param schema the schema of the field
1549+
* @param doc the documentation for the field (can be null)
1550+
* @param defaultValue the default value for the field (can be null)
1551+
* @return a new Schema.Field with properly formatted default value for Avro 1.12.0+ compatibility
1552+
*/
1553+
public static Schema.Field createNewSchemaField(String name, Schema schema, String doc, Object defaultValue) {
1554+
return new Schema.Field(name, schema, doc, convertDefaultValueForAvroCompatibility(defaultValue));
1555+
}
1556+
1557+
/**
1558+
* Creates a new Avro Schema.Field with special handling for default values to ensure
1559+
* compatibility with Avro 1.12.0 and later versions.
1560+
*
1561+
* <p>In Avro 1.12.0+, the validation of default values for bytes fields is stricter.
1562+
* When the default value is a byte array, it needs to be converted to a String using
1563+
* ISO-8859-1 encoding so that the correct JsonNode type (TextNode) is used for validation,
1564+
* rather than BinaryNode which would fail validation. Changes in Avro 1.12.0 that
1565+
* lead to this behavior: [AVRO-3876] https://github.com/apache/avro/pull/2529
1566+
*
1567+
* <p>This conversion ensures that schemas with bytes fields having default values
1568+
* can be properly constructed without AvroTypeException in Avro 1.12.0+.
1569+
*
1570+
* @param name the name of the field
1571+
* @param schema the schema of the field
1572+
* @param doc the documentation for the field (can be null)
1573+
* @param defaultValue the default value for the field (can be null)
1574+
* @param order the sort order for this field (can be null, defaults to ascending)
1575+
* @return a new Schema.Field with properly formatted default value for Avro 1.12.0+ compatibility
1576+
*/
1577+
public static Schema.Field createNewSchemaField(String name, Schema schema, String doc, Object defaultValue, Order order) {
1578+
return new Schema.Field(name, schema, doc, convertDefaultValueForAvroCompatibility(defaultValue), order);
1579+
}
15051580
}

hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,19 @@ public final List<StoragePath> getPartitionPaths() {
637637
}
638638
}
639639

640+
public final List<String> getPartitionNames() {
641+
try {
642+
readLock.lock();
643+
return fetchAllStoredFileGroups()
644+
.filter(fg -> !isFileGroupReplaced(fg))
645+
.map(HoodieFileGroup::getPartitionPath)
646+
.distinct()
647+
.collect(Collectors.toList());
648+
} finally {
649+
readLock.unlock();
650+
}
651+
}
652+
640653
@Override
641654
public final Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperations() {
642655
try {

hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.hudi.metadata;
2020

21-
import org.apache.hudi.avro.AvroSchemaUtils;
2221
import org.apache.hudi.avro.ConvertingGenericData;
2322
import org.apache.hudi.avro.HoodieAvroUtils;
2423
import org.apache.hudi.avro.model.HoodieCleanMetadata;

hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import java.util.stream.IntStream;
9090
import java.util.stream.Stream;
9191

92+
import static org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField;
9293
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
9394
import static org.apache.hudi.common.util.ValidationUtils.checkState;
9495

@@ -216,6 +217,7 @@ public class HoodieTestDataGenerator implements AutoCloseable {
216217
private final String[] partitionPaths;
217218
//maintains the count of existing keys schema wise
218219
private Map<String, Integer> numKeysBySchema;
220+
private Option<Schema> extendedSchema = Option.empty();
219221

220222
public HoodieTestDataGenerator(long seed) {
221223
this(seed, DEFAULT_PARTITION_PATHS, new HashMap<>());

0 commit comments

Comments
 (0)