@@ -634,115 +634,6 @@ def generate_input_file(self, path: pathlib.Path) -> None:
634634 path .write_text (json .dumps (inputs , indent = 2 ))
635635 input ("Please fill up the input file and press ENTER to continue..." )
636636
637- def run (self , json_path : pathlib .Path , save_dir : pathlib .Path ) -> None :
638- """
639- Run the benchmark workflow locally (for test purposes only).
640-
641- :param json_path:
642- :type json_path: pathlib.Path
643- :param: save_dir:
644- :type save_dir: pathlib.Path
645- """
646- self .logger .debug ("Running" )
647-
648- try :
649- wf = json .loads (json_path .read_text ())
650- with save_dir .joinpath (f"run.txt" ).open ("w+" ) as fp :
651- print ("Starting run..." )
652- has_executed : Set [str ] = set ()
653- procs : List [subprocess .Popen ] = []
654- print ("Workflow tasks:" , len (wf ["workflow" ]["specification" ]["tasks" ]))
655- print ("Has executed:" , len (has_executed ))
656-
657- while len (has_executed ) < len (wf ["workflow" ]["specification" ]["tasks" ]):
658- print ("In while loop" )
659- for task in wf ["workflow" ]["specification" ]["tasks" ]:
660- input_files = {}
661- if task ["name" ] in has_executed :
662- print (f'{ task ["name" ]} has executed...' )
663- continue
664-
665- # Collect input files
666- for input_file_name in task ["inputFiles" ]:
667- input_files ["name" ] = input_file_name
668-
669- for entry in wf ["workflow" ]["specification" ]["files" ]:
670- if input_file_name in entry ["id" ]:
671- print (f"Entry: { entry } " )
672- sizeInBytes = entry [input_file_name ]["sizeInBytes" ]
673- input_files [input_file_name ]["size" ] = sizeInBytes
674-
675- # Generate the input files
676- if input_files :
677- print (f"Creating files: { input_files } " )
678- generate_sys_data (num_files = 1 ,
679- tasks = input_files ,
680- save_dir = save_dir )
681-
682- real_file_names = [f"{ save_dir .joinpath (input_file )} " for input_file in input_files ]
683- if ready_to_execute := all ([
684- pathlib .Path (input_file ).exists ()
685- for input_file in real_file_names
686- ]):
687- print ("Ready to execute:" , ready_to_execute )
688- else :
689- # Print the files that are not ready to execute
690- print ("Not ready to execute:" , [
691- input_file for input_file in real_file_names
692- if not pathlib .Path (input_file ).exists ()
693- ])
694- continue
695-
696- print (f"Executing task: { task ['name' ]} " )
697- has_executed .add (task ["name" ])
698-
699- executable = task ["command" ]["program" ]
700- executable = str (this_dir .parent .parent .joinpath (f"bin/{ executable } " ))
701- arguments = task ["command" ]["arguments" ]
702- # Function to clean and adjust the list entries
703- arguments = [clean_entry (entry ) for entry in arguments ]
704-
705- # arguments = [
706- # # --[opt] [value] -> --[opt]=[value]
707- # re.sub(r'--(.*?) (.*)', r'--\1=\2', argument)
708- # for argument in arguments
709- # ]
710- print ("ARGUMENTS" , arguments )
711-
712- for arg in arguments :
713- if "--out" in arg :
714- files = assigning_correct_files (task )
715- print ("FILES" , files )
716- program = ["time" , "python" ,
717- executable , * arguments , * files ]
718- else :
719- program = ["time" , "python" ,
720- executable , * arguments ]
721-
722-
723- print ("Prog:" , program )
724-
725- # folder = pathlib.Path(f"wfbench_execution/{uuid.uuid4()}")
726- # folder.mkdir(exist_ok=True, parents=True)
727- proc = subprocess .Popen (program , stdout = fp , stderr = fp , cwd = save_dir )
728- print (proc .args )
729- procs .append (proc )
730-
731- print ("#Tasks executed:" , len (has_executed ))
732- time .sleep (1 )
733-
734- for proc in procs :
735- proc .wait ()
736-
737- cleanup_sys_files ()
738-
739- except Exception as e :
740- subprocess .Popen (["killall" , "stress-ng" ])
741- cleanup_sys_files ()
742- import traceback
743- traceback .print_exc ()
744- raise FileNotFoundError ("Not able to find the executable." )
745-
746637
747638def generate_sys_data (num_files : int , tasks : Dict [str , int ], save_dir : pathlib .Path ) -> None :
748639 """Generate workflow's input data
0 commit comments