@@ -34,6 +34,14 @@ class NestedRecordCombiner extends RecordCombiner {
3434
3535 import NestedRecordCombiner ._
3636
37+ /**
38+ * Converts Spark DataFrame to the RDD with data in mainframe format as arrays of bytes, each array being a record.
39+ *
40+ * @param df The input DataFrame
41+ * @param cobolSchema The output COBOL schema
42+ * @param readerParameters The reader properties which are actually writer properties parsed as spark-cobol options
43+ * @return The RDD of records in mainframe format
44+ */
3745 override def combine (df : DataFrame , cobolSchema : CobolSchema , readerParameters : ReaderParameters ): RDD [Array [Byte ]] = {
3846 val hasRdw = readerParameters.recordFormat == RecordFormat .VariableLength
3947 val isRdwBigEndian = readerParameters.isRdwBigEndian
@@ -64,15 +72,25 @@ class NestedRecordCombiner extends RecordCombiner {
6472 s " RDW length $recordLengthLong exceeds ${Int .MaxValue } and cannot be encoded safely. "
6573 )
6674 }
67- val recordLength = recordLengthLong.toInt
68-
69- processRDD(df.rdd, cobolSchema.copybook, df.schema, size, recordLength, startOffset, hasRdw, isRdwBigEndian)
75+ processRDD(df.rdd, cobolSchema.copybook, df.schema, size, adjustment1 + adjustment2, startOffset, hasRdw, isRdwBigEndian, readerParameters.variableSizeOccurs)
7076 }
7177}
7278
7379object NestedRecordCombiner {
7480 private val log = LoggerFactory .getLogger(this .getClass)
7581
82+ /**
83+ * Generates a field definition string containing the PIC clause and USAGE clause for a primitive COBOL field.
84+ *
85+ * This method extracts the picture clause (PIC) and usage information from the field's data type
86+ * and combines them into a single definition string. For integral and decimal types with compact
87+ * encoding, it includes the USAGE clause; otherwise, the default DISPLAY usage is assumed or omitted.
88+ *
89+ * The purpose is to render the COBOL field name and type in exceptions and log messages.
90+ *
91+ * @param field The primitive field whose definition string should be generated
92+ * @return A string containing the PIC clause and optional USAGE clause, with any trailing whitespace trimmed
93+ */
7694 def getFieldDefinition (field : Primitive ): String = {
7795 val pic = field.dataType.originalPic.getOrElse(field.dataType.pic)
7896
@@ -85,41 +103,101 @@ object NestedRecordCombiner {
85103 s " $pic $usage" .trim
86104 }
87105
106+ /**
107+ * Constructs a writer AST (Abstract Syntax Tree) from a copybook and Spark schema for serialization purposes.
108+ *
109+ * This method creates a hierarchical structure of WriterAst nodes that maps the copybook structure to the
110+ * corresponding Spark schema. The resulting GroupField can be used to serialize Spark Rows into binary format
111+ * according to the copybook specification. The AST contains getter functions that extract values from Rows
112+ * and metadata needed to write those values to the correct byte positions in the output buffer.
113+ *
114+ * The purpose of the WriterAst class hierarchy is to provide a memory- and CPU-efficient way of creating
115+ * binary records from Spark DataFrames. It links the COBOL schema and the Spark schema in a single tree.
116+ *
117+ * @param copybook The copybook definition describing the binary record layout and field specifications
118+ * @param schema The Spark StructType schema that corresponds to the structure of the data to be written
119+ * @return A GroupField representing the root of the writer AST, containing all non-filler, non-redefines
120+ * fields with their associated getter functions and position information for binary serialization
121+ */
88122 def constructWriterAst (copybook : Copybook , schema : StructType ): GroupField = {
89123 buildGroupField(getAst(copybook), schema, row => row, " " , new mutable.HashMap [String , DependingOnField ]())
90124 }
91125
92- def processRDD (rdd : RDD [Row ], copybook : Copybook , schema : StructType , recordSize : Int , recordLengthHeader : Int , startOffset : Int , hasRdw : Boolean , isRdwBigEndian : Boolean ): RDD [Array [Byte ]] = {
126+ /**
127+ * Processes an RDD of Spark Rows and converts them into an RDD of byte arrays according to the copybook specification.
128+ *
129+ * The resulting RDD can then be written to storage as files in mainframe format (usually EBCDIC).
130+ *
131+ * Each Row is transformed into a fixed or variable-length byte array representation based on the copybook layout.
132+ *
133+ * The only variable-record-length records supported are those that have RDW headers (big-endian or little-endian).
134+ * For variable-length records with OCCURS DEPENDING ON, the output may be trimmed to the actual bytes written.
135+ *
136+ * @param rdd The input RDD containing Spark Rows to be converted to binary format
137+ * @param copybook The copybook definition that describes the binary record layout and field specifications
138+ * @param schema The Spark StructType schema that corresponds to the structure of the input Rows
139+ * @param recordSize The maximum size in bytes allocated for each output record
140+ * @param recordLengthAdj An adjustment value added to the bytes written when computing the RDW length field
141+ * @param startOffset The byte offset at which data writing should begin, typically 0 for fixed-length or 4 for RDW records
142+ * @param hasRdw A flag indicating whether to prepend a Record Descriptor Word header to each output record
143+ * @param isRdwBigEndian A flag indicating the byte order for the RDW header, true for big-endian, false for little-endian
144+ * @param variableSizeOccurs A flag indicating whether OCCURS DEPENDING ON fields should use actual element counts rather than maximum sizes
145+ * @return An RDD of byte arrays, where each array represents one record in binary format according to the copybook specification
146+ */
147+ private [cobrix] def processRDD (rdd : RDD [Row ],
148+ copybook : Copybook ,
149+ schema : StructType ,
150+ recordSize : Int ,
151+ recordLengthAdj : Int ,
152+ startOffset : Int ,
153+ hasRdw : Boolean ,
154+ isRdwBigEndian : Boolean ,
155+ variableSizeOccurs : Boolean ): RDD [Array [Byte ]] = {
93156 val writerAst = constructWriterAst(copybook, schema)
94157
95158 rdd.mapPartitions { rows =>
96159 rows.map { row =>
97160 val ar = new Array [Byte ](recordSize)
98161
162+ val bytesWritten = writeToBytes(writerAst, row, ar, startOffset, variableSizeOccurs)
163+
99164 if (hasRdw) {
165+ val recordLengthToWriteToRDW = bytesWritten + recordLengthAdj
166+
100167 if (isRdwBigEndian) {
101- ar(0 ) = ((recordLengthHeader >> 8 ) & 0xFF ).toByte
102- ar(1 ) = (recordLengthHeader & 0xFF ).toByte
168+ ar(0 ) = ((recordLengthToWriteToRDW >> 8 ) & 0xFF ).toByte
169+ ar(1 ) = (recordLengthToWriteToRDW & 0xFF ).toByte
103170 // The last two bytes are reserved and defined by IBM as binary zeros on all platforms.
104171 ar(2 ) = 0
105172 ar(3 ) = 0
106173 } else {
107- ar(0 ) = (recordLengthHeader & 0xFF ).toByte
108- ar(1 ) = ((recordLengthHeader >> 8 ) & 0xFF ).toByte
174+ ar(0 ) = (recordLengthToWriteToRDW & 0xFF ).toByte
175+ ar(1 ) = ((recordLengthToWriteToRDW >> 8 ) & 0xFF ).toByte
109176 // This is non-standard. But so are little-endian RDW headers.
110177 // As an advantage, it has no effect for small records but adds support for big records (> 64KB).
111- ar(2 ) = ((recordLengthHeader >> 16 ) & 0xFF ).toByte
112- ar(3 ) = ((recordLengthHeader >> 24 ) & 0xFF ).toByte
178+ ar(2 ) = ((recordLengthToWriteToRDW >> 16 ) & 0xFF ).toByte
179+ ar(3 ) = ((recordLengthToWriteToRDW >> 24 ) & 0xFF ).toByte
113180 }
114181 }
115182
116- writeToBytes(writerAst, row, ar, startOffset)
117-
118- ar
183+ if (! variableSizeOccurs || recordSize == bytesWritten + startOffset) {
184+ ar
185+ } else {
186+ java.util.Arrays .copyOf(ar, bytesWritten + startOffset)
187+ }
119188 }
120189 }
121190 }
122191
192+ /**
193+ * Retrieves the appropriate AST (Abstract Syntax Tree) group from a copybook.
194+ * If the root AST has exactly one child and that child is a Group, returns that child.
195+ * Otherwise, returns the root AST itself. This normalization ensures consistent handling
196+ * of copybook structures regardless of whether they have a single top-level group or multiple elements.
197+ *
198+ * @param copybook The copybook object containing the AST structure to extract from
199+ * @return The normalized Group representing the copybook structure, either the single child group or the root AST
200+ */
123201 def getAst (copybook : Copybook ): Group = {
124202 val rootAst = copybook.ast
125203
@@ -134,10 +212,11 @@ object NestedRecordCombiner {
134212 * Recursively walks the copybook group and the Spark StructType in lockstep, producing
135213 * [[WriterAst ]] nodes whose getters extract the correct value from a [[org.apache.spark.sql.Row ]].
136214 *
137- * @param group A copybook Group node whose children will be processed.
138- * @param schema The Spark StructType that corresponds to `group`.
139- * @param getter A function that, given the "outer" Row, returns the Row that belongs to this group.
140- * @param path The path to the field
215+ * @param group A copybook Group node whose children will be processed.
216+ * @param schema The Spark StructType that corresponds to `group`.
217+ * @param getter A function that, given the "outer" Row, returns the Row that belongs to this group.
218+ * @param path The path to the field
219+ * @param dependeeMap A map of field names to their corresponding DependingOnField specs, used to resolve dependencies for OCCURS DEPENDING ON fields.
141220 * @return A [[GroupField ]] covering all non-filler, non-redefines children found in both
142221 * the copybook and the Spark schema.
143222 */
@@ -159,6 +238,19 @@ object NestedRecordCombiner {
159238 * Returns a filler when the field is absent from the schema (e.g. filtered out during reading).
160239 */
161240 private def buildPrimitiveNode (p : Primitive , schema : StructType , path : String , dependeeMap : mutable.HashMap [String , DependingOnField ]): WriterAst = {
241+ def addDependee (): DependingOnField = {
242+ val spec = DependingOnField (p, p.binaryProperties.offset)
243+ val uppercaseName = p.name.toUpperCase()
244+ if (dependeeMap.contains(uppercaseName)) {
245+ throw new IllegalArgumentException (s " Duplicate field name ' ${p.name}' found in copybook. " +
246+ s " Field names must be unique (case-insensitive) when OCCURS DEPENDING ON is used. " +
247+ s " Already found a dependee field with the same name at line ${dependeeMap(uppercaseName).cobolField.lineNumber}, " +
248+ s " current field line number: ${p.lineNumber}. " )
249+ }
250+ dependeeMap += (p.name.toUpperCase -> spec)
251+ spec
252+ }
253+
162254 val fieldName = p.name
163255 val fieldIndexOpt = schema.fields.zipWithIndex.find { case (field, _) =>
164256 field.name.equalsIgnoreCase(fieldName)
@@ -180,19 +272,15 @@ object NestedRecordCombiner {
180272 PrimitiveArray (p, row => row.getAs[mutable.WrappedArray [AnyRef ]](idx), dependingOnField)
181273 } else {
182274 if (p.isDependee) {
183- val spec = DependingOnField (p, p.binaryProperties.offset)
184- dependeeMap += (p.name.toUpperCase -> spec)
185- PrimitiveDependeeField (spec)
275+ PrimitiveDependeeField (addDependee())
186276 } else {
187277 PrimitiveField (p, row => row.get(idx))
188278 }
189279 }
190280 }.getOrElse {
191281 // Dependee fields need not be defined in the Spark schema.
192282 if (p.isDependee) {
193- val spec = DependingOnField (p, p.binaryProperties.offset)
194- dependeeMap += (p.name.toUpperCase -> spec)
195- PrimitiveDependeeField (spec)
283+ PrimitiveDependeeField (addDependee())
196284 } else {
197285 log.error(s " Field ' $path${p.name}' is not found in Spark schema. Will be replaced by filler. " )
198286 Filler (p.binaryProperties.actualSize)
@@ -258,12 +346,15 @@ object NestedRecordCombiner {
258346 * supplied. The row array may contain fewer elements than the copybook allows — any
259347 * missing tail elements are silently skipped, leaving those bytes as zeroes.
260348 *
261- * @param ast The [[WriterAst ]] node to process.
262- * @param row The Spark [[Row ]] from which values are read.
263- * @param ar The target byte array (record buffer).
264- * @param currentOffset RDW prefix length (0 for fixed-length records, 4 for variable).
349+ * @param ast The [[WriterAst ]] node to process.
350+ * @param row The Spark [[Row ]] from which values are read.
351+ * @param ar The target byte array (record buffer).
352+ * @param currentOffset RDW prefix length (0 for fixed-length records, 4 for variable).
353+ * @param variableLengthOccurs A flag indicating whether the size of OCCURS DEPENDING ON arrays should match
354+ * the actual number of elements rather than always being the fixed maximum size.
355+ * @throws IllegalArgumentException if a field value cannot be encoded according to the copybook definition.
265356 */
266- private def writeToBytes (ast : WriterAst , row : Row , ar : Array [Byte ], currentOffset : Int ): Int = {
357+ private def writeToBytes (ast : WriterAst , row : Row , ar : Array [Byte ], currentOffset : Int , variableLengthOccurs : Boolean ): Int = {
267358 ast match {
268359 // ── Filler ──────────────────────────────────────────────────────────────
269360 case Filler (size) => size
@@ -286,11 +377,14 @@ object NestedRecordCombiner {
286377 val nestedRow = getter(row)
287378 if (nestedRow != null ) {
288379 var writtenBytes = 0
289- children.foreach(child =>
290- writtenBytes += writeToBytes(child, nestedRow, ar, currentOffset + writtenBytes)
291- )
380+ children.foreach { child =>
381+ val written = writeToBytes(child, nestedRow, ar, currentOffset + writtenBytes, variableLengthOccurs)
382+ writtenBytes += written
383+ }
384+ writtenBytes
385+ } else {
386+ cobolField.binaryProperties.actualSize
292387 }
293- cobolField.binaryProperties.actualSize
294388
295389 // ── Array of primitives (OCCURS on a primitive field) ───────────────────
296390 case PrimitiveArray (cobolField, arrayGetter, dependingOn) =>
@@ -315,8 +409,22 @@ object NestedRecordCombiner {
315409 dependingOn.foreach(spec =>
316410 Copybook .setPrimitiveField(spec.cobolField, ar, elementsToWrite, fieldStartOffsetOverride = spec.baseOffset)
317411 )
412+ if (variableLengthOccurs) {
413+ // For variable-length OCCURS, the actual size is determined by the number of elements written.
414+ elementSize * elementsToWrite
415+ } else {
416+ cobolField.binaryProperties.actualSize
417+ }
418+ } else {
419+ if (variableLengthOccurs) {
420+ dependingOn.foreach(spec =>
421+ Copybook .setPrimitiveField(spec.cobolField, ar, 0 , fieldStartOffsetOverride = spec.baseOffset)
422+ )
423+ 0
424+ } else {
425+ cobolField.binaryProperties.actualSize
426+ }
318427 }
319- cobolField.binaryProperties.actualSize
320428
321429 // ── Array of groups (OCCURS on a group field) ───────────────────────────
322430 case GroupArray (groupField : GroupField , cobolField, arrayGetter, dependingOn) =>
@@ -334,15 +442,29 @@ object NestedRecordCombiner {
334442 // Build an adjusted element offset so that each child's base offset
335443 // (which is relative to the group's base) lands at the correct position in ar.
336444 val elementStartOffset = baseOffset + i * elementSize
337- writeToBytes(groupField, elementRow, ar, elementStartOffset)
445+ writeToBytes(groupField, elementRow, ar, elementStartOffset, variableLengthOccurs )
338446 }
339447 i += 1
340448 }
341449 dependingOn.foreach(spec =>
342450 Copybook .setPrimitiveField(spec.cobolField, ar, elementsToWrite, fieldStartOffsetOverride = spec.baseOffset)
343451 )
452+ if (variableLengthOccurs) {
453+ // For variable-length OCCURS, the actual size is determined by the number of elements written.
454+ elementSize * elementsToWrite
455+ } else {
456+ cobolField.binaryProperties.actualSize
457+ }
458+ } else {
459+ if (variableLengthOccurs) {
460+ dependingOn.foreach(spec =>
461+ Copybook .setPrimitiveField(spec.cobolField, ar, 0 , fieldStartOffsetOverride = spec.baseOffset)
462+ )
463+ 0
464+ } else {
465+ cobolField.binaryProperties.actualSize
466+ }
344467 }
345- cobolField.binaryProperties.actualSize
346468 }
347469 }
348470}
0 commit comments