Skip to content

Commit 198ee19

Browse files
Ken LippoldKen Lippold
authored andcommitted
Added initial monitoring and data product services and APIs.
1 parent 3bed83a commit 198ee19

File tree

44 files changed

+3732
-108
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+3732
-108
lines changed

django/hydroserver/celery.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
import os
22
from celery import Celery
33

4+
from processing.orchestration.tracked_task import TrackedTask
5+
46

57
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hydroserver.settings")
68

79
app = Celery("hydroserver")
10+
app.Task = TrackedTask
811
app.config_from_object("django.conf:settings", namespace="CELERY")
912
app.autodiscover_tasks()

django/interfaces/api/schemas/etl/data_connection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class JSONPayloadPatchBody(BasePatchBody, JSONPayloadResponse):
8383

8484
class PlaceholderVariableResponse(BaseGetResponse):
8585
name: str
86-
variable_type: Literal["runTime", "latestObservationTimestamp", "perTask"] = Field(alias="type")
86+
variable_type: Literal["run_time", "latest_observation_timestamp", "per_task"] = Field(alias="type")
8787

8888

8989
class PlaceholderVariablePostBody(BasePostBody, PlaceholderVariableResponse):

django/interfaces/api/schemas/etl/task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ def resolve_latest_run(obj):
106106
"status": obj.latest_run_status,
107107
"started_at": obj.latest_run_started_at,
108108
"finished_at": obj.latest_run_finished_at,
109-
"message": None,
110-
"result": None,
109+
"message": obj.latest_run_message,
110+
"result": obj.latest_run_result,
111111
}
112112

113113
@staticmethod

django/interfaces/api/schemas/monitoring/__init__.py

Whitespace-only changes.
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import uuid
2+
from datetime import datetime
3+
from typing import Optional, Literal, Union
4+
5+
from ninja import Field, Query
6+
7+
from core.types import Unset
8+
from interfaces.api.schemas import (
9+
OrderByField,
10+
BaseGetResponse,
11+
BasePostBody,
12+
BasePatchBody,
13+
CollectionQueryParameters,
14+
DatastreamSummaryResponse,
15+
)
16+
17+
18+
WindowUnits = Literal["minutes", "hours", "days"]
19+
20+
21+
class MonitoringRuleOrderBy(OrderByField):
22+
id = ("id", "id")
23+
rule_type = ("ruleType", "rule_type")
24+
datastream_id = ("datastreamId", "datastream_id")
25+
26+
27+
class MonitoringRuleQueryParameters(CollectionQueryParameters):
28+
order_by: list[MonitoringRuleOrderBy] = Query(
29+
[], description="Select one or more fields to order the response by."
30+
)
31+
datastream: list[uuid.UUID] = Query(
32+
[], description="Filter rules by datastream ID.", alias="datastream_id"
33+
)
34+
rule_type: list[str] = Query(
35+
[], description="Filter rules by rule type."
36+
)
37+
38+
39+
# --- Per-type response schemas ---
40+
41+
class RangeRuleResponse(BaseGetResponse):
42+
rule_type: Literal["range"] = Field(alias="type")
43+
min_value: Optional[float] = None
44+
max_value: Optional[float] = None
45+
46+
47+
class RateOfChangeRuleResponse(BaseGetResponse):
48+
rule_type: Literal["rate_of_change"] = Field(alias="type")
49+
max_change: float
50+
window: int
51+
window_units: WindowUnits
52+
53+
54+
class PersistenceRuleResponse(BaseGetResponse):
55+
rule_type: Literal["persistence"] = Field(alias="type")
56+
persist_value: Optional[float] = None
57+
window: int
58+
window_units: WindowUnits
59+
60+
61+
class MissingDataRuleResponse(BaseGetResponse):
62+
rule_type: Literal["missing_data"] = Field(alias="type")
63+
window: int
64+
window_units: WindowUnits
65+
66+
67+
# --- Per-type post body schemas ---
68+
69+
class RangeRulePostBody(BasePostBody, RangeRuleResponse):
70+
...
71+
72+
73+
class RateOfChangeRulePostBody(BasePostBody, RateOfChangeRuleResponse):
74+
...
75+
76+
77+
class PersistenceRulePostBody(BasePostBody, PersistenceRuleResponse):
78+
...
79+
80+
81+
class MissingDataRulePostBody(BasePostBody, MissingDataRuleResponse):
82+
...
83+
84+
85+
# --- Per-type patch body schemas ---
86+
87+
class RangeRulePatchBody(BasePatchBody, RangeRuleResponse):
88+
...
89+
90+
91+
class RateOfChangeRulePatchBody(BasePatchBody, RateOfChangeRuleResponse):
92+
...
93+
94+
95+
class PersistenceRulePatchBody(BasePatchBody, PersistenceRuleResponse):
96+
...
97+
98+
99+
class MissingDataRulePatchBody(BasePatchBody, MissingDataRuleResponse):
100+
...
101+
102+
103+
# --- Shared resolve_rule helper ---
104+
105+
def _resolve_rule(obj) -> dict:
106+
return {
107+
"rule_type": obj.rule_type,
108+
"min_value": obj.min_value,
109+
"max_value": obj.max_value,
110+
"max_change": obj.max_change,
111+
"persist_value": obj.persist_value,
112+
"window": obj.window,
113+
"window_units": obj.window_units,
114+
}
115+
116+
117+
# --- Top-level rule schemas ---
118+
119+
class MonitoringRuleDetailResponse(BaseGetResponse):
120+
"""Rule details without the datastream — used when nested under a MonitoredDatastreamResponse."""
121+
id: uuid.UUID
122+
last_checked_at: Optional[datetime] = None
123+
rule: Union[RangeRuleResponse, RateOfChangeRuleResponse, PersistenceRuleResponse, MissingDataRuleResponse]
124+
125+
@staticmethod
126+
def resolve_rule(obj):
127+
return _resolve_rule(obj)
128+
129+
130+
class MonitoredDatastreamResponse(BaseGetResponse):
131+
"""A datastream and all its rules for a given monitoring task."""
132+
datastream: DatastreamSummaryResponse
133+
rules: list[MonitoringRuleDetailResponse]
134+
135+
136+
class MonitoringRuleResponse(BaseGetResponse):
137+
id: uuid.UUID
138+
datastream: DatastreamSummaryResponse
139+
last_checked_at: Optional[datetime] = None
140+
rule: Union[RangeRuleResponse, RateOfChangeRuleResponse, PersistenceRuleResponse, MissingDataRuleResponse]
141+
142+
@staticmethod
143+
def resolve_datastream(obj):
144+
return obj.datastream
145+
146+
@staticmethod
147+
def resolve_rule(obj):
148+
return _resolve_rule(obj)
149+
150+
151+
class MonitoringRulePostBody(BasePostBody):
152+
uid: uuid.UUID | Unset = Field(Unset, alias="id")
153+
datastream_id: uuid.UUID
154+
rule: Union[RangeRulePostBody, RateOfChangeRulePostBody, PersistenceRulePostBody, MissingDataRulePostBody]
155+
156+
157+
class MonitoringRulePatchBody(BasePatchBody):
158+
rule: Union[RangeRulePatchBody, RateOfChangeRulePatchBody, PersistenceRulePatchBody, MissingDataRulePatchBody]
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import uuid
2+
from typing import Optional, Literal
3+
from collections import defaultdict
4+
5+
from ninja import Field, Query
6+
7+
from core.types import Unset
8+
from interfaces.api.schemas import (
9+
OrderByField,
10+
BaseGetResponse,
11+
BasePostBody,
12+
BasePatchBody,
13+
CollectionQueryParameters,
14+
ThingSummaryResponse,
15+
)
16+
from interfaces.api.schemas.orchestration.schedule import ScheduleResponse, SchedulePostBody, SchedulePatchBody
17+
from interfaces.api.schemas.orchestration.run import TaskRunResponse
18+
from interfaces.api.schemas.monitoring.rule import MonitoredDatastreamResponse
19+
20+
21+
class MonitoringTaskOrderBy(OrderByField):
22+
id = ("id", "id")
23+
name = ("name", "name")
24+
thing_id = ("thingId", "thing_id")
25+
thing_name = ("thingName", "thing__name")
26+
workspace_id = ("workspaceId", "thing__workspace_id")
27+
workspace_name = ("workspaceName", "thing__workspace__name")
28+
latest_run_status = ("latestRunStatus", "latest_run_status")
29+
latest_run_started_at = ("latestRunStartedAt", "latest_run_started_at")
30+
latest_run_finished_at = ("latestRunFinishedAt", "latest_run_finished_at")
31+
32+
33+
class MonitoringTaskQueryParameters(CollectionQueryParameters):
34+
order_by: list[MonitoringTaskOrderBy] = Query(
35+
[], description="Select one or more fields to order the response by."
36+
)
37+
thing: list[uuid.UUID] = Query(
38+
[], description="Filter monitoring tasks by thing ID.", alias="thing_id"
39+
)
40+
workspace: list[uuid.UUID] = Query(
41+
[], description="Filter monitoring tasks by workspace ID.", alias="workspace_id"
42+
)
43+
latest_run_status: list[str | Literal["null"]] = Query(
44+
[], description="Filter monitoring tasks by their most recent run status."
45+
)
46+
datastream: list[uuid.UUID] = Query(
47+
[], description="Filter monitoring tasks by datastream ID.", alias="datastream_id"
48+
)
49+
rule_type: list[str] = Query(
50+
[], description="Filter monitoring tasks by rule type."
51+
)
52+
53+
54+
class MonitoringTaskResponse(BaseGetResponse):
55+
id: uuid.UUID
56+
name: str
57+
description: Optional[str] = None
58+
thing: ThingSummaryResponse
59+
schedule: ScheduleResponse | None = None
60+
latest_run: TaskRunResponse | None = None
61+
monitored_datastreams: list[MonitoredDatastreamResponse]
62+
recipients: list[str]
63+
64+
@staticmethod
65+
def resolve_schedule(obj):
66+
pt = obj.periodic_task
67+
if not pt:
68+
return None
69+
ct = pt.crontab
70+
return {
71+
"enabled": pt.enabled,
72+
"start_time": pt.start_time,
73+
"crontab": f"{ct.minute} {ct.hour} {ct.day_of_month} {ct.month_of_year} {ct.day_of_week}" if ct else None,
74+
"interval": pt.interval.every if pt.interval else None,
75+
"interval_period": pt.interval.period if pt.interval else None,
76+
"next_run_at": obj.next_run_at,
77+
}
78+
79+
@staticmethod
80+
def resolve_latest_run(obj):
81+
if not getattr(obj, "latest_run_id", None):
82+
return None
83+
return {
84+
"id": obj.latest_run_id,
85+
"status": obj.latest_run_status,
86+
"started_at": obj.latest_run_started_at,
87+
"finished_at": obj.latest_run_finished_at,
88+
"message": obj.latest_run_message,
89+
"result": obj.latest_run_result,
90+
}
91+
92+
@staticmethod
93+
def resolve_monitored_datastreams(obj):
94+
groups = defaultdict(list)
95+
for rule in obj.rules.all():
96+
groups[rule.datastream_id].append(rule)
97+
return [
98+
{"datastream": rules[0].datastream, "rules": rules}
99+
for rules in groups.values()
100+
]
101+
102+
@staticmethod
103+
def resolve_recipients(obj):
104+
return [r.email for r in obj.recipients.all()]
105+
106+
107+
class MonitoringTaskPostBody(BasePostBody):
108+
uid: uuid.UUID | Unset = Field(Unset, alias="id")
109+
name: str
110+
description: Optional[str] = None
111+
thing_id: uuid.UUID
112+
schedule: SchedulePostBody | None = None
113+
recipients: list[str] = []
114+
115+
116+
class MonitoringTaskPatchBody(BasePatchBody):
117+
name: str
118+
description: Optional[str] = None
119+
schedule: SchedulePatchBody | None = None
120+
recipients: list[str] = []
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import uuid
2+
from typing import Optional
3+
4+
from ninja import Field, Query
5+
6+
from core.types import Unset
7+
from interfaces.api.schemas import (
8+
OrderByField,
9+
BaseGetResponse,
10+
BasePostBody,
11+
BasePatchBody,
12+
CollectionQueryParameters,
13+
ThingSummaryResponse,
14+
)
15+
16+
17+
class ExpressionOrderBy(OrderByField):
18+
id = ("id", "id")
19+
name = ("name", "name")
20+
thing_id = ("thingId", "thing_id")
21+
thing_name = ("thingName", "thing__name")
22+
workspace_id = ("workspaceId", "thing__workspace_id")
23+
workspace_name = ("workspaceName", "thing__workspace__name")
24+
25+
26+
class ExpressionQueryParameters(CollectionQueryParameters):
27+
order_by: list[ExpressionOrderBy] = Query(
28+
[], description="Select one or more fields to order the response by."
29+
)
30+
thing: list[uuid.UUID] = Query(
31+
[], description="Filter expressions by thing ID.", alias="thing_id"
32+
)
33+
workspace: list[uuid.UUID] = Query(
34+
[], description="Filter expressions by workspace ID.", alias="workspace_id"
35+
)
36+
37+
38+
class ExpressionSegmentResponse(BaseGetResponse):
39+
lower_bound: Optional[float] = None
40+
upper_bound: Optional[float] = None
41+
formula: Optional[str] = None
42+
43+
44+
class ExpressionResponse(BaseGetResponse):
45+
id: uuid.UUID
46+
name: str
47+
description: Optional[str] = None
48+
breakpoint_variable: Optional[str] = None
49+
thing: ThingSummaryResponse
50+
segments: list[ExpressionSegmentResponse]
51+
52+
@staticmethod
53+
def resolve_segments(obj):
54+
return obj.segments.all()
55+
56+
57+
class ExpressionSegmentPostBody(BasePostBody):
58+
lower_bound: Optional[float] = None
59+
upper_bound: Optional[float] = None
60+
formula: Optional[str] = None
61+
62+
63+
class ExpressionPostBody(BasePostBody):
64+
uid: uuid.UUID | Unset = Field(Unset, alias="id")
65+
name: str
66+
description: Optional[str] = None
67+
thing_id: uuid.UUID
68+
breakpoint_variable: Optional[str] = None
69+
segments: list[ExpressionSegmentPostBody] = []
70+
71+
72+
class ExpressionPatchBody(BasePatchBody):
73+
name: str
74+
description: Optional[str] = None
75+
breakpoint_variable: Optional[str] = None
76+
segments: list[ExpressionSegmentPostBody] = []

0 commit comments

Comments
 (0)