Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions clarifai/runners/pipelines/codegen.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import ast
import inspect
import os
import shutil
import subprocess
import textwrap
from typing import List, Sequence, Set
Expand Down Expand Up @@ -37,6 +38,8 @@ def _get_node_source(source_lines: Sequence[str], node: ast.AST) -> str:
}
)

# Asset basenames that may not be used because they would collide with a
# file generated into the step's version directory (the step script).
_RESERVED_STEP_ASSET_NAMES = frozenset({'pipeline_step.py'})


def _is_dsl_only_import(node: ast.AST) -> bool:
"""Return True if an import node pulls exclusively from DSL-only modules."""
Expand Down Expand Up @@ -149,7 +152,62 @@ def _build_step_script(step_definition) -> str:
)


def _resolve_step_assets(step_definition) -> list:
    """Resolve and validate the asset paths declared on a step.

    Relative asset paths are resolved against the directory of the file that
    defines the step function; absolute paths are only normalized. Blank
    entries are ignored. Nothing is copied here, so callers can run this
    validation before performing any file I/O.

    Args:
        step_definition: Step object exposing ``func``, ``id`` and ``assets``.
            ``assets`` may be a single path or an iterable of paths.

    Returns:
        A list of ``(normalized_path, asset_name)`` tuples in declaration
        order, where ``asset_name`` is the basename of the normalized path.

    Raises:
        ValueError: If the step's source file cannot be determined, an asset
            path does not exist, a path has no basename (for example a
            filesystem root), a basename is reserved for a generated file,
            or two assets share the same basename.
    """
    source_file = inspect.getsourcefile(step_definition.func)
    if source_file is None:
        raise ValueError(f'Could not determine source file for step {step_definition.id}')

    source_dir = os.path.dirname(os.path.abspath(source_file))

    assets = step_definition.assets
    if isinstance(assets, (str, os.PathLike)):
        # A lone path is wrapped so the loop below sees exactly one entry.
        assets = [assets]

    resolved = []
    seen_names = set()
    for asset in assets:
        raw_path = str(asset).strip()
        if not raw_path:
            # Blank entries are skipped rather than treated as an error.
            continue

        asset_path = raw_path if os.path.isabs(raw_path) else os.path.join(source_dir, raw_path)
        normalized_asset_path = os.path.normpath(asset_path)

        if not os.path.exists(normalized_asset_path):
            raise ValueError(f"Step '{step_definition.id}' asset path does not exist: {raw_path}")

        asset_name = os.path.basename(normalized_asset_path)
        if not asset_name:
            # A filesystem root normalizes to a path with an empty basename.
            # Copying it would target the version directory itself and pull
            # the entire root tree into it, so reject it up front.
            raise ValueError(
                f"Step '{step_definition.id}' asset path has no basename: {raw_path}"
            )
        if asset_name in _RESERVED_STEP_ASSET_NAMES:
            raise ValueError(
                f"Step '{step_definition.id}' asset basename is reserved: {asset_name}"
            )
        if asset_name in seen_names:
            # Assets are copied flat by basename, so a repeat would overwrite.
            raise ValueError(
                f"Step '{step_definition.id}' has duplicate asset basename: {asset_name}"
            )

        seen_names.add(asset_name)
        resolved.append((normalized_asset_path, asset_name))

    return resolved


def _copy_step_assets(resolved: list, version_dir: str) -> None:
    """Copy each resolved asset into ``version_dir`` under its asset name.

    Args:
        resolved: ``(source_path, asset_name)`` tuples, as produced by
            ``_resolve_step_assets``.
        version_dir: Directory that receives the copies.

    Directory assets are copied recursively with ``dirs_exist_ok=True``;
    anything else is copied as a single file with ``shutil.copy2``.
    """
    for source_path, name in resolved:
        target = os.path.join(version_dir, name)
        if not os.path.isdir(source_path):
            shutil.copy2(source_path, target)
            continue
        shutil.copytree(source_path, target, dirs_exist_ok=True)


def generate_step_directory(step_definition, output_dir: str, user_id: str, app_id: str) -> str:
# Validate assets early so we get a clear error before doing any file I/O.
resolved_assets = _resolve_step_assets(step_definition)

step_dir = os.path.join(output_dir, step_definition.id)
version_dir = os.path.join(step_dir, '1')
os.makedirs(version_dir, exist_ok=True)
Expand Down Expand Up @@ -177,6 +235,8 @@ def generate_step_directory(step_definition, output_dir: str, user_id: str, app_
with open(step_script_path, 'w', encoding='utf-8') as handle:
handle.write(_build_step_script(step_definition))

_copy_step_assets(resolved_assets, version_dir)

_format_file(step_script_path)

return step_dir
Expand Down
4 changes: 4 additions & 0 deletions clarifai/runners/pipelines/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ def __init__(
*,
id: Optional[str] = None,
requirements=None,
assets=None,
compute: Optional[ComputeInfo] = None,
python_version: str = '3.12',
secrets: Optional[Dict[str, str]] = None,
):
self.func = func
self.id = id or func.__name__.replace('_', '-')
self.requirements = requirements or []
self.assets = assets or []
self.compute = compute or ComputeInfo()
self.python_version = python_version
self.secrets = secrets or {}
Expand Down Expand Up @@ -210,6 +212,7 @@ def step(
*,
id: Optional[str] = None,
requirements=None,
assets=None,
compute: Optional[ComputeInfo] = None,
python_version: str = '3.12',
secrets: Optional[Dict[str, str]] = None,
Expand All @@ -219,6 +222,7 @@ def decorator(func: Callable[..., Any]) -> StepDefinition:
func,
id=id,
requirements=requirements,
assets=assets,
compute=compute,
python_version=python_version,
secrets=secrets,
Expand Down
15 changes: 13 additions & 2 deletions examples/pipeline_dsl_text_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,29 @@

def normalize_text(value: str) -> str:
"""Small helper intentionally kept outside the step for codegen extraction."""
return " ".join(value.strip().split())
import importlib.util
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a clean developer experience, and it introduces a dependency between this helper and any calling steps (e.g., every step using this helper must include the text_utils asset).

Two options:

  1. Avoid using importlib directly and just do `import text_utils`. This is cleaner, but it does not fix the cascading dependency issue.
  2. Declare the dependencies on this function, and have the step compiler collect assets from called functions.

from pathlib import Path

module_path = Path(__file__).with_name("text_utils.py")
spec = importlib.util.spec_from_file_location("text_utils", module_path)
if spec is None or spec.loader is None:
raise RuntimeError(f"Could not load helper module from {module_path}")

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.clean_text(value)


@step(
id="prepare-text",
requirements=["transformers>=4.0"],
assets=["./text_utils.py"],
compute=ComputeInfo(cpu_limit="500m", cpu_memory="500Mi"),
)
def prepare_text(input_text: str) -> str:
"""Normalize text before downstream processing."""
cleaned = normalize_text(input_text)
return cleaned.lower()
return cleaned


summarize = step_ref.from_url(
Expand Down
2 changes: 2 additions & 0 deletions examples/text_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def clean_text(value: str) -> str:
    """Trim, collapse whitespace runs to single spaces, and lowercase ``value``."""
    words = value.split()
    return ' '.join(words).lower()
1 change: 1 addition & 0 deletions tests/cli/test_pipeline_dsl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,6 @@ def test_generate_real_example_pipeline_writes_mixed_step_config(tmp_path: Path)
assert config['pipeline']['step_directories'] == ['prepare-text', 'assemble-report']
assert tasks['summarize']['templateRef']['name'].endswith('/versions/summary-v1')
assert tasks['classify-sentiment']['templateRef']['name'].endswith('/versions/sentiment-v3')
assert (output_dir / 'prepare-text' / '1' / 'text_utils.py').exists()
assert not (output_dir / 'summarize').exists()
assert not (output_dir / 'classify-sentiment').exists()
11 changes: 11 additions & 0 deletions tests/runners/invalid_reserved_asset_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from clarifai.runners.pipelines import Pipeline, step


# Fixture step: its single asset is named 'pipeline_step.py'. The
# reserved-name test loads this file and expects generation to raise
# "asset basename is reserved".
@step(id='collision-step', assets=['./pipeline_step.py'])
def collision_step(input_text: str) -> str:
    return input_text


# The pipeline is built at module level, so it exists once this file has run.
with Pipeline(id='asset-pipeline', user_id='me', app_id='my-app') as pipeline:
    raw_text = pipeline.input('input_text')
    collision_step(input_text=raw_text)
1 change: 1 addition & 0 deletions tests/runners/pipeline_step.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Placeholder asset whose filename matches the reserved name
# 'pipeline_step.py' (referenced by invalid_reserved_asset_pipeline.py).
# NOTE(review): presumably only the filename matters here, not the contents.
print('asset file')
2 changes: 2 additions & 0 deletions tests/runners/sample_module.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
def clean_text(value: str) -> str:
    """Return ``value`` lowercased with whitespace runs collapsed and ends trimmed."""
    tokens = value.split()
    normalized = ' '.join(tokens)
    return normalized.lower()
66 changes: 64 additions & 2 deletions tests/runners/test_pipeline_dsl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,33 @@
import pytest
import yaml

from clarifai.runners.pipelines import ComputeInfo, Pipeline, step, step_ref
from clarifai.runners.pipelines import (
ComputeInfo,
Pipeline,
load_pipeline_from_file,
step,
step_ref,
)
from clarifai.runners.utils.pipeline_validation import PipelineConfigValidator


def normalize_text(value: str) -> str:
return ' '.join(value.strip().split())
import importlib.util

module_path = Path(__file__).with_name('sample_module.py')
spec = importlib.util.spec_from_file_location('sample_module', module_path)
if spec is None or spec.loader is None:
raise RuntimeError(f'Could not load helper module from {module_path}')

module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module.clean_text(value)


@step(
id='prepare-text',
requirements=['transformers>=4.0'],
assets=['../assets/sample.txt', '../assets/sample_texts', './sample_module.py'],
compute=ComputeInfo(cpu_limit='500m', cpu_memory='500Mi'),
)
def prepare_text(input_text: str) -> str:
Expand Down Expand Up @@ -104,19 +120,65 @@ def test_pipeline_generate_writes_helper_functions_and_expected_files(tmp_path:
assert (tmp_path / 'prepare-text' / 'requirements.txt').exists()
step_script = tmp_path / 'prepare-text' / '1' / 'pipeline_step.py'
assert step_script.exists()
assert (tmp_path / 'prepare-text' / '1' / 'sample.txt').exists()
assert (tmp_path / 'prepare-text' / '1' / 'sample_module.py').exists()
assert (tmp_path / 'prepare-text' / '1' / 'sample_texts' / 'sample1.txt').exists()

step_script_content = step_script.read_text(encoding='utf-8')
requirements_content = (tmp_path / 'prepare-text' / 'requirements.txt').read_text(
encoding='utf-8'
)

assert 'def normalize_text(value: str) -> str:' in step_script_content
assert "with_name('sample_module.py')" in step_script_content
assert '@step' not in step_script_content
assert 'transformers>=4.0' in requirements_content
assert not (tmp_path / 'summarize').exists()
assert not (tmp_path / 'classify-sentiment').exists()


def test_pipeline_generate_raises_for_missing_step_asset(tmp_path: Path):
    """Generation must fail when a declared asset path does not exist."""

    @step(id='missing-asset', assets=['./does-not-exist.txt'])
    def missing_asset_step(input_text: str) -> str:
        return input_text

    with Pipeline(id='asset-pipeline', user_id='me', app_id='my-app') as pipeline:
        text_input = pipeline.input('input_text')
        missing_asset_step(input_text=text_input)

    output_dir = str(tmp_path)
    with pytest.raises(ValueError, match='asset path does not exist'):
        pipeline.generate(output_dir)


def test_pipeline_generate_raises_for_duplicate_asset_basename(tmp_path: Path):
    """Two assets from different directories that share a basename are rejected."""
    helper_paths = []
    for folder_name, contents in (('a', '# helper a'), ('b', '# helper b')):
        folder = tmp_path / folder_name
        folder.mkdir()
        helper_file = folder / 'helper.py'
        helper_file.write_text(contents)
        helper_paths.append(str(helper_file))

    @step(id='dup-asset', assets=helper_paths)
    def dup_asset_step(input_text: str) -> str:
        return input_text

    with Pipeline(id='dup-pipeline', user_id='me', app_id='my-app') as pipeline:
        text_input = pipeline.input('input_text')
        dup_asset_step(input_text=text_input)

    output_dir = tmp_path / 'generated'
    with pytest.raises(ValueError, match='duplicate asset basename'):
        pipeline.generate(str(output_dir))


def test_pipeline_generate_raises_for_reserved_step_asset_name(tmp_path: Path):
    """An asset whose basename is reserved for a generated file is rejected."""
    fixture_file = Path(__file__).with_name('invalid_reserved_asset_pipeline.py')
    pipeline = load_pipeline_from_file(str(fixture_file))

    output_dir = tmp_path / 'generated'
    with pytest.raises(ValueError, match='asset basename is reserved'):
        pipeline.generate(str(output_dir))


def test_validator_collects_only_managed_steps_without_versions():
pipeline = build_pipeline()

Expand Down
Loading