Skip to content

Commit 01da263

Browse files
committed
#797 Improve code coverage and remove redundant validation method.
1 parent bfa90d6 commit 01da263

File tree

3 files changed

+152
-32
lines changed

3 files changed

+152
-32
lines changed

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/parameters/CobolParametersValidator.scala

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package za.co.absa.cobrix.spark.cobol.source.parameters
1818

1919
import org.apache.hadoop.conf.Configuration
2020
import org.apache.hadoop.fs.Path
21-
import org.apache.spark.SparkConf
2221
import org.slf4j.LoggerFactory
2322
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
2423
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
@@ -55,27 +54,6 @@ object CobolParametersValidator {
5554
}
5655
}
5756

58-
/**
59-
* Validates the provided Spark and Hadoop configuration objects and throws an exception
60-
* if any inconsistency or issue is detected in the configurations.
61-
*
62-
* The method extracts essential parameters from the Spark configuration, validates these
63-
* parameters alongside the Hadoop configuration object, and ensures that all required
64-
* settings are present and correct. Conflicting or missing configurations are identified
65-
* and result in exceptions being thrown.
66-
*
67-
* @param sparkConf the Spark configuration object containing parameters necessary for
68-
* this validation step.
69-
* @param hadoopConf the Hadoop configuration object used to perform validation tasks
70-
* related to Hadoop filesystem operations.
71-
* @return this method does not return any value. It throws an exception if any validation fails.
72-
*/
73-
def validateOrThrow(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
74-
val parameters = Map[String, String](PARAM_COPYBOOK_PATH -> sparkConf.get(PARAM_COPYBOOK_PATH), PARAM_SOURCE_PATH -> sparkConf.get
75-
(PARAM_SOURCE_PATH))
76-
validateOrThrow(parameters, hadoopConf)
77-
}
78-
7957
/**
8058
* Validates the provided parameters for processing COBOL data and throws an exception
8159
* if any inconsistency or issue is detected in the configurations.

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/CobolStreamer.scala

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,8 @@ import org.apache.spark.sql.Row
2020
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
2121
import org.apache.spark.streaming.StreamingContext
2222
import org.apache.spark.streaming.dstream.DStream
23-
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat
24-
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
25-
import za.co.absa.cobrix.cobol.parser.policies.{FillerNamingPolicy, StringTrimmingPolicy}
26-
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
27-
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
2823
import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser._
24+
import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
2925
import za.co.absa.cobrix.spark.cobol.reader.{FixedLenNestedReader, FixedLenReader}
3026
import za.co.absa.cobrix.spark.cobol.source.parameters.CobolParametersValidator
3127
import za.co.absa.cobrix.spark.cobol.utils.HDFSUtils
@@ -43,10 +39,15 @@ object CobolStreamer {
4339
}
4440

4541
implicit class Deserializer(@transient val ssc: StreamingContext) extends Serializable {
42+
val parameters: Map[String, String] = Map[String, String](
43+
PARAM_COPYBOOK_PATH -> ssc.sparkContext.getConf.get(PARAM_COPYBOOK_PATH),
44+
PARAM_SOURCE_PATH -> ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH)
45+
)
46+
47+
CobolParametersValidator.validateOrThrow(parameters, ssc.sparkContext.hadoopConfiguration)
48+
49+
val reader: FixedLenReader = CobolStreamer.getReader(ssc)
4650

47-
CobolParametersValidator.validateOrThrow(ssc.sparkContext.getConf, ssc.sparkContext.hadoopConfiguration)
48-
val reader = CobolStreamer.getReader(ssc)
49-
5051
def cobolStream(): DStream[Row] = {
5152
ssc
5253
.binaryRecordsStream(ssc.sparkContext.getConf.get(PARAM_SOURCE_PATH), reader.getCobolSchema.getRecordSize)
@@ -59,4 +60,4 @@ object CobolStreamer {
5960
}
6061
}
6162

62-
}
63+
}

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/writer/NestedWriterSuite.scala

Lines changed: 142 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,84 @@ class NestedWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFi
5151
| 10 CITY PIC X(10).
5252
| 05 cnt-2 PIC 9(1).
5353
| 05 PEOPLE
54-
| OCCURS 0 TO 3 DEPENDING ON CNT-2.
54+
| OCCURS 0 TO 3 DEPENDING ON cnt-2.
5555
| 10 NAME PIC X(14).
5656
| 10 FILLER PIC X(1).
5757
| 10 PHONE-NUMBER PIC X(12).
5858
|""".stripMargin
5959

60+
"getFieldDefinition" should {
61+
"support alphanumeric PIC" in {
62+
val copybookContents =
63+
""" 01 RECORD.
64+
05 NAME PIC X(10).
65+
"""
66+
67+
val copybook = CopybookParser.parse(copybookContents)
68+
val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive]
69+
70+
val actual = NestedRecordCombiner.getFieldDefinition(nameField)
71+
72+
assert(actual == "X(10)")
73+
}
74+
75+
"support integral with COMP" in {
76+
val copybookContents =
77+
""" 01 RECORD.
78+
05 NAME PIC 9(10) USAGE IS COMP.
79+
"""
80+
81+
val copybook = CopybookParser.parse(copybookContents)
82+
val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive]
83+
84+
val actual = NestedRecordCombiner.getFieldDefinition(nameField)
85+
86+
assert(actual == "9(10) COMP-4")
87+
}
88+
89+
"support integral DISPLAY" in {
90+
val copybookContents =
91+
""" 01 RECORD.
92+
05 NAME PIC 9(10).
93+
"""
94+
95+
val copybook = CopybookParser.parse(copybookContents)
96+
val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive]
97+
98+
val actual = NestedRecordCombiner.getFieldDefinition(nameField)
99+
100+
assert(actual == "9(10) USAGE IS DISPLAY")
101+
}
102+
103+
"support decimal with COMP" in {
104+
val copybookContents =
105+
""" 01 RECORD.
106+
05 NAME PIC S9(5)V99 USAGE IS COMP.
107+
"""
108+
109+
val copybook = CopybookParser.parse(copybookContents)
110+
val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive]
111+
112+
val actual = NestedRecordCombiner.getFieldDefinition(nameField)
113+
114+
assert(actual == "S9(5)V99 COMP-4")
115+
}
116+
117+
"support decimal DISPLAY" in {
118+
val copybookContents =
119+
""" 01 RECORD.
120+
05 NAME PIC S9(5)V99 USAGE IS DISPLAY.
121+
"""
122+
123+
val copybook = CopybookParser.parse(copybookContents)
124+
val nameField = copybook.getFieldByName("NAME").asInstanceOf[Primitive]
125+
126+
val actual = NestedRecordCombiner.getFieldDefinition(nameField)
127+
128+
assert(actual == "S9(5)V99 USAGE IS DISPLAY")
129+
}
130+
}
131+
60132
"writer" should {
61133
"write the dataframe with OCCURS" in {
62134
val exampleJsons = Seq(
@@ -292,6 +364,75 @@ class NestedWriterSuite extends AnyWordSpec with SparkTestBase with BinaryFileFi
292364
}
293365
}
294366
}
367+
368+
"write the dataframe with OCCURS DEPENDING ON and variable length occurs and null values" in {
369+
val exampleJsons = Seq(
370+
"""{"ID":1,"NUMBERS":[10,20,30],"PLACE":{"COUNTRY_CODE":"US","CITY":"New York"}}""",
371+
"""{"ID":2,"PLACE":{"COUNTRY_CODE":"ZA","CITY":"Cape Town"},"PEOPLE":[{"NAME":"Test User","PHONE_NUMBER":"555-1235"}]}"""
372+
)
373+
374+
import spark.implicits._
375+
376+
val df = spark.read.json(exampleJsons.toDS())
377+
.select("ID", "NUMBERS", "PLACE", "PEOPLE")
378+
379+
withTempDirectory("cobol_writer3") { tempDir =>
380+
val path = new Path(tempDir, "writer3")
381+
382+
df.coalesce(1)
383+
.orderBy("id")
384+
.write
385+
.format("cobol")
386+
.mode(SaveMode.Overwrite)
387+
.option("copybook_contents", copybookWithDependingOn)
388+
.option("record_format", "V")
389+
.option("is_rdw_big_endian", "false")
390+
.option("is_rdw_part_of_record_length", "true")
391+
.option("variable_size_occurs", "true")
392+
.save(path.toString)
393+
394+
// val df2 = spark.read.format("cobol")
395+
// .option("copybook_contents", copybookWithDependingOn)
396+
// .option("record_format", "V")
397+
// .option("is_rdw_big_endian", "false")
398+
// .option("is_rdw_part_of_record_length", "true")
399+
// .option("variable_size_occurs", "true")
400+
// .load(path.toString)
401+
// println(SparkUtils.convertDataFrameToPrettyJSON(df2))
402+
403+
val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
404+
405+
assert(fs.exists(path), "Output directory should exist")
406+
val files = fs.listStatus(path)
407+
.filter(_.getPath.getName.startsWith("part-"))
408+
assert(files.nonEmpty, "Output directory should contain part files")
409+
410+
val partFile = files.head.getPath
411+
val data = fs.open(partFile)
412+
val bytes = new Array[Byte](files.head.getLen.toInt)
413+
data.readFully(bytes)
414+
data.close()
415+
416+
// Expected EBCDIC data for sample test data
417+
val expected = Array(
418+
0x1B, 0x00, 0x00, 0x00, // RDW record 0
419+
0xF0, 0xF1, 0x00, 0xF3, 0xF1, 0xF0, 0xF2, 0xF0, 0xF3, 0xF0, 0xE4, 0xE2, 0xD5, 0x85, 0xA6, 0x40, 0xE8, 0x96,
420+
0x99, 0x92, 0x40, 0x40, 0xF0,
421+
0x30, 0x00, 0x00, 0x00,
422+
0xF0, 0xF2, 0x00, 0xF0, 0xE9, 0xC1, 0xC3, 0x81, 0x97, 0x85, 0x40, 0xE3, 0x96, 0xA6, 0x95, 0x40, 0xF1, 0xE3,
423+
0x85, 0xA2, 0xA3, 0x40, 0xE4, 0xA2, 0x85, 0x99, 0x40, 0x40, 0x40, 0x40, 0x40, 0x00, 0xF5, 0xF5, 0xF5, 0xCA,
424+
0xF1, 0xF2, 0xF3, 0xF5, 0x40, 0x40, 0x40, 0x40
425+
).map(_.toByte)
426+
427+
if (!bytes.sameElements(expected)) {
428+
println(s"Expected bytes: ${expected.map("%02X" format _).mkString(" ")}")
429+
println(s"Actual bytes: ${bytes.map("%02X" format _).mkString(" ")}")
430+
println(s"Actual bytes: ${bytes.map("0x%02X" format _).mkString(", ")}")
431+
432+
assert(bytes.sameElements(expected), "Written data should match expected EBCDIC encoding")
433+
}
434+
}
435+
}
295436
}
296437

297438
"constructWriterAst" should {

0 commit comments

Comments (0)