Skip to content

Commit c23c53e

Browse files
committed
fix: getting cpu work left from a single source of truth
1 parent 7b80cd8 commit c23c53e

File tree

6 files changed

+320
-139
lines changed

6 files changed

+320
-139
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
2828
from DIRAC.RequestManagementSystem.Client.Request import Request
2929
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
30-
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
3130
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
3231
from DIRAC.WorkloadManagementSystem.Client import JobStatus, PilotStatus
3332
from DIRAC.WorkloadManagementSystem.Client.JobManagerClient import JobManagerClient
@@ -81,10 +80,9 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties=None):
8180
self.defaultWrapperLocation = "DIRAC/WorkloadManagementSystem/JobWrapper/JobWrapperTemplate.py"
8281

8382
# Timeleft
84-
self.initTimes = os.times()
8583
self.initTimeLeft = 0.0
8684
self.timeLeft = self.initTimeLeft
87-
self.timeLeftUtil = None
85+
self.initTime = time.time()
8886
self.pilotInfoReportedFlag = False
8987

9088
# Attributes related to the processed jobs, it should take the following form:
@@ -109,16 +107,11 @@ def initialize(self):
109107
if not result["OK"]:
110108
return result
111109

112-
result = self._getCEDict(self.computingElement)
113-
if not result["OK"]:
114-
return result
115-
ceDict = result["Value"][0]
116-
117-
self.initTimeLeft = ceDict.get("CPUTime", self.initTimeLeft)
118-
self.initTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", self.initTimeLeft)
110+
# Read initial CPU work left from config (seeded by pilot via dirac-wms-get-queue-cpu-time)
111+
self.initTimeLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", self.initTimeLeft)
119112
self.timeLeft = self.initTimeLeft
120113

121-
self.initTimes = os.times()
114+
self.initTime = time.time()
122115
# Localsite options
123116
self.siteName = siteName()
124117
self.pilotReference = gConfig.getValue("/LocalSite/PilotReference", self.pilotReference)
@@ -136,9 +129,6 @@ def initialize(self):
136129
self.logLevel = self.am_getOption("DefaultLogLevel", self.logLevel)
137130
self.defaultWrapperLocation = self.am_getOption("JobWrapperTemplate", self.defaultWrapperLocation)
138131

139-
# Utilities
140-
self.timeLeftUtil = TimeLeft()
141-
142132
# Some innerCEs may want to make use of CGroup2 support, so we prepare it globally here
143133
res = CG2Manager().setUp()
144134
if res["OK"]:
@@ -403,24 +393,19 @@ def _checkCEAvailability(self, computingElement):
403393
return S_OK()
404394

405395
#############################################################################
406-
def _computeCPUWorkLeft(self, processors=1):
396+
def _computeCPUWorkLeft(self):
407397
"""
408-
Compute CPU Work Left in hepspec06 seconds
398+
Compute CPU Work Left in hepspec06 seconds.
399+
400+
Uses a simple wall-clock countdown from the initial value (seeded by the pilot
401+
via dirac-wms-get-queue-cpu-time). The elapsed wall-clock time is multiplied by
402+
the CPU normalization factor to get the consumed CPU work.
409403
410-
:param int processors: number of processors available
411404
:return: cpu work left (cpu time left * cpu power of the cpus)
412405
"""
413-
# Sum all times but the last one (elapsed_time) and remove times at init (is this correct?)
414-
cpuTimeConsumed = sum(os.times()[:-1]) - sum(self.initTimes[:-1])
415-
result = self.timeLeftUtil.getTimeLeft(cpuTimeConsumed, processors)
416-
if not result["OK"]:
417-
self.log.warn("There were errors calculating time left using the Timeleft utility", result["Message"])
418-
self.log.warn("The time left will be calculated using os.times() and the info in our possession")
419-
self.log.info(f"Current raw CPU time consumed is {cpuTimeConsumed}")
420-
if self.cpuFactor:
421-
return self.initTimeLeft - cpuTimeConsumed * self.cpuFactor
422-
return self.timeLeft
423-
return result["Value"]
406+
elapsed = time.time() - self.initTime
407+
cpuWorkConsumed = elapsed * self.cpuFactor
408+
return self.initTimeLeft - cpuWorkConsumed
424409

425410
def _checkCPUWorkLeft(self, cpuWorkLeft):
426411
"""Check that fillingMode is enabled and time left is sufficient to continue the execution"""

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from DIRAC.Core.Security.X509Chain import X509Chain # pylint: disable=import-error
1212

1313
from DIRAC import S_ERROR, S_OK, gLogger
14-
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
1514
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
1615
from DIRAC.Resources.Computing.test.Test_PoolComputingElement import badJobScript, jobScript
1716
from DIRAC.WorkloadManagementSystem.Agent.JobAgent import JobAgent
@@ -150,28 +149,27 @@ def test__checkCEAvailability(mocker, ceType, mockCEReply, expectedResult):
150149

151150

152151
@pytest.mark.parametrize(
153-
"initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft",
152+
"initTimeLeft, cpuFactor, elapsedSeconds, expectedTimeLeft",
154153
[
155-
(100000, 75000, None, {"OK": False, "Message": "Error"}, 75000),
156-
(100000, 75000, 10, {"OK": False, "Message": "Error"}, 100000),
157-
(100000, 75000, 10, {"OK": True, "Value": 25000}, 25000),
154+
# No CPU factor: no work consumed, time left equals initial
155+
(100000, 0, 100, 100000),
156+
# With CPU factor: elapsed * cpuFactor is subtracted from initTimeLeft
157+
(100000, 10, 100, 99000),
158+
# Longer elapsed time
159+
(100000, 10, 5000, 50000),
158160
],
159161
)
160-
def test__computeCPUWorkLeft(mocker, initTimeLeft, timeLeft, cpuFactor, mockTimeLeftReply, expectedTimeLeft):
162+
def test__computeCPUWorkLeft(mocker, initTimeLeft, cpuFactor, elapsedSeconds, expectedTimeLeft):
161163
"""Test JobAgent()._computeCPUWorkLeft()"""
162164
mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
163-
mocker.patch(
164-
"DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft.TimeLeft.getTimeLeft", return_value=mockTimeLeftReply
165-
)
166165

167166
jobAgent = JobAgent("Test", "Test1")
168167
jobAgent.log = gLogger
169168
jobAgent.log.setLevel("DEBUG")
170-
jobAgent.timeLeftUtil = TimeLeft()
171169

172170
jobAgent.initTimeLeft = initTimeLeft
173-
jobAgent.timeLeft = timeLeft
174171
jobAgent.cpuFactor = cpuFactor
172+
jobAgent.initTime = time.time() - elapsedSeconds
175173
result = jobAgent._computeCPUWorkLeft()
176174

177175
assert abs(result - expectedTimeLeft) < 10

src/DIRAC/WorkloadManagementSystem/Client/CPUNormalization.py

Lines changed: 50 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,11 @@
1414

1515

1616
def getCPUTime(cpuNormalizationFactor):
17-
"""Trying to get CPUTime left for execution (in seconds).
17+
"""Compute the initial CPUTime left for execution (in seconds).
1818
19-
It will first look to get the work left looking for batch system information useing the TimeLeft utility.
20-
If it succeeds, it will convert it in real second, and return it.
21-
22-
If it fails, it tries to get it from the static info found in CS.
23-
If it fails, it returns the default, which is a large 9999999, that we may consider as "Infinite".
24-
25-
This is a generic method, independent from the middleware of the resource if TimeLeft doesn't return a value
19+
This is called at pilot bootstrap (via dirac-wms-get-queue-cpu-time) to seed
20+
the initial CPUTimeLeft value. It queries the batch system first, then falls
21+
back to static CS configuration.
2622
2723
args:
2824
cpuNormalizationFactor (float): the CPU power of the current Worker Node.
@@ -31,55 +27,58 @@ def getCPUTime(cpuNormalizationFactor):
3127
returns:
3228
cpuTimeLeft (int): the CPU time left, in seconds
3329
"""
34-
cpuTimeLeft = 0.0
35-
cpuWorkLeft = gConfig.getValue("/LocalSite/CPUTimeLeft", 0)
36-
37-
if not cpuWorkLeft:
38-
# Try and get the information from the CPU left utility
39-
result = TimeLeft().getTimeLeft()
40-
if result["OK"]:
41-
cpuWorkLeft = result["Value"]
42-
43-
if cpuWorkLeft > 0:
44-
# This is in HS06sseconds
45-
# We need to convert in real seconds
46-
if not cpuNormalizationFactor: # if cpuNormalizationFactor passed in is 0, try get it from the local cfg
30+
31+
# 1. Try to compute time left from the batch system (sacct, qstat, etc.)
32+
result = TimeLeft().getTimeLeft()
33+
if result["OK"]:
34+
cpuWorkLeft = result["Value"]
35+
# Batch system answered — trust it, even if 0
36+
if not cpuNormalizationFactor:
4737
cpuNormalizationFactor = gConfig.getValue("/LocalSite/CPUNormalizationFactor", 0.0)
4838
if cpuNormalizationFactor:
49-
cpuTimeLeft = cpuWorkLeft / cpuNormalizationFactor
39+
return int(cpuWorkLeft / cpuNormalizationFactor)
40+
return 0
5041

51-
if not cpuTimeLeft:
52-
# now we know that we have to find the CPUTimeLeft by looking in the CS
53-
# this is not granted to be correct as the CS units may not be real seconds
54-
gridCE = gConfig.getValue("/LocalSite/GridCE")
55-
ceQueue = gConfig.getValue("/LocalSite/CEQueue")
56-
if not ceQueue:
57-
# we have to look for a ceQueue in the CS
58-
# A bit hacky. We should better profit from something generic
59-
gLogger.warn("No CEQueue in local configuration, looking to find one in CS")
60-
siteName = DIRAC.siteName()
61-
queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues"
62-
res = gConfig.getSections(queueSection)
63-
if not res["OK"]:
64-
raise RuntimeError(res["Message"])
65-
queues = res["Value"]
66-
cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 9999999.0) for queue in queues]
67-
# These are (real, wall clock) minutes - damn BDII!
42+
cpuTimeLeft = 0.0
43+
44+
# 2. Fall back to queue configuration in the CS.
45+
# These values are wall-clock minutes from BDII, so we convert to seconds.
46+
gridCE = gConfig.getValue("/LocalSite/GridCE")
47+
ceQueue = gConfig.getValue("/LocalSite/CEQueue")
48+
if not ceQueue:
49+
# we have to look for a ceQueue in the CS
50+
# A bit hacky. We should better profit from something generic
51+
gLogger.warn("No CEQueue in local configuration, looking to find one in CS")
52+
siteName = DIRAC.siteName()
53+
queueSection = f"/Resources/Sites/{siteName.split('.')[0]}/{siteName}/CEs/{gridCE}/Queues"
54+
res = gConfig.getSections(queueSection)
55+
if not res["OK"]:
56+
raise RuntimeError(res["Message"])
57+
queues = res["Value"]
58+
cpuTimes = [gConfig.getValue(queueSection + "/" + queue + "/maxCPUTime", 0.0) for queue in queues]
59+
cpuTimes = [t for t in cpuTimes if t > 0]
60+
if cpuTimes:
6861
cpuTimeLeft = min(cpuTimes) * 60
62+
else:
63+
queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}")
64+
if not queueInfo["OK"] or not queueInfo["Value"]:
65+
gLogger.warn("Can't find a CE/queue in CS")
6966
else:
70-
queueInfo = getQueueInfo(f"{gridCE}/{ceQueue}")
71-
cpuTimeLeft = 9999999.0
72-
if not queueInfo["OK"] or not queueInfo["Value"]:
73-
gLogger.warn("Can't find a CE/queue, defaulting CPUTime to %d" % cpuTimeLeft)
67+
queueCSSection = queueInfo["Value"]["QueueCSSection"]
68+
cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0)
69+
if cpuTimeInMinutes:
70+
cpuTimeLeft = cpuTimeInMinutes * 60.0
71+
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}")
7472
else:
75-
queueCSSection = queueInfo["Value"]["QueueCSSection"]
76-
# These are (real, wall clock) minutes - damn BDII!
77-
cpuTimeInMinutes = gConfig.getValue(f"{queueCSSection}/maxCPUTime", 0.0)
78-
if cpuTimeInMinutes:
79-
cpuTimeLeft = cpuTimeInMinutes * 60.0
80-
gLogger.info(f"CPUTime for {queueCSSection}: {cpuTimeLeft:f}")
81-
else:
82-
gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}, defaulting CPUTime to {cpuTimeLeft:f}")
73+
gLogger.warn(f"Can't find maxCPUTime for {queueCSSection}")
74+
75+
if not cpuTimeLeft:
76+
# 3. Last resort: global default from CS, or 0 (fail safe: match no more jobs)
77+
cpuTimeLeft = gConfig.getValue("/Resources/Computing/CEDefaults/MaxCPUTime", 0)
78+
if cpuTimeLeft:
79+
gLogger.warn(f"Using fallback MaxCPUTime: {cpuTimeLeft}")
80+
else:
81+
gLogger.warn("Could not determine CPUTime left")
8382

8483
return int(cpuTimeLeft)
8584

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
"""Unit tests for CPUNormalization.getCPUTime()"""
2+
from unittest.mock import patch
3+
4+
from DIRAC import S_OK, S_ERROR
5+
6+
7+
@patch("DIRAC.WorkloadManagementSystem.Client.CPUNormalization.TimeLeft")
8+
@patch("DIRAC.WorkloadManagementSystem.Client.CPUNormalization.gConfig")
9+
class TestGetCPUTime:
10+
"""Tests for getCPUTime() fallback chain."""
11+
12+
def _import_getCPUTime(self):
13+
from DIRAC.WorkloadManagementSystem.Client.CPUNormalization import getCPUTime
14+
15+
return getCPUTime
16+
17+
def test_from_batch_system(self, mock_gConfig, mock_TimeLeft):
18+
"""Primary path: batch system returns CPU work left."""
19+
mock_gConfig.getValue.return_value = 0
20+
mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(30000) # HS06*s
21+
22+
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)
23+
24+
# 30000 / 10.0 = 3000 seconds
25+
assert result == 3000
26+
mock_TimeLeft.return_value.getTimeLeft.assert_called_once()
27+
28+
def test_batch_system_returns_zero(self, mock_gConfig, mock_TimeLeft):
29+
"""When batch system reports 0 time left, trust it and return 0."""
30+
mock_gConfig.getValue.return_value = 0
31+
mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(0)
32+
33+
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)
34+
35+
assert result == 0
36+
# Should NOT fall through to CS fallbacks
37+
mock_gConfig.getValue.assert_not_called()
38+
39+
def test_from_queue_cs(self, mock_gConfig, mock_TimeLeft):
40+
"""Fallback: batch system fails, uses queue maxCPUTime from CS."""
41+
mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info")
42+
43+
config_values = {
44+
"/LocalSite/GridCE": "ce.example.com",
45+
"/LocalSite/CEQueue": "default",
46+
"/LocalSite/Site": "LCG.Example.com",
47+
}
48+
49+
def mock_getValue(key, default=0):
50+
if key in config_values:
51+
return config_values[key]
52+
# maxCPUTime in minutes
53+
if "maxCPUTime" in key:
54+
return 120.0 # 120 minutes
55+
return default
56+
57+
mock_gConfig.getValue.side_effect = mock_getValue
58+
59+
with patch(
60+
"DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo",
61+
return_value=S_OK(
62+
{"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"}
63+
),
64+
):
65+
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)
66+
67+
# 120 minutes * 60 = 7200 seconds
68+
assert result == 7200
69+
70+
def test_fallback_max_cpu_time(self, mock_gConfig, mock_TimeLeft):
71+
"""Last resort: everything fails, uses /Resources/Computing/CEDefaults/MaxCPUTime."""
72+
mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info")
73+
74+
config_values = {
75+
"/LocalSite/GridCE": "ce.example.com",
76+
"/LocalSite/CEQueue": "default",
77+
"/LocalSite/Site": "LCG.Example.com",
78+
"/Resources/Computing/CEDefaults/MaxCPUTime": 86400,
79+
}
80+
81+
def mock_getValue(key, default=0):
82+
if key in config_values:
83+
return config_values[key]
84+
return default
85+
86+
mock_gConfig.getValue.side_effect = mock_getValue
87+
88+
with patch(
89+
"DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo",
90+
return_value=S_OK(
91+
{"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"}
92+
),
93+
):
94+
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)
95+
96+
# maxCPUTime from queue returned 0, so falls through to CEDefaults/MaxCPUTime
97+
assert result == 86400
98+
99+
def test_nothing_available_returns_zero(self, mock_gConfig, mock_TimeLeft):
100+
"""Fail safe: no batch info, no CS config, returns 0."""
101+
mock_TimeLeft.return_value.getTimeLeft.return_value = S_ERROR("No batch info")
102+
103+
config_values = {
104+
"/LocalSite/GridCE": "ce.example.com",
105+
"/LocalSite/CEQueue": "default",
106+
"/LocalSite/Site": "LCG.Example.com",
107+
}
108+
109+
def mock_getValue(key, default=0):
110+
if key in config_values:
111+
return config_values[key]
112+
return default
113+
114+
mock_gConfig.getValue.side_effect = mock_getValue
115+
116+
with patch(
117+
"DIRAC.WorkloadManagementSystem.Client.CPUNormalization.getQueueInfo",
118+
return_value=S_OK(
119+
{"QueueCSSection": "/Resources/Sites/LCG/LCG.Example.com/CEs/ce.example.com/Queues/default"}
120+
),
121+
):
122+
result = self._import_getCPUTime()(cpuNormalizationFactor=10.0)
123+
124+
assert result == 0
125+
126+
def test_cpu_normalization_factor_from_config(self, mock_gConfig, mock_TimeLeft):
127+
"""When cpuNormalizationFactor=0, it should be read from local config."""
128+
mock_TimeLeft.return_value.getTimeLeft.return_value = S_OK(50000) # HS06*s
129+
130+
mock_gConfig.getValue.side_effect = lambda key, default=0: {
131+
"/LocalSite/CPUNormalizationFactor": 5.0,
132+
}.get(key, default)
133+
134+
result = self._import_getCPUTime()(cpuNormalizationFactor=0)
135+
136+
# 50000 / 5.0 = 10000 seconds
137+
assert result == 10000

0 commit comments

Comments
 (0)