import ast
import inspect
import os
import shutil
import subprocess
import textwrap
from typing import List, Sequence, Set, Tuple

# Basenames that codegen itself writes into a step's version directory. An asset
# with one of these names would collide with the generated step script.
_RESERVED_STEP_ASSET_NAMES = frozenset({'pipeline_step.py'})


def _step_source_dir(step_definition) -> str:
    """Return the absolute directory of the file that defines the step function.

    Raises:
        ValueError: If no source file can be determined for ``step_definition.func``.
    """
    try:
        source_file = inspect.getsourcefile(step_definition.func)
    except TypeError:
        # inspect raises TypeError for builtins and other objects without Python
        # source; report it the same way as a missing source file.
        source_file = None
    if source_file is None:
        raise ValueError(f'Could not determine source file for step {step_definition.id}')
    return os.path.dirname(os.path.abspath(source_file))


def _resolve_step_assets(step_definition) -> List[Tuple[str, str]]:
    """Resolve and validate the asset paths declared on a step.

    Relative paths are resolved against the directory of the file that defines
    the step function; absolute paths are used as given. Blank entries are
    skipped. The source file is only looked up when a relative path is present,
    so steps without assets (or with absolute assets only) never fail on it.

    Args:
        step_definition: Object exposing ``id``, ``func`` and ``assets``.
            ``assets`` may be ``None``, a single path, or an iterable of paths
            (``str`` or ``os.PathLike``).

    Returns:
        A list of ``(normalized_path, asset_name)`` tuples in declaration order,
        where ``asset_name`` is the basename the asset is copied under.

    Raises:
        ValueError: If a relative asset is declared but the step's source file
            cannot be determined, if an asset path does not exist, has no
            basename (e.g. a filesystem root), uses a reserved basename, or
            repeats a basename already used by another asset of the step.
    """
    declared = step_definition.assets
    if declared is None:
        declared = []
    elif isinstance(declared, (str, os.PathLike)):
        # Accept a single path as shorthand for a one-element list.
        declared = [declared]

    step_id = step_definition.id
    source_dir = None  # resolved lazily; only relative assets need it
    resolved = []
    seen_names = set()

    for asset in declared:
        raw_path = str(asset).strip()
        if not raw_path:
            continue

        if os.path.isabs(raw_path):
            asset_path = raw_path
        else:
            if source_dir is None:
                source_dir = _step_source_dir(step_definition)
            asset_path = os.path.join(source_dir, raw_path)
        normalized_asset_path = os.path.normpath(asset_path)

        if not os.path.exists(normalized_asset_path):
            raise ValueError(f"Step '{step_id}' asset path does not exist: {raw_path}")

        asset_name = os.path.basename(normalized_asset_path)
        if not asset_name:
            # A root path has an empty basename; copying it would merge the whole
            # tree directly into the version directory.
            raise ValueError(f"Step '{step_id}' asset path has no basename: {raw_path}")
        if asset_name in _RESERVED_STEP_ASSET_NAMES:
            raise ValueError(f"Step '{step_id}' asset basename is reserved: {asset_name}")
        if asset_name in seen_names:
            raise ValueError(f"Step '{step_id}' has duplicate asset basename: {asset_name}")

        seen_names.add(asset_name)
        resolved.append((normalized_asset_path, asset_name))

    return resolved


def _copy_step_assets(resolved: Sequence[Tuple[str, str]], version_dir: str) -> None:
    """Copy already-resolved assets into a step's version directory.

    Args:
        resolved: ``(source_path, asset_name)`` pairs as returned by
            ``_resolve_step_assets``.
        version_dir: Existing directory that receives each asset under its
            ``asset_name``.

    Directories are copied recursively and merged into an existing destination;
    files are copied with ``shutil.copy2``.
    """
    for source_path, asset_name in resolved:
        destination = os.path.join(version_dir, asset_name)
        if os.path.isdir(source_path):
            shutil.copytree(source_path, destination, dirs_exist_ok=True)
        else:
            shutil.copy2(source_path, destination)
# Helper shipped as a step asset (examples/text_utils.py; the same function is
# added as tests/runners/sample_module.py).
def clean_text(value: str) -> str:
    """Return *value* lowercased with each whitespace run collapsed to one space.

    Leading and trailing whitespace is removed.
    """
    # str.split() without a separator already discards leading and trailing
    # whitespace, so no separate strip() is needed.
    return ' '.join(value.split()).lower()


# Helper from examples/pipeline_dsl_text_pipeline.py.
def normalize_text(value: str) -> str:
    """Small helper intentionally kept outside the step for codegen extraction.

    Loads ``text_utils.py`` from the directory containing this file and returns
    ``clean_text(value)`` from it.

    Raises:
        RuntimeError: If no import spec or loader can be created for the helper
            module.
    """
    # NOTE(review): the imports are function-local, presumably so the function
    # stays self-contained when its source is extracted - confirm against codegen.
    import importlib.util
    from pathlib import Path

    module_path = Path(__file__).with_name("text_utils.py")
    spec = importlib.util.spec_from_file_location("text_utils", module_path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f"Could not load helper module from {module_path}")

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.clean_text(value)
# Loads clean_text from the sibling sample_module.py. The test module asserts on
# this function's generated source text, so its body is kept as written.
def normalize_text(value: str) -> str:
    import importlib.util

    module_path = Path(__file__).with_name('sample_module.py')
    spec = importlib.util.spec_from_file_location('sample_module', module_path)
    if spec is None or spec.loader is None:
        raise RuntimeError(f'Could not load helper module from {module_path}')

    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module.clean_text(value)


def test_pipeline_generate_raises_for_missing_step_asset(tmp_path: Path):
    """A step whose declared asset is absent on disk makes generate() raise."""

    @step(id='missing-asset', assets=['./does-not-exist.txt'])
    def missing_asset_step(input_text: str) -> str:
        return input_text

    with Pipeline(id='asset-pipeline', user_id='me', app_id='my-app') as pipeline:
        text_input = pipeline.input('input_text')
        missing_asset_step(input_text=text_input)

    output_dir = str(tmp_path)
    with pytest.raises(ValueError, match='asset path does not exist'):
        pipeline.generate(output_dir)


def test_pipeline_generate_raises_for_duplicate_asset_basename(tmp_path: Path):
    """Two assets sharing a basename (from different directories) are rejected."""
    helper_files = []
    for dir_name in ('a', 'b'):
        parent = tmp_path / dir_name
        parent.mkdir()
        helper_file = parent / 'helper.py'
        helper_file.write_text(f'# helper {dir_name}')
        helper_files.append(str(helper_file))

    @step(id='dup-asset', assets=helper_files)
    def dup_asset_step(input_text: str) -> str:
        return input_text

    with Pipeline(id='dup-pipeline', user_id='me', app_id='my-app') as pipeline:
        text_input = pipeline.input('input_text')
        dup_asset_step(input_text=text_input)

    output_dir = tmp_path / 'generated'
    with pytest.raises(ValueError, match='duplicate asset basename'):
        pipeline.generate(str(output_dir))


def test_pipeline_generate_raises_for_reserved_step_asset_name(tmp_path: Path):
    """An asset named like the generated step script is rejected."""
    fixture_path = Path(__file__).with_name('invalid_reserved_asset_pipeline.py')
    pipeline = load_pipeline_from_file(str(fixture_path))

    output_dir = tmp_path / 'generated'
    with pytest.raises(ValueError, match='asset basename is reserved'):
        pipeline.generate(str(output_dir))