Skip to content

Commit 9f21467

Browse files
committed
Fix more Azure CI tests
1 parent 5ef643e commit 9f21467

File tree

14 files changed

+391
-157
lines changed

14 files changed

+391
-157
lines changed

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergedReadHandle.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.hudi.io;
2121

22+
import org.apache.hudi.avro.AvroSchemaUtils;
2223
import org.apache.hudi.avro.HoodieAvroUtils;
2324
import org.apache.hudi.common.model.FileSlice;
2425
import org.apache.hudi.common.model.HoodieLogFile;
@@ -68,10 +69,13 @@ public HoodieMergedReadHandle(HoodieWriteConfig config,
6869
Pair<String, String> partitionPathFileIDPair,
6970
Option<FileSlice> fileSliceOption) {
7071
super(config, instantTime, hoodieTable, partitionPathFileIDPair);
71-
readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
72+
Schema orignalReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
7273
// config.getSchema is not canonicalized, while config.getWriteSchema is canonicalized. So, we have to use the canonicalized schema to read the existing data.
7374
baseFileReaderSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getWriteSchema()), config.allowOperationMetadataField());
7475
fileSliceOpt = fileSliceOption.isPresent() ? fileSliceOption : getLatestFileSlice();
76+
// Repair reader schema.
77+
// Assume writer schema should be correct. If not, no repair happens.
78+
readerSchema = AvroSchemaUtils.getRepairedSchema(orignalReaderSchema, baseFileReaderSchema);
7579
}
7680

7781
public List<HoodieRecord<T>> getMergedRecords() {

hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hudi.table.action.commit;
2020

21+
import org.apache.hudi.avro.AvroSchemaUtils;
2122
import org.apache.hudi.common.config.HoodieCommonConfig;
2223
import org.apache.hudi.common.model.HoodieBaseFile;
2324
import org.apache.hudi.common.model.HoodieRecord;
@@ -86,7 +87,8 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
8687
HoodieFileReader bootstrapFileReader = null;
8788

8889
Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
89-
Schema readerSchema = baseFileReader.getSchema();
90+
Schema readerSchema = AvroSchemaUtils.getRepairedSchema(baseFileReader.getSchema(), writerSchema);
91+
9092

9193
// In case Advanced Schema Evolution is enabled we might need to rewrite currently
9294
// persisted records to adhere to an evolved schema

hudi-client/hudi-spark-client/src/parquet/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.spark.sql.execution.datasources.parquet
2121

2222
import org.apache.hudi.SparkAdapterSupport
23+
import org.apache.hudi.common.util.{Option => HOption}
2324
import org.apache.hudi.common.util.ValidationUtils
2425
import org.apache.parquet.hadoop.api.InitContext
2526
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
@@ -29,18 +30,17 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
2930
import java.time.ZoneId
3031
import scala.collection.JavaConverters._
3132

32-
class HoodieParquetReadSupport(
33-
convertTz: Option[ZoneId],
34-
enableVectorizedReader: Boolean,
35-
val enableTimestampFieldRepair: Boolean,
36-
datetimeRebaseSpec: RebaseSpec,
37-
int96RebaseSpec: RebaseSpec,
38-
tableSchemaOpt: org.apache.hudi.common.util.Option[MessageType] = org.apache.hudi.common.util.Option.empty())
33+
class HoodieParquetReadSupport(convertTz: Option[ZoneId],
34+
enableVectorizedReader: Boolean,
35+
val enableTimestampFieldRepair: Boolean,
36+
datetimeRebaseSpec: RebaseSpec,
37+
int96RebaseSpec: RebaseSpec,
38+
tableSchemaOpt: HOption[MessageType] = HOption.empty())
3939
extends ParquetReadSupport(convertTz, enableVectorizedReader, datetimeRebaseSpec, int96RebaseSpec) with SparkAdapterSupport {
4040

4141
override def init(context: InitContext): ReadContext = {
4242
val readContext = super.init(context)
43-
// repair is needed here because this is the schema that is used by the reader to decide what
43+
// Repairing is needed here because this is the schema that is used by the reader to decide what
4444
// conversions are necessary
4545
val requestedParquetSchema = if (enableTimestampFieldRepair) {
4646
HoodieParquetReadSupport.getRepairedSchema(readContext.getRequestedSchema, tableSchemaOpt)
@@ -101,7 +101,8 @@ object HoodieParquetReadSupport {
101101
}
102102
}
103103

104-
def getRepairedSchema(fileSchema: MessageType, tableSchema: org.apache.hudi.common.util.Option[MessageType]): MessageType = {
104+
def getRepairedSchema(fileSchema: MessageType,
105+
tableSchema: org.apache.hudi.common.util.Option[MessageType]): MessageType = {
105106
try {
106107
val schemaRepairClass = Class.forName("org.apache.parquet.schema.SchemaRepair")
107108
val repairMethod = schemaRepairClass.getMethod(

hudi-common/src/main/java/org/apache/hudi/avro/MercifulJsonConverter.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,12 +301,27 @@ private static JsonToAvroFieldProcessor generateFixedTypeHandler() {
301301
return new JsonToAvroFieldProcessor() {
302302
@Override
303303
public Pair<Boolean, Object> convert(Object value, String name, Schema schema, boolean shouldSanitize, String invalidCharMask) {
304+
byte[] src;
304305
// The ObjectMapper uses a List to represent a FixedType value
305306
// e.g. "decimal_val": [0, 0, 14, -63, -52] will be converted to an ArrayList<Integer>
306-
List<Integer> converval = (List<Integer>) value;
307-
byte[] src = new byte[converval.size()];
308-
for (int i = 0; i < converval.size(); i++) {
309-
src[i] = converval.get(i).byteValue();
307+
if (value instanceof List) {
308+
List<Integer> converval = (List<Integer>) value;
309+
src = new byte[converval.size()];
310+
for (int i = 0; i < converval.size(); i++) {
311+
src[i] = converval.get(i).byteValue();
312+
}
313+
} else if (value instanceof ByteBuffer) {
314+
// Handle ByteBuffer when reading from existing records
315+
ByteBuffer buffer = (ByteBuffer) value;
316+
int start = buffer.position();
317+
int length = buffer.limit() - start;
318+
src = new byte[length];
319+
buffer.get(src, 0, length);
320+
buffer.position(start);
321+
} else if (value instanceof byte[]) {
322+
src = (byte[]) value;
323+
} else {
324+
return Pair.of(false, null);
310325
}
311326
byte[] dst = new byte[schema.getFixedSize()];
312327
System.arraycopy(src, 0, dst, 0, Math.min(schema.getFixedSize(), src.length));

hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java

Lines changed: 29 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -152,26 +152,17 @@ public class HoodieTestDataGenerator implements AutoCloseable {
152152
+ "{\"name\":\"ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
153153
+ "{\"name\":\"local_ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"}},"
154154
+ "{\"name\":\"local_ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}},"
155-
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
156-
+ "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
157-
+ "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
158-
155+
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},";
159156
public static final String EXTENDED_LOGICAL_TYPES_SCHEMA = "{\"name\":\"ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
160157
+ "{\"name\":\"ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
161158
+ "{\"name\":\"local_ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-millis\"}},"
162159
+ "{\"name\":\"local_ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}},"
163-
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
164-
+ "{\"name\":\"dec_plain_large\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":20,\"scale\":10}},"
165-
+ "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
166-
+ "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
160+
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},";
167161

168162
// LTS = Local Timestamp
169163
public static final String EXTENDED_LOGICAL_TYPES_SCHEMA_NO_LTS = "{\"name\":\"ts_millis\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
170164
+ "{\"name\":\"ts_micros\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
171-
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
172-
+ "{\"name\":\"dec_plain_large\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":20,\"scale\":10}},"
173-
+ "{\"name\":\"dec_fixed_small\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedSmall\",\"size\":3,\"logicalType\":\"decimal\",\"precision\":5,\"scale\":2}},"
174-
+ "{\"name\":\"dec_fixed_large\",\"type\":{\"type\":\"fixed\",\"name\":\"decFixedLarge\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":18,\"scale\":9}},";
165+
+ "{\"name\":\"event_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},";
175166

176167
public static final String TRIP_EXAMPLE_SCHEMA =
177168
TRIP_SCHEMA_PREFIX + EXTRA_TYPE_SCHEMA + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
@@ -431,9 +422,17 @@ public RawTripTestPayload generatePayloadForShortTripSchema(HoodieKey key, Strin
431422
* Generates a new avro record of the above schema format for a delete.
432423
*/
433424
private RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime) throws IOException {
425+
return generateRandomDeleteValue(key, instantTime, TRIP_EXAMPLE_SCHEMA);
426+
}
427+
428+
private RawTripTestPayload generateRandomDeleteValue(HoodieKey key, String instantTime, String schemaStr) throws IOException {
434429
GenericRecord rec = generateGenericRecord(key.getRecordKey(), key.getPartitionPath(), "rider-" + instantTime, "driver-" + instantTime, 0,
435430
true, false);
436-
return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), TRIP_EXAMPLE_SCHEMA, true, 0L);
431+
return new RawTripTestPayload(Option.of(rec.toString()), key.getRecordKey(), key.getPartitionPath(), schemaStr, true, 0L);
432+
}
433+
434+
private RawTripTestPayload generateRandomDeleteValuePerSchema(HoodieKey key, String instantTime, String schemaStr) throws IOException {
435+
return generateRandomValueAsPerSchema(schemaStr, key, instantTime, false, true, 0L);
437436
}
438437

439438
/**
@@ -556,8 +555,7 @@ private void generateTripSuffixValues(GenericRecord rec, boolean isDeleteRecord)
556555
* Generate record conforming to TRIP_EXAMPLE_SCHEMA or TRIP_FLATTENED_SCHEMA if isFlattened is true
557556
*/
558557
public GenericRecord generateGenericRecord(String rowKey, String partitionPath, String riderName, String driverName,
559-
long timestamp, boolean isDeleteRecord,
560-
boolean isFlattened) {
558+
long timestamp, boolean isDeleteRecord, boolean isFlattened) {
561559
GenericRecord rec = new GenericData.Record(isFlattened ? FLATTENED_AVRO_SCHEMA : AVRO_SCHEMA);
562560
generateTripPrefixValues(rec, rowKey, partitionPath, riderName, driverName, timestamp);
563561
if (isFlattened) {
@@ -664,38 +662,6 @@ public RawTripTestPayload generateRecordForTripLogicalTypesSchema(HoodieKey key,
664662
// event_date
665663
int eventDateBase = (int) dateThreshold.toEpochDay();
666664
rec.put("event_date", above ? eventDateBase + 1 : eventDateBase - 1);
667-
668-
669-
// -------------------
670-
// Decimal thresholds
671-
// -------------------
672-
BigDecimal decPlainLargeThreshold = new BigDecimal("1234567890.0987654321"); // precision=20, scale=10
673-
674-
BigDecimal decFixedSmallThreshold = new BigDecimal("543.21"); // precision=5, scale=2
675-
BigDecimal decFixedLargeThreshold = new BigDecimal("987654321.123456789"); // precision=18, scale=9
676-
677-
// Increment for just-above/below threshold = smallest possible unit for that scale
678-
BigDecimal incSmallScale2 = new BigDecimal("0.01");
679-
BigDecimal incLargeScale9 = new BigDecimal("0.000000001");
680-
BigDecimal incLargeScale10 = new BigDecimal("0.0000000001");
681-
682-
// Assign thresholded decimals
683-
if (!v6) {
684-
rec.put("dec_plain_large", ByteBuffer.wrap((above
685-
? decPlainLargeThreshold.add(incLargeScale10)
686-
: decPlainLargeThreshold.subtract(incLargeScale10)).unscaledValue().toByteArray()));
687-
}
688-
689-
Conversions.DecimalConversion decimalConversions = new Conversions.DecimalConversion();
690-
Schema decFixedSmallSchema = AVRO_TRIP_LOGICAL_TYPES_SCHEMA.getField("dec_fixed_small").schema();
691-
rec.put("dec_fixed_small", decimalConversions.toFixed(above
692-
? decFixedSmallThreshold.add(incSmallScale2)
693-
: decFixedSmallThreshold.subtract(incSmallScale2), decFixedSmallSchema, LogicalTypes.decimal(5, 2)));
694-
695-
Schema decFixedLargeSchema = AVRO_TRIP_LOGICAL_TYPES_SCHEMA.getField("dec_fixed_large").schema();
696-
rec.put("dec_fixed_large", decimalConversions.toFixed(above
697-
? decFixedLargeThreshold.add(incLargeScale9)
698-
: decFixedLargeThreshold.subtract(incLargeScale9), decFixedLargeSchema, LogicalTypes.decimal(18, 9)));
699665
generateTripSuffixValues(rec, isDeleteRecord);
700666
return new RawTripTestPayload(rec.toString(), key.getRecordKey(), key.getPartitionPath(), rec.getSchema().toString());
701667
}
@@ -1139,9 +1105,13 @@ public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, Inte
11391105
* @return stream of hoodie record updates
11401106
*/
11411107
public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
1108+
return generateUniqueDeleteStream(n, TRIP_EXAMPLE_SCHEMA);
1109+
}
1110+
1111+
public Stream<HoodieKey> generateUniqueDeleteStream(Integer n, String streamStr) {
11421112
final Set<KeyPartition> used = new HashSet<>();
1143-
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
1144-
Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
1113+
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(streamStr);
1114+
Integer numExistingKeys = numKeysBySchema.get(streamStr);
11451115
if (n > numExistingKeys) {
11461116
throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
11471117
}
@@ -1159,7 +1129,7 @@ public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
11591129
used.add(kp);
11601130
result.add(kp.key);
11611131
}
1162-
numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
1132+
numKeysBySchema.put(streamStr, numExistingKeys);
11631133
return result.stream();
11641134
}
11651135

@@ -1171,9 +1141,13 @@ public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
11711141
* @return stream of hoodie records for delete
11721142
*/
11731143
public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String instantTime, Integer n) {
1144+
return generateUniqueDeleteRecordStream(instantTime, n, TRIP_EXAMPLE_SCHEMA);
1145+
}
1146+
1147+
public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String instantTime, Integer n, String schemaStr) {
11741148
final Set<KeyPartition> used = new HashSet<>();
1175-
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
1176-
Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
1149+
Map<Integer, KeyPartition> existingKeys = existingKeysBySchema.get(schemaStr);
1150+
Integer numExistingKeys = numKeysBySchema.get(schemaStr);
11771151
if (n > numExistingKeys) {
11781152
throw new IllegalArgumentException("Requested unique deletes is greater than number of available keys");
11791153
}
@@ -1191,12 +1165,12 @@ public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String instantTime,
11911165
numExistingKeys--;
11921166
used.add(kp);
11931167
try {
1194-
result.add(new HoodieAvroRecord(kp.key, generateRandomDeleteValue(kp.key, instantTime)));
1168+
result.add(new HoodieAvroRecord(kp.key, generateRandomDeleteValuePerSchema(kp.key, instantTime, schemaStr)));
11951169
} catch (IOException e) {
11961170
throw new HoodieIOException(e.getMessage(), e);
11971171
}
11981172
}
1199-
numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
1173+
numKeysBySchema.put(schemaStr, numExistingKeys);
12001174
return result.stream();
12011175
}
12021176

@@ -1262,7 +1236,7 @@ public void close() {
12621236

12631237
private static long genRandomTimeMillis(Random r) {
12641238
// Fri Feb 13 15:31:30 PST 2009
1265-
long anchorTs = 1234567890L;
1239+
long anchorTs = 1234567890000L;
12661240
// NOTE: To provide for certainty and not generate overly random dates, we will limit
12671241
// dispersion to be w/in +/- 3 days from the anchor date
12681242
return anchorTs + r.nextLong() % 259200000L;

0 commit comments

Comments
 (0)