From 582b1dec0e2dd4c4cd786e2c5e9a73d1bceb9de9 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 14:42:03 +0100 Subject: [PATCH 01/18] register cci1 and cci2 --- cads_worker/worker.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cads_worker/worker.py b/cads_worker/worker.py index ccbde30..9b5f38d 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -13,8 +13,10 @@ import dask import dask.config import distributed.worker +import fsspec import structlog from distributed import get_worker +from fsspec.implementations.local import LocalFileSystem from . import config, utils @@ -26,6 +28,9 @@ DB_CONNECTION_RETRIES = int(os.getenv("WORKER_DB_CONNECTION_RETRIES", 3)) +fsspec.register_implementation("cci1", LocalFileSystem) +fsspec.register_implementation("cci2", LocalFileSystem) + @functools.lru_cache def create_session_maker() -> cads_broker.database.sa.orm.sessionmaker: From 9c2f418b55dc35e4937198623e22b0f8898b1c5f Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 16:51:27 +0100 Subject: [PATCH 02/18] use pyproject --- cads_worker/worker.py | 5 ----- pyproject.toml | 5 +++++ 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cads_worker/worker.py b/cads_worker/worker.py index 9b5f38d..ccbde30 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -13,10 +13,8 @@ import dask import dask.config import distributed.worker -import fsspec import structlog from distributed import get_worker -from fsspec.implementations.local import LocalFileSystem from . import config, utils @@ -28,9 +26,6 @@ DB_CONNECTION_RETRIES = int(os.getenv("WORKER_DB_CONNECTION_RETRIES", 3)) -fsspec.register_implementation("cci1", LocalFileSystem) -fsspec.register_implementation("cci2", LocalFileSystem) - @functools.lru_cache def create_session_maker() -> cads_broker.database.sa.orm.sessionmaker: diff --git a/pyproject.toml b/pyproject.toml index 68e9946..cd76529 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "cacholote", "cads-adaptors@git+https://github.com/ecmwf-projects/cads-adaptors.git", "distributed", + "fsspec", "structlog", "typer" ] @@ -27,6 +28,10 @@ license = {file = "LICENSE"} name = "cads-worker" readme = "README.md" +[project.entry-points."fsspec.specs"] +cci1 = "fsspec.implementations.local:LocalFileSystem" +cci2 = "fsspec.implementations.local:LocalFileSystem" + [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" expire-cache-entries = "cads_worker.entry_points:expire_cache_entries" From 2a0128d9e4a7787a299e8f6660e60c4f0be34751 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 16:55:05 +0100 Subject: [PATCH 03/18] revert --- pyproject.toml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cd76529..68e9946 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,7 +18,6 @@ dependencies = [ "cacholote", "cads-adaptors@git+https://github.com/ecmwf-projects/cads-adaptors.git", "distributed", - "fsspec", "structlog", "typer" ] @@ -28,10 +27,6 @@ license = {file = "LICENSE"} name = "cads-worker" readme = "README.md" -[project.entry-points."fsspec.specs"] -cci1 = "fsspec.implementations.local:LocalFileSystem" -cci2 = "fsspec.implementations.local:LocalFileSystem" - [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" expire-cache-entries = "cads_worker.entry_points:expire_cache_entries" From 6bf7564c0dfbb350be68124efc7d7763dcd2e3b0 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 17:02:18 +0100 Subject: [PATCH 04/18] add ccis in pyproject --- pyproject.toml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 68e9946..cd76529 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "cacholote", "cads-adaptors@git+https://github.com/ecmwf-projects/cads-adaptors.git", "distributed", + "fsspec", "structlog", "typer" ] @@ -27,6 +28,10 @@ license = {file = "LICENSE"} name = "cads-worker" readme = "README.md" +[project.entry-points."fsspec.specs"] +cci1 = "fsspec.implementations.local:LocalFileSystem" +cci2 = "fsspec.implementations.local:LocalFileSystem" + [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" expire-cache-entries = "cads_worker.entry_points:expire_cache_entries" From 811c21b3cce33b90bf2c13c3a51dadc377552a92 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 19:00:13 +0100 Subject: [PATCH 05/18] add filesystems with protocols --- filesystems.py | 9 +++++++++ pyproject.toml | 4 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 filesystems.py diff --git a/filesystems.py b/filesystems.py new file mode 100644 index 0000000..e3336e4 --- /dev/null +++ b/filesystems.py @@ -0,0 +1,9 @@ +import fsspec.implementations.local + + +class CCI1FileSystem(fsspec.implementations.local.LocalFileSystem): + protocol = "cci1" + + +class CCI2FileSystem(fsspec.implementations.local.LocalFileSystem): + protocol = "cci2" diff --git a/pyproject.toml b/pyproject.toml index cd76529..08a1d3a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,8 +29,8 @@ name = "cads-worker" readme = "README.md" [project.entry-points."fsspec.specs"] -cci1 = "fsspec.implementations.local:LocalFileSystem" -cci2 = "fsspec.implementations.local:LocalFileSystem" +cci1 = "cads_worker.filesystems:CCI1FileSystem" +cci2 = "cads_worker.filesystems:CCI2FileSystem" [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" From cd1855496029bf6515a902b8f03695f00449c099 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 19:11:04 +0100 Subject: [PATCH 06/18] fix filesystem --- pyproject.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 08a1d3a..706338b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,8 +29,8 @@ name = "cads-worker" readme = "README.md" [project.entry-points."fsspec.specs"] -cci1 = "cads_worker.filesystems:CCI1FileSystem" -cci2 = "cads_worker.filesystems:CCI2FileSystem" +cci1 = "filesystems:CCI1FileSystem" +cci2 = "filesystems:CCI2FileSystem" [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" From 58206d053e0224654eaf8799e4bf739e9a5f44d0 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 24 Feb 2026 19:19:58 +0100 Subject: [PATCH 07/18] mv module --- filesystems.py => cads_worker/filesystems.py | 0 pyproject.toml | 4 ++-- 2 files changed, 2 insertions(+), 2 deletions(-) rename filesystems.py => cads_worker/filesystems.py (100%) diff --git a/filesystems.py b/cads_worker/filesystems.py similarity index 100% rename from filesystems.py rename to cads_worker/filesystems.py diff --git a/pyproject.toml b/pyproject.toml index 706338b..08a1d3a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,8 +29,8 @@ name = "cads-worker" readme = "README.md" [project.entry-points."fsspec.specs"] -cci1 = "filesystems:CCI1FileSystem" -cci2 = "filesystems:CCI2FileSystem" +cci1 = "cads_worker.filesystems:CCI1FileSystem" +cci2 = "cads_worker.filesystems:CCI2FileSystem" [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" From 925dc28a8d679640a3cfbc917ee32947856fb7a3 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 25 Feb 2026 09:51:54 +0100 Subject: [PATCH 08/18] fix strip protocol --- cads_worker/filesystems.py | 22 ++++++++++++++++++++-- ci/environment-ci.yml | 1 + pyproject.toml | 1 + tests/test_50_filesystems.py | 10 ++++++++++ 4 files changed, 32 insertions(+), 2 deletions(-) create mode 100644 tests/test_50_filesystems.py diff --git a/cads_worker/filesystems.py b/cads_worker/filesystems.py index e3336e4..2c13f77 100644 --- a/cads_worker/filesystems.py +++ b/cads_worker/filesystems.py @@ -1,9 +1,27 @@ import fsspec.implementations.local +from fsspec.utils import stringify_path -class CCI1FileSystem(fsspec.implementations.local.LocalFileSystem): +class CCIFileSystem(fsspec.implementations.local.LocalFileSystem): + protocol = "cci" + + @classmethod + def _strip_protocol(cls, path): + assert isinstance(cls.protocol, str) + path = stringify_path(path) + if path.startswith(f"{cls.protocol}:"): + path = path.replace(cls.protocol, "file", 1) + return super()._strip_protocol(path) + + def unstrip_protocol(self, name): + assert isinstance(self.protocol, str) + name = self._strip_protocol(name) + return f"{self.protocol}://{name}" + + +class CCI1FileSystem(CCIFileSystem): protocol = "cci1" -class CCI2FileSystem(fsspec.implementations.local.LocalFileSystem): +class CCI2FileSystem(CCIFileSystem): protocol = "cci2" diff --git a/ci/environment-ci.yml b/ci/environment-ci.yml index df23e29..a9248ba 100644 --- a/ci/environment-ci.yml +++ b/ci/environment-ci.yml @@ -18,3 +18,4 @@ dependencies: - pip: - git+https://github.com/ecmwf-projects/cacholote - git+https://github.com/ecmwf-projects/cads-adaptors + - git+https://github.com/ecmwf-projects/cads-broker diff --git a/pyproject.toml b/pyproject.toml index 08a1d3a..9b1771b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ classifiers = [ dependencies = [ "cacholote", "cads-adaptors@git+https://github.com/ecmwf-projects/cads-adaptors.git", + "cads-broker@git+https://github.com/ecmwf-projects/cads-broker.git", "distributed", "fsspec", "structlog", diff --git a/tests/test_50_filesystems.py b/tests/test_50_filesystems.py new file mode 100644 index 0000000..d7179b8 --- /dev/null +++ b/tests/test_50_filesystems.py @@ -0,0 +1,10 @@ +import os + +import fsspec +import pytest + + +@pytest.mark.parametrize("protocol", ["cci1", "cci2"]) +def test_unstrip_protocol(protocol: str) -> None: + fs = fsspec.filesystem(protocol) + assert fs.unstrip_protocol(".") == f"{protocol}://{os.getcwd()}" From 87b44a8b820618968e4575266aff530525444be3 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Wed, 25 Feb 2026 12:57:42 +0100 Subject: [PATCH 09/18] fix cci working dir --- cads_worker/worker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cads_worker/worker.py b/cads_worker/worker.py index ccbde30..0e0e57f 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -13,6 +13,7 @@ import dask import dask.config import distributed.worker +import fsspec.implementations.local import structlog from distributed import get_worker @@ -244,7 +245,11 @@ def submit_workflow( adaptor_class = cads_adaptors.get_adaptor_class(entry_point, setup_code) try: with utils.enter_tmp_working_dir() as working_dir: - base_dir = dirname if "file" in fs.protocol else working_dir + base_dir = ( + dirname + if isinstance(fs, fsspec.implementations.local.LocalFileSystem) + else working_dir + ) with utils.make_cache_tmp_path(base_dir) as cache_tmp_path: adaptor = adaptor_class( form=form, From ef0703454e507f6edbede22c46246a5ceff8269e Mon Sep 17 00:00:00 2001 From: malmans2 Date: Thu, 12 Mar 2026 15:26:09 +0100 Subject: [PATCH 10/18] fix chmod --- cads_worker/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cads_worker/worker.py b/cads_worker/worker.py index 4e54989..158f17a 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -241,6 +241,8 @@ def submit_workflow( tag=collection_id, ) fs, dirname = cacholote.utils.get_cache_files_fs_dirname() + if "s3" in fs.protocol: + cacholote.config.set(io_chmod="public-read") adaptor_class = cads_adaptors.get_adaptor_class(entry_point, setup_code) try: @@ -264,8 +266,6 @@ def submit_workflow( context.error(f"{err.__class__.__name__}: {str(err)}") raise - if "s3" in fs.protocol: - fs.chmod(result.result["args"][0]["file:local_path"], acl="public-read") with context.session_maker() as session: request = cads_broker.database.set_request_cache_id( request_uid=job_id, From d30c80b37b7fd2a253977eeb458c2ed6042ebadb Mon Sep 17 00:00:00 2001 From: malmans2 Date: Thu, 12 Mar 2026 15:36:25 +0100 Subject: [PATCH 11/18] fix chmod when not cached --- cads_worker/worker.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cads_worker/worker.py b/cads_worker/worker.py index 158f17a..e39d32f 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -241,8 +241,6 @@ def submit_workflow( tag=collection_id, ) fs, dirname = cacholote.utils.get_cache_files_fs_dirname() - if "s3" in fs.protocol: - cacholote.config.set(io_chmod="public-read") adaptor_class = cads_adaptors.get_adaptor_class(entry_point, setup_code) try: @@ -266,6 +264,8 @@ def submit_workflow( context.error(f"{err.__class__.__name__}: {str(err)}") raise + if result.counter == 1 and "s3" in fs.protocol: + fs.chmod(result.result["args"][0]["file:local_path"], acl="public-read") with context.session_maker() as session: request = cads_broker.database.set_request_cache_id( request_uid=job_id, From 57afae3a435f1b012d68714cb52111b3a6ece569 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Mon, 16 Mar 2026 13:08:46 +0100 Subject: [PATCH 12/18] get volumes config from yaml --- cads_worker/entry_points.py | 29 ++++++++++++++--------------- cads_worker/models.py | 23 +++++++++++++++++++++++ cads_worker/utils.py | 17 ++++++++++------- cads_worker/worker.py | 4 ++-- ci/environment-ci.yml | 1 + environment.yml | 2 ++ pyproject.toml | 9 ++++++++- tests/test_10_cache_cleaner.py | 4 ++-- tests/test_30_utils.py | 22 +++++++++++++--------- 9 files changed, 75 insertions(+), 36 deletions(-) create mode 100644 cads_worker/models.py diff --git a/cads_worker/entry_points.py b/cads_worker/entry_points.py index f216ed6..67d23be 100644 --- a/cads_worker/entry_points.py +++ b/cads_worker/entry_points.py @@ -35,22 +35,21 @@ class CleanerKwargs(TypedDict): def _cache_cleaner() -> None: use_database = strtobool(os.environ.get("USE_DATABASE", "1")) - cleaner_kwargs = CleanerKwargs( - maxsize=int(os.environ.get("MAX_SIZE", 1_000_000_000)), - method=os.environ.get("METHOD", "LRU"), - delete_unknown_files=not use_database, - lock_validity_period=float(os.environ.get("LOCK_VALIDITY_PERIOD", 86400)), - use_database=use_database, - depth=int(os.getenv("CACHE_DEPTH", 2)), - batch_size=int(os.getenv("BATCH_SIZE", 0)) or None, - batch_delay=float(os.getenv("BATCH_DELAY", 0)), - ) - for cache_files_urlpath in utils.parse_data_volumes_config(): - cacholote.config.set(cache_files_urlpath=cache_files_urlpath) + data_volumes = utils.parse_data_volumes_config().volumes + for volume, volume_config in data_volumes.items(): + cleaner_kwargs = CleanerKwargs( + maxsize=volume_config.max_size, + method=os.environ.get("METHOD", "LRU"), + delete_unknown_files=not use_database, + lock_validity_period=float(os.environ.get("LOCK_VALIDITY_PERIOD", 86400)), + use_database=use_database, + depth=int(os.getenv("CACHE_DEPTH", 2)), + batch_size=int(os.getenv("BATCH_SIZE", 0)) or None, + batch_delay=float(os.getenv("BATCH_DELAY", 0)), + ) + cacholote.config.set(cache_files_urlpath=volume) LOGGER.info( - "Running cache cleaner", - cache_files_urlpath=cache_files_urlpath, - **cleaner_kwargs, + "Running cache cleaner", cache_files_urlpath=volume, **cleaner_kwargs ) try: cacholote.clean_cache_files(**cleaner_kwargs) diff --git a/cads_worker/models.py b/cads_worker/models.py new file mode 100644 index 0000000..d7a95f6 --- /dev/null +++ b/cads_worker/models.py @@ -0,0 +1,23 @@ +import os +import random + +from pydantic import BaseModel, Field, NonNegativeInt + + +def get_env_max_size() -> int: + return int(os.getenv("MAX_SIZE", "1_000_000_000")) + + +class DataVolumeConfig(BaseModel): + weight: NonNegativeInt = 1 + max_size: NonNegativeInt = Field(default_factory=get_env_max_size) + + +class DataVolumes(BaseModel): + volumes: dict[str, DataVolumeConfig] + + def get_random_volume(self) -> str: + choices = [] + for volume, config in self.volumes.items(): + choices.extend([volume] * config.weight) + return random.choice(choices) diff --git a/cads_worker/utils.py b/cads_worker/utils.py index 868f4fc..5ee00b3 100644 --- a/cads_worker/utils.py +++ b/cads_worker/utils.py @@ -4,17 +4,20 @@ import tempfile from collections.abc import Iterator +import yaml -def parse_data_volumes_config(path: str | None = None) -> list[str]: +from .models import DataVolumeConfig, DataVolumes + + +def parse_data_volumes_config(path: str | None = None) -> DataVolumes: if path is None: path = os.environ["DATA_VOLUMES_CONFIG"] - data_volumes = [] - with open(path) as fp: - for line in fp: - if data_volume := os.path.expandvars(line.rstrip("\n")): - data_volumes.append(data_volume) - return data_volumes + with open(path) as f: + raw_dict = yaml.safe_load(f) + return DataVolumes( + volumes={k: DataVolumeConfig(**(v or {})) for k, v in raw_dict.items()} + ) @contextlib.contextmanager diff --git a/cads_worker/worker.py b/cads_worker/worker.py index e39d32f..7f8b6a4 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -2,7 +2,6 @@ import functools import logging import os -import random import socket import time from typing import Any @@ -221,7 +220,8 @@ def submit_workflow( structlog.contextvars.bind_contextvars(event_type="DATASET_COMPUTE", job_id=job_id) - cache_files_urlpath = random.choice(utils.parse_data_volumes_config()) + data_volumes = utils.parse_data_volumes_config() + cache_files_urlpath = data_volumes.get_random_volume() depth = int(os.getenv("CACHE_DEPTH", 1)) if depth == 2: cache_files_urlpath = os.path.join( diff --git a/ci/environment-ci.yml b/ci/environment-ci.yml index a9248ba..642456b 100644 --- a/ci/environment-ci.yml +++ b/ci/environment-ci.yml @@ -13,6 +13,7 @@ dependencies: - pytest-cov - sphinx - sphinx-autoapi +- types-PyYAML # DO NOT EDIT ABOVE THIS LINE, ADD DEPENDENCIES BELOW - pip - pip: diff --git a/environment.yml b/environment.yml index 42529b4..4cbf9ae 100644 --- a/environment.yml +++ b/environment.yml @@ -10,6 +10,8 @@ channels: dependencies: - distributed - psycopg2 +- pydantic +- pyyaml # See: https://github.com/fsspec/s3fs/pull/910 - s3fs!=2024.10.0 - s3fs>=2023.12.2 diff --git a/pyproject.toml b/pyproject.toml index 9b1771b..e2748f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,8 @@ dependencies = [ "cads-broker@git+https://github.com/ecmwf-projects/cads-broker.git", "distributed", "fsspec", + "pydantic", + "pyyaml", "structlog", "typer" ] @@ -46,9 +48,14 @@ strict = true [[tool.mypy.overrides]] ignore_missing_imports = true module = [ - "cads_adaptors.*" + "cads_adaptors.*", + "fsspec.*" ] +[[tool.mypy.overrides]] +ignore_errors = true +module = "cads_worker.filesystems" + [tool.ruff] # Same as Black. indent-width = 4 diff --git a/tests/test_10_cache_cleaner.py b/tests/test_10_cache_cleaner.py index 0ad8880..edf6e08 100644 --- a/tests/test_10_cache_cleaner.py +++ b/tests/test_10_cache_cleaner.py @@ -26,8 +26,8 @@ def test_cache_cleaner( assert cached_path.exists() # create data nodes config - data_volumes_config = tmp_path / "data-volumes.config" - data_volumes_config.write_text(cache_files_urlpath) + data_volumes_config = tmp_path / "data-volumes.yaml" + data_volumes_config.write_text(f"{cache_files_urlpath}:") monkeypatch.setenv("DATA_VOLUMES_CONFIG", str(data_volumes_config)) # clean cache diff --git a/tests/test_30_utils.py b/tests/test_30_utils.py index 13f2ce6..69aaf16 100644 --- a/tests/test_30_utils.py +++ b/tests/test_30_utils.py @@ -11,15 +11,19 @@ def test_utils_parse_data_volumes_config( tmp_path: pathlib.Path, monkeypatch: pytest.MonkeyPatch, ) -> None: - monkeypatch.setenv("FOO", "foo") - monkeypatch.setenv("BAR", "bar") - monkeypatch.setenv("BAZ", "") - data_volumes_config = tmp_path / "data-volumes.config" - data_volumes_config.write_text("\n\n$FOO\n\n${BAR}\n\n$BAZ\n\n") - assert utils.parse_data_volumes_config(str(data_volumes_config)) == ["foo", "bar"] - - monkeypatch.setenv("DATA_VOLUMES_CONFIG", str(data_volumes_config)) - assert utils.parse_data_volumes_config(None) == ["foo", "bar"] + monkeypatch.setenv("MAX_SIZE", "10") + data_volumes_config = tmp_path / "data-volumes.yaml" + data_volumes_config.write_text("\nfoo:\nbar:\n weight: 0\n max_size: 20\n") + + volumes = utils.parse_data_volumes_config(str(data_volumes_config)) + assert volumes.model_dump() == { + "volumes": { + "foo": {"weight": 1, "max_size": 10}, + "bar": {"weight": 0, "max_size": 20}, + } + } + + assert volumes.get_random_volume() == "foo" def test_utils_enter_tmp_working_dir() -> None: From 238b1954b9da046860dbda0876c10404ce06a85f Mon Sep 17 00:00:00 2001 From: malmans2 Date: Mon, 16 Mar 2026 13:27:23 +0100 Subject: [PATCH 13/18] implement from_yaml --- cads_worker/entry_points.py | 12 ++++++---- cads_worker/models.py | 13 ++++++++++ cads_worker/utils.py | 15 ------------ cads_worker/worker.py | 6 ++--- tests/test_01_models.py | 24 +++++++++++++++++++ ..._filesystems.py => test_02_filesystems.py} | 0 tests/test_30_utils.py | 21 ---------------- 7 files changed, 47 insertions(+), 44 deletions(-) create mode 100644 tests/test_01_models.py rename tests/{test_50_filesystems.py => test_02_filesystems.py} (100%) diff --git a/cads_worker/entry_points.py b/cads_worker/entry_points.py index 67d23be..bf829a6 100644 --- a/cads_worker/entry_points.py +++ b/cads_worker/entry_points.py @@ -7,7 +7,7 @@ import typer from typer import Option -from . import config, utils +from . import config, models config.configure_logger() LOGGER = structlog.get_logger(__name__) @@ -35,8 +35,8 @@ class CleanerKwargs(TypedDict): def _cache_cleaner() -> None: use_database = strtobool(os.environ.get("USE_DATABASE", "1")) - data_volumes = utils.parse_data_volumes_config().volumes - for volume, volume_config in data_volumes.items(): + volumes = models.DataVolumes.from_yaml().volumes + for cache_files_urlpath, volume_config in volumes.items(): cleaner_kwargs = CleanerKwargs( maxsize=volume_config.max_size, method=os.environ.get("METHOD", "LRU"), @@ -47,9 +47,11 @@ def _cache_cleaner() -> None: batch_size=int(os.getenv("BATCH_SIZE", 0)) or None, batch_delay=float(os.getenv("BATCH_DELAY", 0)), ) - cacholote.config.set(cache_files_urlpath=volume) + cacholote.config.set(cache_files_urlpath=cache_files_urlpath) LOGGER.info( - "Running cache cleaner", cache_files_urlpath=volume, **cleaner_kwargs + "Running cache cleaner", + cache_files_urlpath=cache_files_urlpath, + **cleaner_kwargs, ) try: cacholote.clean_cache_files(**cleaner_kwargs) diff --git a/cads_worker/models.py b/cads_worker/models.py index d7a95f6..5f34efd 100644 --- a/cads_worker/models.py +++ b/cads_worker/models.py @@ -1,6 +1,8 @@ import os import random +from typing import Self +import yaml from pydantic import BaseModel, Field, NonNegativeInt @@ -21,3 +23,14 @@ def get_random_volume(self) -> str: for volume, config in self.volumes.items(): choices.extend([volume] * config.weight) return random.choice(choices) + + @classmethod + def from_yaml(cls, path: str | None = None) -> Self: + if path is None: + path = os.environ["DATA_VOLUMES_CONFIG"] + + with open(path) as f: + raw_dict = yaml.safe_load(f) + return cls( + volumes={k: DataVolumeConfig(**(v or {})) for k, v in raw_dict.items()} + ) diff --git a/cads_worker/utils.py b/cads_worker/utils.py index 5ee00b3..8f17e3f 100644 --- a/cads_worker/utils.py +++ b/cads_worker/utils.py @@ -4,21 +4,6 @@ import tempfile from collections.abc import Iterator -import yaml - -from .models import DataVolumeConfig, DataVolumes - - -def parse_data_volumes_config(path: str | None = None) -> DataVolumes: - if path is None: - path = os.environ["DATA_VOLUMES_CONFIG"] - - with open(path) as f: - raw_dict = yaml.safe_load(f) - return DataVolumes( - volumes={k: DataVolumeConfig(**(v or {})) for k, v in raw_dict.items()} - ) - @contextlib.contextmanager def enter_tmp_working_dir() -> Iterator[str]: diff --git a/cads_worker/worker.py b/cads_worker/worker.py index 7f8b6a4..071d796 100644 --- a/cads_worker/worker.py +++ b/cads_worker/worker.py @@ -16,7 +16,7 @@ import structlog from distributed import get_worker -from . import config, utils +from . import config, models, utils config.configure_logger(os.getenv("WORKER_LOG_LEVEL", "NOT_SET").upper()) @@ -220,8 +220,8 @@ def submit_workflow( structlog.contextvars.bind_contextvars(event_type="DATASET_COMPUTE", job_id=job_id) - data_volumes = utils.parse_data_volumes_config() - cache_files_urlpath = data_volumes.get_random_volume() + volumes = models.DataVolumes.from_yaml() + cache_files_urlpath = volumes.get_random_volume() depth = int(os.getenv("CACHE_DEPTH", 1)) if depth == 2: cache_files_urlpath = os.path.join( diff --git a/tests/test_01_models.py b/tests/test_01_models.py new file mode 100644 index 0000000..9e8a577 --- /dev/null +++ b/tests/test_01_models.py @@ -0,0 +1,24 @@ +import pathlib + +import pytest + +from cads_worker.models import DataVolumes + + +def test_data_volumes_from_yaml( + tmp_path: pathlib.Path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("MAX_SIZE", "10") + data_volumes_config = tmp_path / "data-volumes.yaml" + data_volumes_config.write_text("\nfoo:\nbar:\n weight: 0\n max_size: 20\n") + + volumes = DataVolumes.from_yaml(str(data_volumes_config)) + assert volumes.model_dump() == { + "volumes": { + "foo": {"weight": 1, "max_size": 10}, + "bar": {"weight": 0, "max_size": 20}, + } + } + + assert volumes.get_random_volume() == "foo" diff --git a/tests/test_50_filesystems.py b/tests/test_02_filesystems.py similarity index 100% rename from tests/test_50_filesystems.py rename to tests/test_02_filesystems.py diff --git a/tests/test_30_utils.py b/tests/test_30_utils.py index 69aaf16..aecf079 100644 --- a/tests/test_30_utils.py +++ b/tests/test_30_utils.py @@ -2,30 +2,9 @@ import pathlib import tempfile -import pytest - from cads_worker import utils -def test_utils_parse_data_volumes_config( - tmp_path: pathlib.Path, - monkeypatch: pytest.MonkeyPatch, -) -> None: - monkeypatch.setenv("MAX_SIZE", "10") - data_volumes_config = tmp_path / "data-volumes.yaml" - data_volumes_config.write_text("\nfoo:\nbar:\n weight: 0\n max_size: 20\n") - - volumes = utils.parse_data_volumes_config(str(data_volumes_config)) - assert volumes.model_dump() == { - "volumes": { - "foo": {"weight": 1, "max_size": 10}, - "bar": {"weight": 0, "max_size": 20}, - } - } - - assert volumes.get_random_volume() == "foo" - - def test_utils_enter_tmp_working_dir() -> None: with utils.enter_tmp_working_dir() as tmp_working_dir: assert os.getcwd() == tmp_working_dir From aa46b6f6cffab1cd7b97bc883a02eb657f3f3299 Mon Sep 17 00:00:00 2001 From: malmans2 Date: Tue, 17 Mar 2026 11:17:28 +0100 Subject: [PATCH 14/18] cleanup --- cads_worker/models.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/cads_worker/models.py b/cads_worker/models.py index 5f34efd..c2d0873 100644 --- a/cads_worker/models.py +++ b/cads_worker/models.py @@ -3,7 +3,7 @@ from typing import Self import yaml -from pydantic import BaseModel, Field, NonNegativeInt +from pydantic import BaseModel, Field, NonNegativeFloat, NonNegativeInt def get_env_max_size() -> int: @@ -11,7 +11,7 @@ def get_env_max_size() -> int: class DataVolumeConfig(BaseModel): - weight: NonNegativeInt = 1 + weight: NonNegativeFloat = 1 max_size: NonNegativeInt = Field(default_factory=get_env_max_size) @@ -19,10 +19,12 @@ class DataVolumes(BaseModel): volumes: dict[str, DataVolumeConfig] def get_random_volume(self) -> str: - choices = [] - for volume, config in self.volumes.items(): - choices.extend([volume] * config.weight) - return random.choice(choices) + (volume,) = random.choices( + list(self.volumes), + weights=[config.weight for config in self.volumes.values()], + k=1, + ) + return volume @classmethod def from_yaml(cls, path: str | None = None) -> Self: @@ -32,5 +34,8 @@ def from_yaml(cls, path: str | None = None) -> Self: with open(path) as f: raw_dict = yaml.safe_load(f) return cls( - volumes={k: DataVolumeConfig(**(v or {})) for k, v in raw_dict.items()} + volumes={ + k: DataVolumeConfig(**v) if v else DataVolumeConfig() + for k, v in raw_dict.items() + } ) From 6c6f563790f7f1f49615b3d1de8f220faa10c727 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 19 Mar 2026 11:34:04 +0100 Subject: [PATCH 15/18] add init_buckets cli --- cads_worker/entry_points.py | 19 +++++++++++++++++++ pyproject.toml | 1 + 2 files changed, 20 insertions(+) diff --git a/cads_worker/entry_points.py b/cads_worker/entry_points.py index bf829a6..e6ac4fd 100644 --- a/cads_worker/entry_points.py +++ b/cads_worker/entry_points.py @@ -5,6 +5,7 @@ import cacholote import structlog import typer +from cads_broker import object_storage from typer import Option from . import config, models @@ -116,9 +117,27 @@ def _expire_cache_entries( return count +def _init_buckets() -> None: + object_storage_url = os.environ["OBJECT_STORAGE_URL"] + storage_kws: dict[str, str] = { + "aws_access_key_id": os.environ["STORAGE_ADMIN"], + "aws_secret_access_key": os.environ["STORAGE_PASSWORD"], + } + data_volumes = models.DataVolumes.from_yaml().volumes + for data_volume in data_volumes: + if data_volume.startswith("s3://"): + object_storage.create_data_volume( + data_volume, object_storage_url, **storage_kws + ) + + def cache_cleaner() -> None: typer.run(_cache_cleaner) def expire_cache_entries() -> None: typer.run(_expire_cache_entries) + + +def init_buckets() -> None: + typer.run(_init_buckets) diff --git a/pyproject.toml b/pyproject.toml index e2748f2..619fbc5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ cci2 = "cads_worker.filesystems:CCI2FileSystem" [project.scripts] cache-cleaner = "cads_worker.entry_points:cache_cleaner" expire-cache-entries = "cads_worker.entry_points:expire_cache_entries" +init-buckets = "cads_worker.entry_points:init_buckets" [tool.coverage.run] branch = true From 219b72ba7b69ae892ff5152e84b00efcfccb7c7f Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 19 Mar 2026 12:41:26 +0100 Subject: [PATCH 16/18] fix attributeerror --- cads_worker/entry_points.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cads_worker/entry_points.py b/cads_worker/entry_points.py index e6ac4fd..7028db0 100644 --- a/cads_worker/entry_points.py +++ b/cads_worker/entry_points.py @@ -3,9 +3,9 @@ from typing import Annotated, TypedDict import cacholote +import cads_broker.object_storage import structlog import typer -from cads_broker import object_storage from typer import Option from . import config, models @@ -126,7 +126,7 @@ def _init_buckets() -> None: data_volumes = models.DataVolumes.from_yaml().volumes for data_volume in data_volumes: if data_volume.startswith("s3://"): - object_storage.create_data_volume( + cads_broker.object_storage.create_data_volume( data_volume, object_storage_url, **storage_kws ) From ee8144b2aa669e0b2d28100029b5420c910da262 Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 19 Mar 2026 12:43:00 +0100 Subject: [PATCH 17/18] fix init_buckets cli --- cads_worker/entry_points.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cads_worker/entry_points.py b/cads_worker/entry_points.py index 7028db0..7bdb9fa 100644 --- a/cads_worker/entry_points.py +++ b/cads_worker/entry_points.py @@ -126,7 +126,7 @@ def _init_buckets() -> None: data_volumes = models.DataVolumes.from_yaml().volumes for data_volume in data_volumes: if data_volume.startswith("s3://"): - cads_broker.object_storage.create_data_volume( + cads_broker.object_storage.create_download_bucket( data_volume, object_storage_url, **storage_kws ) From 8f8af00a2a360bbba8d61a37af635d5fe34f693d Mon Sep 17 00:00:00 2001 From: Francesco Nazzaro Date: Thu, 19 Mar 2026 12:49:24 +0100 Subject: [PATCH 18/18] add logs --- cads_worker/entry_points.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/cads_worker/entry_points.py b/cads_worker/entry_points.py index 7bdb9fa..2e842c1 100644 --- a/cads_worker/entry_points.py +++ b/cads_worker/entry_points.py @@ -123,12 +123,15 @@ def _init_buckets() -> None: "aws_access_key_id": os.environ["STORAGE_ADMIN"], "aws_secret_access_key": os.environ["STORAGE_PASSWORD"], } + LOGGER.info("Initializing buckets", object_storage_url=object_storage_url) data_volumes = models.DataVolumes.from_yaml().volumes for data_volume in data_volumes: if data_volume.startswith("s3://"): + LOGGER.info("Initializing bucket", data_volume=data_volume) cads_broker.object_storage.create_download_bucket( data_volume, object_storage_url, **storage_kws ) + LOGGER.info("Buckets initialized") def cache_cleaner() -> None: