Skip to content

Commit bb9ffa7

Browse files
authored
Merge pull request #8441 from fstagni/cherry-pick-2-8c8262ebb7-integration
[sweep:integration] JobAgent - do not fail already rescheduled job
2 parents c792eb6 + 4945c3f commit bb9ffa7

File tree

3 files changed

+34
-22
lines changed

3 files changed

+34
-22
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
"""
2-
The Job Agent class instantiates a CE that acts as a client to a
3-
compute resource and also to the WMS.
4-
The Job Agent constructs a classAd based on the local resource description in the CS
5-
and the current resource status that is used for matching.
2+
The Job Agent class instantiates a CE that acts as a client to a
3+
compute resource and also to the WMS.
4+
The Job Agent constructs a classAd based on the local resource description in the CS
5+
and the current resource status that is used for matching.
66
"""
7+
78
import os
89
import re
910
import sys
@@ -34,6 +35,7 @@
3435
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
3536
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
3637
from DIRAC.WorkloadManagementSystem.Client.PilotManagerClient import PilotManagerClient
38+
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapper import RESCHEDULED
3739
from DIRAC.WorkloadManagementSystem.Utilities.Utils import createJobWrapper
3840

3941

@@ -712,9 +714,9 @@ def _checkSubmittedJobs(self):
712714
self._rescheduleFailedJob(jobID, result["Message"])
713715
self.hostFailureCount += 1
714716

715-
# The payload failed (if result["Value"] is not 0)
716-
elif result["Value"]:
717-
# In order to avoid overriding perfectly valid states, the status is updated iff the job was running
717+
# The payload failed (if result["Value"] is not 0 and the job was not rescheduled)
718+
elif result["Value"] and result["Value"] != RESCHEDULED:
719+
# In order to avoid overriding perfectly valid states, the status is updated if the job was running
718720
res = JobMonitoringClient().getJobsStatus(jobID)
719721
if not res["OK"]:
720722
return res

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapper.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@
5656

5757
CHILD_PID_POLL_INTERVALS = list(range(5, 40, 5))
5858

59+
SUBMISSION_FAILED = -1
60+
SUBMISSION_REPORT_FAILED = -2
61+
JOBWRAPPER_EXCEPTION = -3
62+
INITIALIZATION_FAILED = 1
63+
PAYLOAD_FAILED = 2
64+
FINALIZATION_FAILED = 3
65+
RESCHEDULED = 4
66+
5967

6068
class JobWrapper:
6169
"""The only user of the JobWrapper is the JobWrapperTemplate"""

src/DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
#!/usr/bin/env python
2-
""" This template will become the job wrapper that's actually executed.
2+
"""This template will become the job wrapper that's actually executed.
33
4-
The JobWrapperTemplate is completed and invoked by the jobAgent and uses functionalities from JobWrapper module.
5-
It has to be an executable.
4+
The JobWrapperTemplate is completed and invoked by the jobAgent and uses functionalities from JobWrapper module.
5+
It has to be an executable.
66
7-
The JobWrapperTemplate will reschedule the job according to certain criteria:
8-
- the working directory could not be created
9-
- the jobWrapper initialization phase failed
10-
- the inputSandbox download failed
11-
- the resolution of the inpt data failed
12-
- the JobWrapper ended with the status DErrno.EWMSRESC
7+
The JobWrapperTemplate will reschedule the job according to certain criteria:
8+
- the working directory could not be created
9+
- the jobWrapper initialization phase failed
10+
- the inputSandbox download failed
11+
- the resolution of the inpt data failed
1312
"""
13+
1414
import json
1515
import os
1616
import sys
@@ -24,6 +24,8 @@
2424
Script.parseCommandLine()
2525

2626
from DIRAC import gLogger
27+
28+
from DIRAC.WorkloadManagementSystem.JobWrapper import JobWrapper as JW
2729
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
2830
from DIRAC.WorkloadManagementSystem.JobWrapper.JobWrapperUtilities import (
2931
createAndEnterWorkingDirectory,
@@ -52,7 +54,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
5254
if "InputSandbox" in arguments["Job"]:
5355
jobReport.commit()
5456
if not transferInputSandbox(job, arguments["Job"]["InputSandbox"]):
55-
return 1
57+
return JW.INITIALIZATION_FAILED
5658
else:
5759
gLogger.verbose("Job has no InputSandbox requirement")
5860

@@ -61,7 +63,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
6163
if "InputData" in arguments["Job"]:
6264
if arguments["Job"]["InputData"]:
6365
if not resolveInputData(job):
64-
return 1
66+
return JW.INITIALIZATION_FAILED
6567
else:
6668
gLogger.verbose("Job has a null InputData requirement:")
6769
gLogger.verbose(arguments)
@@ -71,7 +73,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
7173
jobReport.commit()
7274

7375
if not executePayload(job):
74-
return 1
76+
return JW.INITIALIZATION_FAILED
7577

7678
if "OutputSandbox" in arguments["Job"] or "OutputData" in arguments["Job"]:
7779
if not processJobOutputs(job):
@@ -85,7 +87,7 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
8587
##########################################################
8688

8789

88-
ret = -3
90+
ret = JW.JOBWRAPPER_EXCEPTION
8991
try:
9092
jsonFileName = os.path.realpath(__file__) + ".json"
9193
with open(jsonFileName) as f:
@@ -105,9 +107,9 @@ def execute(jobID: int, arguments: dict, jobReport: JobReport):
105107
gLogger.exception("JobWrapperTemplate exception")
106108
try:
107109
jobReport.commit()
108-
ret = -1
110+
ret = JW.SUBMISSION_FAILED
109111
except Exception: # pylint: disable=broad-except
110112
gLogger.exception("Could not commit the job report")
111-
ret = -2
113+
ret = JW.SUBMISSION_REPORT_FAILED
112114

113115
sys.exit(ret)

0 commit comments

Comments
 (0)