Skip to content

Commit ce7a816

Browse files
committed
Updated the code for the Hierarchical Pegasus logger
Excluded the Hierarchical Pegasus logger from testing
1 parent 96f8a4f commit ce7a816

File tree

3 files changed

+30
-17
lines changed

3 files changed

+30
-17
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ addopts="""
6161
--cov-report term-missing \
6262
--cov ./wfcommons \
6363
--ignore wfcommons/wfbench/translator/templates \
64+
--ignore wfcommons/wfinstances/logs/pegasusrec.py \
6465
--no-cov-on-fail \
6566
-ra \
6667
-W ignore"""

wfcommons/wfinstances/instance_analyzer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def build_summary(self,
116116
if extension[1:].isnumeric():
117117
extension = path.splitext(infile.file_id.replace(extension, ''))[1]
118118

119-
# Check if the file is definetly an input
119+
# Check if the file is definitely an input
120120
assert infile.link == FileLink.INPUT, f"{infile.file_id} is not set as input"
121121
_append_file_to_dict(extension, inputs_dict, infile.size)
122122

wfcommons/wfinstances/logs/pegasusrec.py

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030

3131
class HierarchicalPegasusLogsParser(LogsParser):
3232
"""
33-
Parse Pegasus submit directory to generate workflow instance.
34-
This specific parser targets Pegasus submit dir with Hierachical workflows
35-
Which means that some jobs are sub-workflows and must be parse as well.
33+
[OUT OF DATE AND NO LONGER FUNCTIONING]
34+
[LIKELY TO BE DEPRECATED/REMOVED AT SOME POINT ANYWAY]
35+
Parse Pegasus submit directory to generate workflow instance.
36+
This specific parser targets Pegasus submit dir with Hierarchical workflows
37+
Which means that some jobs are sub-workflows and must be parsed as well.
3638
This parser recursively parses each sub-workflow and rebuilds a coherent workflow.
3739
3840
WARNING: one level of recursion for now (i.e., sub-workflow cannot have sub-workflow)
@@ -204,9 +206,9 @@ def _parse_braindump(self):
204206
# create base workflow instance object
205207
self.workflow = Workflow(name=self.workflow_name,
206208
description=self.description,
207-
wms_name=self.wms_name,
208-
wms_version=wms_version,
209-
wms_url=self.wms_url,
209+
runtime_system_name=self.wms_name,
210+
runtime_system_version=wms_version,
211+
runtime_system_url=self.wms_url,
210212
executed_at=executed_at)
211213

212214
def _parse_workflow(self):
@@ -232,12 +234,15 @@ def _parse_workflow(self):
232234
task_name = f"{j['name']}_{j['id']}"
233235

234236
list_files = [File(
235-
name=f['lfn'],
237+
file_id=f['lfn'],
236238
size=0,
237239
link=FileLink(f['type']),
238240
logger=self.logger
239241
) for f in j['uses']]
240242

243+
input_files = [f for f in list_files if f.link == FileLink.INPUT]
244+
output_files = [f for f in list_files if f.link == FileLink.OUTPUT]
245+
241246
self.workflow.add_node(
242247
task_name,
243248
task=Task(
@@ -248,7 +253,8 @@ def _parse_workflow(self):
248253
runtime=0,
249254
args=j['arguments'],
250255
cores=0,
251-
files=list_files,
256+
input_files=input_files,
257+
output_files=output_files,
252258
logger=self.logger
253259
)
254260
)
@@ -267,7 +273,8 @@ def _parse_workflow(self):
267273
runtime=0,
268274
args=j['arguments'],
269275
cores=0,
270-
files=list_files,
276+
input_files=input_files,
277+
output_files=output_files,
271278
logger=self.logger
272279
)
273280
)
@@ -311,12 +318,15 @@ def _parse_dax(self):
311318
task_name = str(j.get('name')) + '_' + str(j.get('id'))
312319

313320
list_files = [File(
314-
name=f.get('name') if not f.get('name') is None else f.get('file'),
321+
file_id=f.get('name') if not f.get('name') is None else f.get('file'),
315322
size=0,
316323
link=FileLink(f.get('link')),
317324
logger=self.logger
318325
) for f in j.findall('{http://pegasus.isi.edu/schema/DAX}uses')]
319326

327+
input_files = [f for f in list_files if f.link == FileLink.INPUT]
328+
output_files = [f for f in list_files if f.link == FileLink.OUTPUT]
329+
320330
self.workflow.add_node(
321331
task_name,
322332
task=Task(
@@ -327,7 +337,8 @@ def _parse_dax(self):
327337
runtime=0,
328338
args=[],
329339
cores=0,
330-
files=list_files,
340+
input_files=input_files,
341+
output_files=output_files,
331342
logger=self.logger
332343
)
333344
)
@@ -391,7 +402,8 @@ def _parse_dag(self):
391402
runtime=0,
392403
args=[],
393404
cores=0,
394-
files=[],
405+
input_files=[],
406+
output_files=[],
395407
logger=self.logger
396408
)
397409
)
@@ -646,15 +658,15 @@ def _parse_job_output_legacy(self, task: Task, output_file_path: pathlib.Path) -
646658
machine['memory'] = int(r.get('total'))
647659
for c in u.findall('{http://pegasus.isi.edu/schema/invocation}cpu'):
648660
machine['cpu'] = {
649-
'count': int(c.get('count')),
650-
'speed': int(c.get('speed')),
661+
'coreCount': int(c.get('count')),
662+
'speedInMHz': int(c.get('speed')),
651663
'vendor': c.get('vendor')
652664
}
653665
task.machine = Machine(
654666
name=machine['nodeName'],
655667
cpu={
656-
'count': machine['cpu']['count'],
657-
'speed': machine['cpu']['speed'],
668+
'coreCount': machine['cpu']['count'],
669+
'speedInMHz': machine['cpu']['speed'],
658670
'vendor': machine['cpu']['vendor']
659671
},
660672
system=machine['system'],

0 commit comments

Comments
 (0)