Skip to content

Commit a5df38b

Browse files
committed
Allow visit definition to work with a single exposure in multi-snap
This is the true incremental mode where we ingest a new file and immediately run visit definition.
1 parent cb536a1 commit a5df38b

File tree

2 files changed

+145
-8
lines changed

2 files changed

+145
-8
lines changed

python/lsst/obs/base/defineVisits.py

Lines changed: 122 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,50 @@ def __init__(self, config: GroupExposuresConfig, **kwargs: Any):
214214
configBaseType=GroupExposuresConfig,
215215
)
216216

217+
@abstractmethod
218+
def find_missing(
219+
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
220+
) -> list[DimensionRecord]:
221+
"""Determine, if possible, which exposures might be missing.
222+
223+
Parameters
224+
----------
225+
exposures : `list` of `lsst.daf.butler.DimensionRecord`
226+
The exposure records to analyze.
227+
registry : `lsst.daf.butler.Registry`
228+
A butler registry that contains these exposure records.
229+
230+
Returns
231+
-------
232+
missing : `list` of `lsst.daf.butler.DimensionRecord`
233+
Any exposure records present in registry that were related to
234+
the given exposures but were missing from that list and deemed
235+
to be relevant.
236+
237+
Notes
238+
-----
239+
Only some grouping schemes are able to find missing exposures. It
240+
is acceptable to return an empty list.
241+
"""
242+
raise NotImplementedError()
243+
244+
@abstractmethod
245+
def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
246+
"""Group the exposures in a way most natural for this visit definition.
247+
248+
Parameters
249+
----------
250+
exposures : `list` of `lsst.daf.butler.DimensionRecord`
251+
The exposure records to group.
252+
253+
Returns
254+
-------
255+
groups : `dict` [Any, `list` of `DimensionRecord`]
256+
Groupings of exposure records. The key type is relevant to the
257+
specific visit definition and could be a string or a tuple.
258+
"""
259+
raise NotImplementedError()
260+
217261
@abstractmethod
218262
def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
219263
"""Group the given exposures into visits.
@@ -676,6 +720,14 @@ def run(
676720
"visit_system",
677721
{"instrument": instrument, "id": visitSystem.value, "name": str(visitSystem)},
678722
)
723+
724+
# In true incremental we will be given the second snap on its
725+
# own on the assumption that the previous snap was already handled.
726+
# For correct grouping we need access to the other exposures in the
727+
# visit.
728+
if incremental:
729+
exposures.extend(self.groupExposures.find_missing(exposures, self.butler.registry))
730+
679731
# Group exposures into visits, delegating to subtask.
680732
self.log.info("Grouping %d exposure(s) into visits.", len(exposures))
681733
definitions = list(self.groupExposures.group(exposures))
@@ -815,6 +867,16 @@ class _GroupExposuresOneToOneTask(GroupExposuresTask, metaclass=ABCMeta):
815867

816868
ConfigClass = _GroupExposuresOneToOneConfig
817869

870+
def find_missing(
871+
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
872+
) -> list[DimensionRecord]:
873+
# By definition no exposures can be missing.
874+
return []
875+
876+
def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
877+
# No grouping.
878+
return {exposure.id: exposure for exposure in exposures}
879+
818880
def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
819881
# Docstring inherited from GroupExposuresTask.
820882
visit_systems = {VisitSystem.from_name("one-to-one")}
@@ -861,12 +923,37 @@ class _GroupExposuresByGroupMetadataTask(GroupExposuresTask, metaclass=ABCMeta):
861923

862924
ConfigClass = _GroupExposuresByGroupMetadataConfig
863925

864-
def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
865-
# Docstring inherited from GroupExposuresTask.
866-
visit_systems = {VisitSystem.from_name("by-group-metadata")}
926+
def find_missing(
927+
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
928+
) -> list[DimensionRecord]:
929+
groups = self.group_exposures(exposures)
930+
missing_exposures: list[DimensionRecord] = []
931+
for exposures_in_group in groups.values():
932+
# We can not tell how many exposures are expected to be in the
933+
# visit so we have to query every time.
934+
first = exposures_in_group[0]
935+
records = set(
936+
registry.queryDimensionRecords(
937+
"exposure",
938+
where="exposure.group_name = group",
939+
bind={"group": first.group_name},
940+
instrument=first.instrument,
941+
)
942+
)
943+
records.difference_update(set(exposures_in_group))
944+
missing_exposures.extend(list(records))
945+
return missing_exposures
946+
947+
def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
867948
groups = defaultdict(list)
868949
for exposure in exposures:
869950
groups[exposure.group_name].append(exposure)
951+
return groups
952+
953+
def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
954+
# Docstring inherited from GroupExposuresTask.
955+
visit_systems = {VisitSystem.from_name("by-group-metadata")}
956+
groups = self.group_exposures(exposures)
870957
for visitName, exposuresInGroup in groups.items():
871958
instrument = exposuresInGroup[0].instrument
872959
visitId = exposuresInGroup[0].group_id
@@ -914,14 +1001,43 @@ class _GroupExposuresByCounterAndExposuresTask(GroupExposuresTask, metaclass=ABC
9141001

9151002
ConfigClass = _GroupExposuresByCounterAndExposuresConfig
9161003

1004+
def find_missing(
1005+
self, exposures: list[DimensionRecord], registry: lsst.daf.butler.Registry
1006+
) -> list[DimensionRecord]:
1007+
"""Analyze the exposures and return relevant exposures known to
1008+
registry.
1009+
"""
1010+
groups = self.group_exposures(exposures)
1011+
missing_exposures: list[DimensionRecord] = []
1012+
for exposures_in_group in groups.values():
1013+
sorted_exposures = sorted(exposures_in_group, key=lambda e: e.seq_num)
1014+
first = sorted_exposures[0]
1015+
if len(sorted_exposures) < first.seq_end - first.seq_start + 1:
1016+
# Missing something. Check registry.
1017+
records = set(
1018+
registry.queryDimensionRecords(
1019+
"exposure",
1020+
where="exposure.seq_start = seq_start AND exposure.seq_end = seq_end",
1021+
bind={"seq_start": first.seq_start, "seq_end": first.seq_end},
1022+
instrument=first.instrument,
1023+
)
1024+
)
1025+
records.difference_update(set(sorted_exposures))
1026+
missing_exposures.extend(list(records))
1027+
return missing_exposures
1028+
1029+
def group_exposures(self, exposures: list[DimensionRecord]) -> dict[Any, list[DimensionRecord]]:
1030+
groups = defaultdict(list)
1031+
for exposure in exposures:
1032+
groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
1033+
return groups
1034+
9171035
def group(self, exposures: List[DimensionRecord]) -> Iterable[VisitDefinitionData]:
9181036
# Docstring inherited from GroupExposuresTask.
9191037
system_one_to_one = VisitSystem.from_name("one-to-one")
9201038
system_seq_start_end = VisitSystem.from_name("by-seq-start-end")
9211039

922-
groups = defaultdict(list)
923-
for exposure in exposures:
924-
groups[exposure.day_obs, exposure.seq_start, exposure.seq_end].append(exposure)
1040+
groups = self.group_exposures(exposures)
9251041
for visit_key, exposures_in_group in groups.items():
9261042
instrument = exposures_in_group[0].instrument
9271043

tests/test_defineVisits.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def define_visits(
8989
) -> None:
9090
for records in exposures:
9191
self.butler.registry.insertDimensionData("exposure", *ensure_iterable(records))
92+
# Include all records so far in definition.
9293
dataIds = [d for d in self.butler.registry.queryDataIds("exposure", instrument="DummyCam")]
9394
self.task.run(dataIds, incremental=incremental)
9495

@@ -97,19 +98,39 @@ def test_defineVisits(self):
9798
self.define_visits([[r for r in self.records.values()]], incremental=False) # list inside a list
9899
self.assertVisits()
99100

100-
def test_incremental(self):
101+
def test_incremental_cumulative(self):
101102
# Define the visits after each exposure.
102103
self.define_visits([exp for exp in self.records.values()], incremental=True)
103104
self.assertVisits()
104105

105-
def test_incremental_reverse(self):
106+
def test_incremental_cumulative_reverse(self):
106107
# In reverse order we should still eventually end up with the right
107108
# answer.
108109
with self.assertLogs("lsst.defineVisits.groupExposures", level="WARNING") as cm:
109110
self.define_visits(list(reversed(self.records.values())), incremental=True)
110111
self.assertIn("Skipping the multi-snap definition", "\n".join(cm.output))
111112
self.assertVisits()
112113

114+
def define_visits_incrementally(self, exposure: DimensionRecord) -> None:
115+
self.butler.registry.insertDimensionData("exposure", exposure)
116+
dataIds = [
117+
d
118+
for d in self.butler.registry.queryDataIds(
119+
"exposure", instrument="DummyCam", exposure=exposure.id
120+
)
121+
]
122+
self.task.run(dataIds, incremental=True)
123+
124+
def test_incremental(self):
125+
for record in self.records.values():
126+
self.define_visits_incrementally(record)
127+
self.assertVisits()
128+
129+
def test_incremental_reverse(self):
130+
for record in reversed(self.records.values()):
131+
self.define_visits_incrementally(record)
132+
self.assertVisits()
133+
113134
def testPickleTask(self):
114135
stream = pickle.dumps(self.task)
115136
copy = pickle.loads(stream)

0 commit comments

Comments
 (0)