Skip to content

Commit 20a42d2

Browse files
authored
Merge pull request #61 from bsc-dom/cyclops
isolating _dc_meta type (a Pydantic BaseModel) from serialization/des…
2 parents a569d22 + f06ae97 commit 20a42d2

File tree

4 files changed

+38
-17
lines changed

4 files changed

+38
-17
lines changed

src/dataclay/backend/api.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from threadpoolctl import threadpool_limits
1111

1212
from dataclay import utils
13-
from dataclay.config import set_runtime, settings
13+
from dataclay.config import LEGACY_DEPS, set_runtime, settings
1414
from dataclay.event_loop import dc_to_thread_io
1515
from dataclay.exceptions import (
1616
DataClayException,
@@ -19,6 +19,7 @@
1919
ObjectWithWrongBackendIdError,
2020
)
2121
from dataclay.lock_manager import lock_manager
22+
from ..metadata.kvdata import ObjectMetadata
2223
from dataclay.runtime import BackendRuntime
2324
from dataclay.utils.serialization import dcdumps, dcloads, recursive_dcloads
2425
from dataclay.utils.telemetry import trace
@@ -72,22 +73,32 @@ async def is_ready(self, timeout: Optional[float] = None, pause: float = 0.5):
7273
async def register_objects(self, serialized_objects: Iterable[bytes], make_replica: bool):
7374
logger.debug("Receiving (%d) objects to register", len(serialized_objects))
7475
for object_bytes in serialized_objects:
75-
state, getstate = await dcloads(object_bytes)
76-
instance = await self.runtime.get_object_by_id(state["_dc_meta"].id)
76+
metadata_dict, dc_properties, getstate = await dcloads(object_bytes)
77+
78+
if LEGACY_DEPS:
79+
dc_meta = ObjectMetadata.parse_obj(metadata_dict)
80+
else:
81+
dc_meta = ObjectMetadata.model_validate(metadata_dict)
82+
83+
instance = await self.runtime.get_object_by_id(dc_meta.id)
7784

7885
if instance._dc_is_local:
7986
assert instance._dc_is_replica
8087
if make_replica:
8188
logger.warning("Replica already exists with id=%s", instance._dc_meta.id)
8289
continue
8390

91+
state = {"_dc_meta": dc_meta}
92+
8493
async with lock_manager.get_lock(instance._dc_meta.id).writer_lock:
8594
# Update object state and flags
8695
state["_dc_is_loaded"] = True
8796
state["_dc_is_local"] = True
8897
vars(instance).update(state)
8998
if getstate:
9099
instance.__setstate__(getstate)
100+
else:
101+
vars(instance).update(dc_properties)
91102
self.runtime.data_manager.add_hard_reference(instance)
92103

93104
if make_replica:

src/dataclay/data_manager.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,20 +141,18 @@ async def load_object(self, instance: DataClayObject):
141141
path = f"{settings.storage_path}/{object_id}"
142142
# TODO: Is it necessary dc_to_thread_cpu? Should be blocking
143143
# to avoid bugs with parallel loads?
144-
state, getstate = await dc_to_thread_cpu(pickle.load, open(path, "rb"))
144+
metadata_dict, dc_properties, getstate = await dc_to_thread_cpu(pickle.load, open(path, "rb"))
145145
self.dataclay_stored_objects.dec()
146146
except Exception as e:
147147
raise ObjectNotFound(object_id) from e
148148

149-
# Delete outdated metadata (SSOT stored in Redis)
150-
del state["_dc_meta"]
151-
vars(instance).update(state)
152-
153149
# NOTE: We need to set _dc_is_loaded before calling __setstate__
154150
# to avoid infinite recursion
155151
instance._dc_is_loaded = True
156152
if getstate is not None:
157153
instance.__setstate__(getstate)
154+
else:
155+
vars(instance).update(dc_properties)
158156

159157
self.add_hard_reference(instance)
160158
logger.debug("(%s) Loaded '%s'", object_id, instance.__class__.__name__)

src/dataclay/dataclay_object.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -313,12 +313,16 @@ def _dc_properties(self) -> dict[str, Any]:
313313
@property
314314
def _dc_state(self) -> tuple[dict, Any]:
315315
"""Returns the object state"""
316-
state = {"_dc_meta": self._dc_meta}
316+
317+
if LEGACY_DEPS:
318+
metadata_dict = self._dc_meta.dict()
319+
else:
320+
metadata_dict = self._dc_meta.model_dump()
321+
317322
if hasattr(self, "__getstate__") and hasattr(self, "__setstate__"):
318-
return state, self.__getstate__()
323+
return metadata_dict, None, self.__getstate__()
319324
else:
320-
state.update(self._dc_properties)
321-
return state, None
325+
return metadata_dict, self._dc_properties, None
322326

323327
@property
324328
def _dc_all_backend_ids(self) -> set[UUID]:

src/dataclay/utils/serialization.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010
from uuid import UUID
1111

1212
from dataclay import utils
13-
from dataclay.config import get_runtime
13+
from dataclay.config import LEGACY_DEPS, get_runtime
1414
from dataclay.dataclay_object import DataClayObject
1515
from dataclay.event_loop import dc_to_thread_cpu, get_dc_event_loop
16+
from dataclay.metadata.kvdata import ObjectMetadata
1617

1718
logger = logging.getLogger(__name__)
1819

@@ -151,22 +152,29 @@ async def recursive_dcloads(object_binary, unserialized_objects: dict[UUID, Data
151152
unserialized_objects = {}
152153

153154
# Use dc_to_thread_cpu to avoid blocking the event loop in `get_by_id_sync`
154-
object_dict, state = await dc_to_thread_cpu(
155+
object_metadata_dict, dc_properties, state = await dc_to_thread_cpu(
155156
RecursiveDataClayObjectUnpickler(io.BytesIO(object_binary), unserialized_objects).load
156157
)
157158

158-
object_id = object_dict["_dc_meta"].id
159+
if LEGACY_DEPS:
160+
dc_meta = ObjectMetadata.parse_obj(object_metadata_dict)
161+
else:
162+
dc_meta = ObjectMetadata.model_validate(object_metadata_dict)
163+
164+
object_id = dc_meta.id
159165
try:
160166
# In case it was already unserialized by a reference
161167
proxy_object = unserialized_objects[object_id]
162168
except KeyError:
163-
cls: type[DataClayObject] = utils.get_class_by_name(object_dict["_dc_meta"].class_name)
169+
cls: type[DataClayObject] = utils.get_class_by_name(dc_meta.class_name)
164170
proxy_object = cls.new_proxy_object()
165171
unserialized_objects[object_id] = proxy_object
166172

167-
vars(proxy_object).update(object_dict)
173+
vars(proxy_object)["_dc_meta"] = dc_meta
168174
if state is not None:
169175
proxy_object.__setstate__(state)
176+
else:
177+
vars(proxy_object).update(dc_properties)
170178
return proxy_object
171179

172180

0 commit comments

Comments
 (0)