-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathtask.py
More file actions
89 lines (67 loc) · 2.48 KB
/
task.py
File metadata and controls
89 lines (67 loc) · 2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
from abc import ABC
from os.path import join
from pathlib import Path
from typing import TYPE_CHECKING, TypeVar, Generic
from utz import err
from ctbk.has_url import HasURL
from ctbk.paths import S3
if TYPE_CHECKING:
from dvx.run.artifact import Artifact, Computation
# Type of the value a Task produces; `Task.create()` and `Task.read()` are annotated to return it.
T = TypeVar("T")
class Task(HasURL, ABC, Generic[T]):
    """Base class for a unit of work that produces a value of type ``T``.

    Subclasses are expected to:

    * set ``DIR`` (joined onto the ``S3`` root by the ``dir`` property),
    * implement ``_create()`` and ``read()``,
    * optionally override ``deps()``, ``cmd`` and ``dep_artifacts()`` to
      describe how the task fits into a DVX artifact graph.

    ``self.url`` is read by ``create()`` and ``to_artifact()``; it is not
    defined in this class (presumably provided by ``HasURL`` -- confirm there).
    """

    # Directory (relative to the ``S3`` root) for this task; subclasses must
    # override it before ``dir`` is accessed.
    DIR = None
    # NOTE(review): mutable class-level default shared by every subclass that
    # does not override it; not read anywhere in this class.
    NAMES = []

    def __init__(self):
        # Calls the HasURL initializer directly rather than via super().
        HasURL.__init__(self)

    def _create(self) -> T | None:
        """Produce this task's output. Must be implemented by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def create(self) -> T:
        """Report the target URL via ``err``, then delegate to ``_create()``.

        Returns:
            Whatever ``_create()`` returns.
        """
        # NOTE(review): `_create()` is annotated `T | None` while this method
        # is annotated `T`; confirm whether `None` can actually reach callers.
        url = self.url
        err(f'Writing {url}')
        return self._create()

    def read(self) -> T:
        """Load this task's output. Must be implemented by subclasses.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    @property
    def dir(self) -> str:
        """Return ``DIR`` joined onto the ``S3`` root.

        Raises:
            AssertionError: If ``DIR`` is unset or empty on this class.
        """
        if not self.DIR:
            # Explicit raise instead of a bare `assert`, so the check is not
            # stripped under `python -O` (where `join(S3, None)` would fail
            # with an unrelated TypeError instead).
            raise AssertionError(
                f"{type(self).__name__}.DIR must be set to a non-empty path"
            )
        return join(S3, self.DIR)

    # DVX integration methods

    def deps(self) -> list["Task"]:
        """Return upstream Task dependencies.

        Override in subclasses to specify what this task depends on.
        The default is no dependencies (leaf node).
        """
        return []

    @property
    def cmd(self) -> str | None:
        """CLI command that produces this artifact.

        Override in subclasses to specify the command.
        The default is ``None`` (leaf node / imported data).
        """
        return None

    def dep_artifacts(self) -> list["Artifact"]:
        """Return dependency artifacts for this task.

        Override in subclasses that have non-Task dependencies (e.g., DvcBlob).
        The default implementation calls ``to_artifact()`` on every task
        returned by ``deps()``.
        """
        return [dep.to_artifact() for dep in self.deps()]

    def to_artifact(self) -> "Artifact":
        """Convert this Task to a DVX ``Artifact``.

        Returns:
            * If ``cmd`` is truthy: an ``Artifact`` for ``Path(self.url)`` with
              a ``Computation`` attached. The computation's ``deps`` argument
              is passed only when ``dep_artifacts()`` is non-empty.
            * Otherwise (leaf node): ``Artifact.from_dvc(path)`` if that is
              truthy, else a bare ``Artifact(path=path)``.
        """
        # Imported here because the module-level import is TYPE_CHECKING-only.
        from dvx.run.artifact import Artifact, Computation

        # NOTE(review): if `self.url` is a scheme-style URL (e.g. "s3://..."),
        # pathlib collapses the "//"; confirm URLs here are plain paths.
        path = Path(self.url)
        cmd = self.cmd
        # Evaluated before the leaf check, as in the original, so overridden
        # `dep_artifacts()` implementations still run for leaf nodes.
        dep_artifacts = self.dep_artifacts()

        if not cmd:
            # Leaf node - just return artifact from existing .dvc file if possible
            return Artifact.from_dvc(path) or Artifact(path=path)

        if dep_artifacts:
            # Computed artifact with dependencies
            computation = Computation(cmd=cmd, deps=dep_artifacts)
        else:
            # Computed artifact with no deps (unusual but possible); `deps` is
            # left to Computation's own default rather than passed as [].
            computation = Computation(cmd=cmd)
        return Artifact(path=path, computation=computation)