Skip to content

Commit c5d70d7

Browse files
committed
BQ Source Pushdown Fixes
Remove Snapshot logic from BQ Source Pushdown. Updated table copy logic to include all fields from original table.
1 parent 724e092 commit c5d70d7

File tree

1 file changed

+11
-37
lines changed

1 file changed

+11
-37
lines changed

src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java

Lines changed: 11 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@ private enum BigQueryJobType { QUERY, COPY, COPY_SNAPSHOT };
8484
public static final String SQL_INPUT_CONFIG = "config";
8585
public static final String SQL_INPUT_FIELDS = "fields";
8686
public static final String SQL_INPUT_SCHEMA = "schema";
87-
public static final String BQ_COPY_SNAPSHOT_OP_TYPE = "SNAPSHOT";
8887
private static final java.lang.reflect.Type LIST_OF_STRINGS_TYPE = new TypeToken<ArrayList<String>>() { }.getType();
8988
private static final String BQ_PUSHDOWN_OPERATION_TAG = "read";
9089

@@ -214,24 +213,9 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
214213
tableTTL = Instant.now().toEpochMilli() + ttlMillis;
215214
}
216215

217-
// no Filter + no view + no material + no external
218-
StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
219-
Type type = tableDefinition.getType();
220-
if (!(type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL)
221-
&& sourceConfig.getFilter() == null) {
222-
// TRY SNAPSHOT
223-
JobConfiguration jobConfiguration = getBQSnapshotJobConf(sourceTableId, destinationTableId);
224-
SQLReadResult snapshotResult = executeBigQueryJob(jobConfiguration, sourceTable, sourceTableId,
225-
BigQueryJobType.COPY_SNAPSHOT, jobLocation);
226-
if (snapshotResult.isSuccessful()) {
227-
BigQuerySQLEngineUtils.updateTableExpiration(bigQuery, destinationTableId, tableTTL);
228-
return snapshotResult;
229-
}
230-
LOG.warn("Big Query Snapshot process used for direct BigQuery read failed. Using fallback table copy strategy.");
231-
}
232-
233-
JobConfiguration queryConfig = getBQQueryJobConfiguration(sourceTable, sourceTableId,
234-
fields,
216+
// Create configuration for table copy job
217+
JobConfiguration queryConfig = getBQQueryJobConfiguration(sourceTable,
218+
sourceTableId,
235219
sourceConfig.getFilter(),
236220
sourceConfig.getPartitionFrom(),
237221
sourceConfig.getPartitionTo(),
@@ -284,19 +268,19 @@ private SQLReadResult executeBigQueryJob(JobConfiguration jobConfiguration,
284268
return SQLReadResult.success(datasetName, this);
285269
}
286270

287-
JobConfiguration getBQQueryJobConfiguration(Table sourceTable, TableId sourceTableId,
288-
List<String> fields,
289-
String filter,
290-
String partitionFromDate,
291-
String partitionToDate,
292-
Long tableTTL) {
271+
private JobConfiguration getBQQueryJobConfiguration(Table sourceTable,
272+
TableId sourceTableId,
273+
String filter,
274+
String partitionFromDate,
275+
String partitionToDate,
276+
Long tableTTL) {
293277

294278
BigQuerySQLEngineUtils.createEmptyTableWithSourceConfig(bigQuery, destinationTableId.getProject(),
295279
destinationTableId.getDataset(), destinationTableId.getTable(),
296280
sourceTable, tableTTL);
297281

298-
String query = String.format("SELECT %s FROM `%s.%s.%s`",
299-
String.join(",", fields),
282+
// Select all fields from source table into destination table
283+
String query = String.format("SELECT * FROM `%s.%s.%s`",
300284
sourceTableId.getProject(),
301285
sourceTableId.getDataset(),
302286
sourceTableId.getTable());
@@ -389,16 +373,6 @@ QueryJobConfiguration.Builder getQueryBuilder(Table sourceTable, TableId sourceT
389373
.setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG));
390374
}
391375

392-
private JobConfiguration getBQSnapshotJobConf(TableId sourceTable, TableId destinationTable) {
393-
CopyJobConfiguration copyJobConfiguration =
394-
CopyJobConfiguration.newBuilder(destinationTable, sourceTable)
395-
.setOperationType(BQ_COPY_SNAPSHOT_OP_TYPE)
396-
.setLabels(BigQuerySQLEngineUtils.getJobTags(BQ_PUSHDOWN_OPERATION_TAG))
397-
.build();
398-
399-
return copyJobConfiguration;
400-
}
401-
402376
/**
403377
* Try to delete this table while handling exception
404378
*

0 commit comments

Comments
 (0)