-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathconfiguration.sql
More file actions
87 lines (78 loc) · 3.37 KB
/
configuration.sql
File metadata and controls
87 lines (78 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
-- =========================================
-- PIPELINE: System-wide configuration, e.g. intermediate task outputs, whether to run post-processors, etc.
-- =========================================
-- Pipeline-wide settings. Nothing in this DDL limits the table to a single row;
-- NOTE(review): confirm how readers choose which row is in effect.
CREATE TABLE IF NOT EXISTS configuration_pipeline (
    id                                    INTEGER AUTOINCREMENT,
    -- Kind of storage for intermediate task outputs; presumably it decides which of the
    -- three intermediate_output_* location columns below is consulted (TODO confirm).
    intermediate_output_type              STRING  NOT NULL,
    intermediate_output_s3_bucket         STRING,
    intermediate_output_dev_profile       STRING,
    intermediate_output_local_output_path STRING,
    -- Feature switches; both get a value when omitted on INSERT.
    run_post_processors                   BOOLEAN NOT NULL DEFAULT TRUE,
    publish_load_finished_events          BOOLEAN NOT NULL DEFAULT FALSE,
    metrics_type                          STRING,
    PRIMARY KEY (id)
);
-- =========================================
-- SOURCES: utilities that should get Airflow DAGs
-- =========================================
-- One row per source (utility) that gets an Airflow DAG.
-- NOTE(review): on standard (non-hybrid) Snowflake tables PRIMARY KEY / UNIQUE are
-- informational only (NOT NULL is the only enforced constraint), so duplicate org_id
-- values are not rejected by the database -- confirm loaders guard against this.
CREATE TABLE IF NOT EXISTS configuration_sources (
    id       INTEGER AUTOINCREMENT,
    type     STRING  NOT NULL,  -- source kind, e.g. 'aclara'; determines which keys live in config
    org_id   STRING  NOT NULL,  -- natural key; configuration_backfills refers to sources by this
    timezone STRING  NOT NULL,
    -- Type-specific settings (e.g. sftp_host, use_raw_data_cache) as a semi-structured value.
    -- Omitting it on INSERT stores an empty object; an explicit NULL is still allowed.
    config   VARIANT DEFAULT OBJECT_CONSTRUCT(),
    PRIMARY KEY (id),
    UNIQUE (org_id)
);
-- Example: query sources of type 'aclara' and extract sftp_host:
-- SELECT id, config:sftp_host::STRING AS sftp_host FROM configuration_sources WHERE type='aclara';
-- =========================================
-- SINKS: AMI data storage systems (Snowflake, S3, etc.)
-- =========================================
-- One row per data sink; keyed by a human-readable string rather than a surrogate number.
CREATE TABLE IF NOT EXISTS configuration_sinks (
    id   STRING,           -- e.g. "cadc_snowflake"
    type STRING NOT NULL,
    PRIMARY KEY (id)
);
-- =========================================
-- SOURCES ↔ SINKS (many-to-many)
-- =========================================
-- Junction table: one row per (source, sink) pair, recording which sinks a source feeds.
-- Both columns are declared NOT NULL explicitly because a link with a missing endpoint
-- is meaningless, and NOT NULL is the only constraint Snowflake enforces on standard tables.
-- NOTE(review): the PRIMARY KEY / REFERENCES / ON DELETE CASCADE clauses are informational
-- in Snowflake, so deleting a source or sink presumably leaves its rows here until they are
-- removed explicitly -- confirm cleanup happens elsewhere.
CREATE TABLE IF NOT EXISTS configuration_source_sinks (
    source_id INTEGER NOT NULL REFERENCES configuration_sources(id) ON DELETE CASCADE,
    sink_id   STRING  NOT NULL REFERENCES configuration_sinks(id)   ON DELETE CASCADE,
    PRIMARY KEY (source_id, sink_id)
);
-- =========================================
-- SINK CHECKS: data quality checks linked to a sink
-- =========================================
-- Data-quality checks attached to a sink, one row per (sink, check) entry.
-- sink_id is NOT NULL: a check not linked to any sink can never be selected by sink,
-- and NOT NULL is the only constraint Snowflake enforces (the REFERENCES clause is
-- informational there), so it is the only guard against orphan rows.
CREATE TABLE IF NOT EXISTS configuration_sink_checks (
    id         INTEGER AUTOINCREMENT PRIMARY KEY,
    sink_id    STRING NOT NULL REFERENCES configuration_sinks(id) ON DELETE CASCADE,
    check_name STRING NOT NULL
);
-- =========================================
-- NOTIFICATIONS: keyed by event type (dag_failure, etc.)
-- =========================================
-- Maps each event type (e.g. dag_failure) to the SNS topic that should be notified.
CREATE TABLE IF NOT EXISTS configuration_notifications (
    id         INTEGER AUTOINCREMENT,
    event_type STRING NOT NULL,  -- intended to be unique per row (informational in Snowflake)
    sns_arn    STRING NOT NULL,  -- ARN of the SNS topic to publish to
    PRIMARY KEY (id),
    UNIQUE (event_type)
);
-- =========================================
-- BACKFILLS: automated backfills
-- =========================================
-- Automated backfill definitions, tied to a source by its org_id (not by the numeric id
-- used in configuration_source_sinks).
-- NOTE(review): Snowflake has no CHECK constraints, so start_date <= end_date and
-- interval_days > 0 are not validated here -- confirm the scheduler checks them.
CREATE TABLE IF NOT EXISTS configuration_backfills (
    id            INTEGER AUTOINCREMENT PRIMARY KEY,
    org_id        STRING  NOT NULL,
    start_date    DATE    NOT NULL,
    end_date      DATE    NOT NULL,
    interval_days INTEGER NOT NULL,
    schedule      STRING  NOT NULL,  -- cron expression
    FOREIGN KEY (org_id) REFERENCES configuration_sources(org_id) ON DELETE CASCADE
);
-- =========================================
-- Example JSON queries
-- =========================================
-- Get all sources with use_raw_data_cache = true
-- SELECT id, org_id, config:use_raw_data_cache::BOOLEAN AS use_cache
-- FROM configuration_sources
-- WHERE config:use_raw_data_cache::BOOLEAN = TRUE;
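-- Update (or add) a source's sftp_host. The fourth OBJECT_INSERT argument (TRUE) allows
-- overwriting an existing key; without it the call fails when sftp_host is already set.
-- UPDATE configuration_sources
-- SET config = OBJECT_INSERT(config, 'sftp_host', 'new_host.example.com', TRUE)
-- WHERE org_id = 'cadc_coastside';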