Skip to content

Commit ef85cbc

Browse files
Add support for WRITE_TRUNCATE_DATA in BQ Sink
1 parent 487b518 commit ef85cbc

File tree

10 files changed

+140
-14
lines changed

10 files changed

+140
-14
lines changed

docs/BigQueryMultiTable-batchsink.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ If the bucket was created automatically, it will be deleted after the run finish
5959
**Truncate Table:** Whether or not to truncate the table before writing to it.
6060
Should only be used with the Insert operation.
6161

62+
**Write Disposition**: Determines how a truncating job replaces the table: `WRITE_TRUNCATE_DATA` truncates the data but preserves the existing table metadata, while `WRITE_TRUNCATE` replaces both data and metadata.
63+
For more details, see [here](https://docs.cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.WriteDisposition).
64+
6265
**Location:** The location where the big query datasets will get created. This value is ignored
6366
if the dataset or temporary bucket already exist.
6467

docs/BigQueryTable-batchsink.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ will be dropped.
7676
**Truncate Table**: Whether or not to truncate the table before writing to it.
7777
Should only be used with the Insert operation.
7878

79+
**Write Disposition**: Determines how a truncating job replaces the table: `WRITE_TRUNCATE_DATA` truncates the data but preserves the existing table metadata, while `WRITE_TRUNCATE` replaces both data and metadata.
80+
For more details, see [here](https://docs.cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.WriteDisposition).
81+
7982
**Table Key**: List of fields that determines relation between tables during Update and Upsert operations.
8083

8184
**Dedupe By**: Column names and sort order used to choose which input record to update/upsert when there are

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
<groupId>io.cdap.plugin</groupId>
2222
<artifactId>google-cloud</artifactId>
23-
<version>0.24.2</version>
23+
<version>0.24.3-SNAPSHOT</version>
2424
<name>Google Cloud Plugins</name>
2525
<packaging>jar</packaging>
2626
<description>Plugins for Google Big Query</description>

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ private Configuration getBaseConfiguration(@Nullable CryptoKeyName cmekKeyName)
270270
baseConfiguration.setBoolean(BigQueryConstants.CONFIG_ALLOW_SCHEMA_RELAXATION,
271271
config.isAllowSchemaRelaxation());
272272
baseConfiguration.setStrings(BigQueryConfiguration.OUTPUT_TABLE_WRITE_DISPOSITION.getKey(),
273-
config.getWriteDisposition().name());
273+
config.getWriteDisposition());
274274
baseConfiguration.setStrings(BigQueryConstants.CONFIG_JSON_STRING_FIELDS, config.getJsonStringFields());
275275
// this setting is needed because gcs has default chunk size of 64MB. This is large default chunk size which can
276276
// cause OOM issue if there are many tables being written. See this - CDAP-16670

src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySinkConfig.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
4949
Schema.Type.BOOLEAN, Schema.Type.BYTES, Schema.Type.ARRAY, Schema.Type.RECORD);
5050

5151
public static final String NAME_TRUNCATE_TABLE = "truncateTable";
52+
public static final String NAME_WRITE_DISPOSITION = "writeDisposition";
5253
public static final String NAME_LOCATION = "location";
5354
private static final String NAME_GCS_CHUNK_SIZE = "gcsChunkSize";
5455
public static final String NAME_BQ_JOB_LABELS = "jobLabels";
@@ -79,9 +80,19 @@ public abstract class AbstractBigQuerySinkConfig extends BigQueryBaseConfig {
7980
@Macro
8081
@Nullable
8182
@Description("Whether or not to truncate the table before writing to it. "
82-
+ "Should only be used with the Insert operation. This could overwrite the table schema")
83+
+ "Should only be used with the Insert operation. This could overwrite the table schema based "
84+
+ "on write disposition value.")
8385
protected Boolean truncateTable;
8486

87+
@Name(NAME_WRITE_DISPOSITION)
88+
@Macro
89+
@Nullable
90+
@Description("WRITE_TRUNCATE_DATA preserves the table metadata whereas WRITE_TRUNCATE does not. "
91+
+ "For more details, see "
92+
+ "https://docs.cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/"
93+
+ "BigQueryAuditMetadata.WriteDisposition.")
94+
protected String writeDisposition;
95+
8596
@Name(NAME_LOCATION)
8697
@Macro
8798
@Nullable
@@ -155,9 +166,16 @@ public boolean isAllowSchemaRelaxation() {
155166
return allowSchemaRelaxation == null ? false : allowSchemaRelaxation;
156167
}
157168

158-
public JobInfo.WriteDisposition getWriteDisposition() {
159-
return isTruncateTableSet() ? JobInfo.WriteDisposition.WRITE_TRUNCATE
160-
: JobInfo.WriteDisposition.WRITE_APPEND;
169+
private String getTruncateTableWriteDisposition() {
170+
if (writeDisposition == null) {
171+
return JobInfo.WriteDisposition.WRITE_TRUNCATE.name();
172+
}
173+
return writeDisposition;
174+
}
175+
176+
public String getWriteDisposition() {
177+
return isTruncateTableSet() ? getTruncateTableWriteDisposition()
178+
: JobInfo.WriteDisposition.WRITE_APPEND.name();
161179
}
162180

163181
public boolean isTruncateTableSet() {

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQueryOutputFormat.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
363363

364364
Map<String, String> fieldDescriptions = new HashMap<>();
365365
if (JobInfo.WriteDisposition.WRITE_TRUNCATE
366-
.equals(JobInfo.WriteDisposition.valueOf(writeDisposition)) && tableExists) {
366+
.name().equals(writeDisposition.toUpperCase()) && tableExists) {
367367
List<TableFieldSchema> tableFieldSchemas = Optional.ofNullable(bigQueryHelper.getTable(tableRef))
368368
.map(it -> it.getSchema())
369369
.map(it -> it.getFields())
@@ -411,8 +411,8 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
411411
// Schema update options should only be specified with WRITE_APPEND disposition,
412412
// or with WRITE_TRUNCATE disposition on a table partition - The logic below should change when we support
413413
// insertion into single partition
414-
if (allowSchemaRelaxation && !JobInfo.WriteDisposition.WRITE_TRUNCATE
415-
.equals(JobInfo.WriteDisposition.valueOf(writeDisposition))) {
414+
if (allowSchemaRelaxation && !JobInfo.WriteDisposition.WRITE_TRUNCATE.name()
415+
.equals(writeDisposition.toUpperCase())) {
416416
loadConfig.setSchemaUpdateOptions(Arrays.asList(
417417
JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION.name(),
418418
JobInfo.SchemaUpdateOption.ALLOW_FIELD_RELAXATION.name()));
@@ -440,7 +440,7 @@ private void importFromGcs(String projectId, TableReference tableRef, @Nullable
440440
if (operation.equals(Operation.INSERT) && gcsPaths.size() <= BQ_IMPORT_MAX_BATCH_SIZE) {
441441
// Directly load data into destination table when total no of input paths is loadable into BQ
442442
loadConfig.setSourceUris(gcsPaths);
443-
loadConfig.setWriteDisposition(writeDisposition);
443+
loadConfig.setWriteDisposition(writeDisposition.toUpperCase());
444444
loadConfig.setDestinationTable(tableRef);
445445

446446
JobConfiguration config = new JobConfiguration();
@@ -499,7 +499,7 @@ private void loadInBatchesInTempTable(TableReference tableRef, JobConfigurationL
499499
.setTableId(temporaryTableName);
500500

501501
loadConfig.setDestinationTable(temporaryTableReference);
502-
loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND.toString());
502+
loadConfig.setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND.name());
503503

504504
// Split the list of files in batches 10000 (current bq load job limit) and import /append onto a temp table
505505
List<List<String>> gcsPathsInBatches = Lists.partition(gcsPaths, BQ_IMPORT_MAX_BATCH_SIZE);
@@ -747,7 +747,7 @@ private static TableSchema createTableSchemaFromFields(String fieldsJson) throws
747747
private void updateFieldDescriptions(String writeDisposition, TableReference tableRef,
748748
Map<String, String> fieldDescriptions) throws IOException {
749749
if (JobInfo.WriteDisposition.WRITE_TRUNCATE
750-
.equals(JobInfo.WriteDisposition.valueOf(writeDisposition))) {
750+
.name().equals(writeDisposition.toUpperCase())) {
751751

752752
Table table = bigQueryHelper.getTable(tableRef);
753753
List<TableFieldSchema> tableFieldSchemas = Optional.ofNullable(table)

src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,14 +199,17 @@ private BigQuerySinkConfig(@Nullable String referenceName, @Nullable String proj
199199
@Nullable String serviceAccountJson,
200200
@Nullable String dataset, @Nullable String table, @Nullable String location,
201201
@Nullable String cmekKey, @Nullable String bucket, @Nullable String jobLabelKeyValue,
202-
@Nullable String timePartitioningType) {
202+
@Nullable String timePartitioningType, @Nullable String writeDisposition,
203+
@Nullable Boolean truncateTable) {
203204
super(new BigQueryConnectorConfig(project, project, serviceAccountType,
204205
serviceFilePath, serviceAccountJson), dataset, cmekKey, bucket);
205206
this.referenceName = referenceName;
206207
this.table = table;
207208
this.location = location;
208209
this.jobLabelKeyValue = jobLabelKeyValue;
209210
this.timePartitioningType = timePartitioningType;
211+
this.writeDisposition = writeDisposition;
212+
this.truncateTable = truncateTable;
210213
}
211214

212215
public String getTable() {
@@ -734,6 +737,9 @@ public static class Builder {
734737
private String bucket;
735738
private String jobLabelKeyValue;
736739
private String timePartitioningType;
740+
private String writeDisposition;
741+
private Boolean truncateTable;
742+
737743

738744
public BigQuerySinkConfig.Builder setReferenceName(@Nullable String referenceName) {
739745
this.referenceName = referenceName;
@@ -794,6 +800,16 @@ public BigQuerySinkConfig.Builder setTimePartitioningType(@Nullable String timeP
794800
return this;
795801
}
796802

803+
public BigQuerySinkConfig.Builder setWriteDisposition(@Nullable String writeDisposition) {
804+
this.writeDisposition = writeDisposition;
805+
return this;
806+
}
807+
808+
public BigQuerySinkConfig.Builder setTruncateTable(@Nullable Boolean truncateTable) {
809+
this.truncateTable = truncateTable;
810+
return this;
811+
}
812+
797813
public BigQuerySinkConfig build() {
798814
return new BigQuerySinkConfig(
799815
referenceName,
@@ -807,7 +823,9 @@ public BigQuerySinkConfig build() {
807823
cmekKey,
808824
bucket,
809825
jobLabelKeyValue,
810-
timePartitioningType
826+
timePartitioningType,
827+
writeDisposition,
828+
truncateTable
811829
);
812830
}
813831

src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfigTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.cdap.plugin.gcp.bigquery.sink;
1818

19+
import com.google.cloud.bigquery.JobInfo;
1920
import com.google.cloud.bigquery.TimePartitioning;
2021
import io.cdap.cdap.api.data.schema.Schema;
2122
import io.cdap.cdap.etl.api.FailureCollector;
@@ -56,6 +57,27 @@ public void setup() throws NoSuchMethodException {
5657
arguments = new HashMap<>();
5758
}
5859

60+
@Test
61+
public void testValidateWriteDisposition() {
62+
BigQuerySinkConfig bigQuerySinkConfig =
63+
BigQuerySinkConfig.builder()
64+
.setTruncateTable(true)
65+
.setWriteDisposition("WRITE_TRUNCATE_DATA")
66+
.build();
67+
Assert.assertEquals("WRITE_TRUNCATE_DATA", bigQuerySinkConfig.getWriteDisposition());
68+
69+
bigQuerySinkConfig = BigQuerySinkConfig.builder().setWriteDisposition("WRITE_APPEND").build();
70+
Assert.assertEquals(bigQuerySinkConfig.getWriteDisposition(),
71+
JobInfo.WriteDisposition.WRITE_APPEND.name());
72+
73+
bigQuerySinkConfig = BigQuerySinkConfig.builder()
74+
.setTruncateTable(true)
75+
.setWriteDisposition("WRITE_TRUNCATE")
76+
.build();
77+
Assert.assertEquals(bigQuerySinkConfig.getWriteDisposition(),
78+
JobInfo.WriteDisposition.WRITE_TRUNCATE.name());
79+
}
80+
5981
@Test
6082
public void testValidateTimePartitioningColumnWithHourAndDate() throws
6183
InvocationTargetException, IllegalAccessException {

widgets/BigQueryMultiTable-batchsink.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,25 @@
124124
"label": "False"
125125
}
126126
}
127+
},
128+
{
129+
"name": "writeDisposition",
130+
"widget-type": "radio-group",
131+
"label": "Write Disposition",
132+
"widget-attributes": {
133+
"layout": "inline",
134+
"default": "WRITE_TRUNCATE",
135+
"options": [
136+
{
137+
"id": "WRITE_TRUNCATE",
138+
"label": "WRITE_TRUNCATE"
139+
},
140+
{
141+
"id": "WRITE_TRUNCATE_DATA",
142+
"label": "WRITE_TRUNCATE_DATA"
143+
}
144+
]
145+
}
127146
}
128147
]
129148
},
@@ -296,6 +315,18 @@
296315
"name": "connection"
297316
}
298317
]
318+
},
319+
{
320+
"name": "showWriteDisposition",
321+
"condition": {
322+
"expression": "truncateTable == true"
323+
},
324+
"show": [
325+
{
326+
"type": "property",
327+
"name": "writeDisposition"
328+
}
329+
]
299330
}
300331
],
301332
"jump-config": {

widgets/BigQueryTable-batchsink.json

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,25 @@
221221
}
222222
}
223223
},
224+
{
225+
"name": "writeDisposition",
226+
"widget-type": "radio-group",
227+
"label": "Write Disposition",
228+
"widget-attributes": {
229+
"layout": "inline",
230+
"default": "WRITE_TRUNCATE",
231+
"options": [
232+
{
233+
"id": "WRITE_TRUNCATE",
234+
"label": "WRITE_TRUNCATE"
235+
},
236+
{
237+
"id": "WRITE_TRUNCATE_DATA",
238+
"label": "WRITE_TRUNCATE_DATA"
239+
}
240+
]
241+
}
242+
},
224243
{
225244
"name": "allowSchemaRelaxation",
226245
"widget-type": "toggle",
@@ -551,6 +570,18 @@
551570
"name": "relationTableKey"
552571
}
553572
]
573+
},
574+
{
575+
"name": "showWriteDisposition",
576+
"condition": {
577+
"expression": "truncateTable == true"
578+
},
579+
"show": [
580+
{
581+
"type": "property",
582+
"name": "writeDisposition"
583+
}
584+
]
554585
}
555586
],
556587
"jump-config": {

0 commit comments

Comments
 (0)