Skip to content

Commit e4de33b

Browse files
committed
feat: removed JobDB JobParameters table
1 parent c379267 commit e4de33b

File tree

5 files changed

+11
-101
lines changed

5 files changed

+11
-101
lines changed

docs/source/DeveloperGuide/Systems/WorkloadManagement/index.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,5 +100,5 @@ It is based on layered architecture and is based on DIRAC architecture:
100100
SandboxMetadataDB class is a front-end to the metadata for sandboxes.
101101

102102
* JobParametersDB
103-
JobParametersDB class is a front-end to the Elastic/OpenSearch based index providing Job Parameters.
104-
It is used in most of the WMS components and is based on Elastic/OpenSearch.
103+
JobParametersDB class is a front-end to the OpenSearch based index providing Job Parameters.
104+
It is used in most of the WMS components and is based on OpenSearch.

src/DIRAC/WorkloadManagementSystem/Agent/JobCleaningAgent.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
""" The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.
1+
"""The Job Cleaning Agent controls removing jobs from the WMS in the end of their life cycle.
22
33
This agent will take care of:
44
- removing all jobs that are in status JobStatus.DELETED
@@ -22,6 +22,7 @@
2222
than 0.
2323
2424
"""
25+
2526
import datetime
2627
import os
2728

@@ -40,7 +41,7 @@
4041
from DIRAC.WorkloadManagementSystem.DB.SandboxMetadataDB import SandboxMetadataDB
4142
from DIRAC.WorkloadManagementSystem.DB.StatusUtils import kill_delete_jobs
4243
from DIRAC.WorkloadManagementSystem.Service.JobPolicy import RIGHT_DELETE
43-
from DIRAC.WorkloadManagementSystem.Utilities.JobParameters import getJobParameters
44+
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
4445

4546

4647
class JobCleaningAgent(AgentModule):
@@ -298,8 +299,8 @@ def deleteJobOversizedSandbox(self, jobIDList):
298299
failed = {}
299300
successful = {}
300301

301-
jobIDs = [int(jobID) for jobID in jobIDList]
302-
result = getJobParameters(jobIDs, "OutputSandboxLFN")
302+
jobIDList = [int(jobID) for jobID in jobIDList]
303+
result = JobParametersDB().getJobParameters(jobIDList, ["OutputSandboxLFN"])
303304
if not result["OK"]:
304305
return result
305306
osLFNDict = result["Value"]

src/DIRAC/WorkloadManagementSystem/DB/JobDB.py

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -110,56 +110,6 @@ def getDistinctJobAttributes(self, attribute, condDict=None, older=None, newer=N
110110
"Jobs", attribute, condDict=condDict, older=older, newer=newer, timeStamp=timeStamp
111111
)
112112

113-
#############################################################################
114-
def getJobParameters(self, jobID, paramList=None):
115-
"""Get Job Parameters defined for jobID.
116-
Returns a dictionary with the Job Parameters.
117-
If parameterList is empty - all the parameters are returned.
118-
"""
119-
jobIDList = [jobID] if isinstance(jobID, (str, int)) else jobID
120-
121-
resultDict = {}
122-
if paramList:
123-
if isinstance(paramList, str):
124-
paramList = paramList.split(",")
125-
paramNameList = []
126-
for pn in paramList:
127-
ret = self._escapeString(pn)
128-
if not ret["OK"]:
129-
return ret
130-
paramNameList.append(ret["Value"])
131-
cmd = "SELECT JobID, Name, Value FROM JobParameters WHERE JobID IN ({}) AND Name IN ({})".format(
132-
",".join(str(int(j)) for j in jobIDList),
133-
",".join(paramNameList),
134-
)
135-
result = self._query(cmd)
136-
if result["OK"]:
137-
if result["Value"]:
138-
for res_jobID, res_name, res_value in result["Value"]:
139-
try:
140-
res_value = res_value.decode(errors="replace") # account for use of BLOBs
141-
except AttributeError:
142-
pass
143-
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value
144-
145-
return S_OK(resultDict) # there's a slim chance that this is an empty dictionary
146-
else:
147-
return S_ERROR("JobDB.getJobParameters: failed to retrieve parameters")
148-
149-
else:
150-
result = self.getFields("JobParameters", ["JobID", "Name", "Value"], {"JobID": jobID})
151-
if not result["OK"]:
152-
return result
153-
154-
for res_jobID, res_name, res_value in result["Value"]:
155-
try:
156-
res_value = res_value.decode(errors="replace") # account for use of BLOBs
157-
except AttributeError:
158-
pass
159-
resultDict.setdefault(int(res_jobID), {})[res_name] = res_value
160-
161-
return S_OK(resultDict) # there's a slim chance that this is an empty dictionary
162-
163113
#############################################################################
164114
def getAtticJobParameters(self, jobID, paramList=None, rescheduleCounter=-1):
165115
"""Get Attic Job Parameters defined for a job with jobID.
@@ -274,16 +224,6 @@ def getJobAttribute(self, jobID, attribute):
274224
return result
275225
return S_OK(result["Value"].get(attribute))
276226

277-
#############################################################################
278-
@deprecated("Use JobParametersDB instead")
279-
def getJobParameter(self, jobID, parameter):
280-
"""Get the given parameter of a job specified by its jobID"""
281-
282-
result = self.getJobParameters(jobID, [parameter])
283-
if not result["OK"]:
284-
return result
285-
return S_OK(result.get("Value", {}).get(int(jobID), {}).get(parameter))
286-
287227
#############################################################################
288228
def getJobOptParameter(self, jobID, parameter):
289229
"""Get optimizer parameters for the given job."""
@@ -1023,7 +963,6 @@ def removeJobFromDB(self, jobIDs):
1023963

1024964
for table in [
1025965
"InputData",
1026-
"JobParameters",
1027966
"AtticJobParameters",
1028967
"HeartBeatLoggingInfo",
1029968
"OptimizerParameters",
@@ -1101,10 +1040,6 @@ def rescheduleJob(self, jobID):
11011040
return ret
11021041
e_jobID = ret["Value"]
11031042

1104-
res = self._update(f"DELETE FROM JobParameters WHERE JobID={e_jobID}")
1105-
if not res["OK"]:
1106-
return res
1107-
11081043
# Delete optimizer parameters
11091044
if not self._update(f"DELETE FROM OptimizerParameters WHERE JobID={e_jobID}")["OK"]:
11101045
return S_ERROR("JobDB.removeJobOptParameter: operation failed.")

src/DIRAC/WorkloadManagementSystem/DB/JobDB.sql

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,16 +79,6 @@ CREATE TABLE `InputData` (
7979
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
8080
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
8181

82-
-- ------------------------------------------------------------------------------
83-
DROP TABLE IF EXISTS `JobParameters`;
84-
CREATE TABLE `JobParameters` (
85-
`JobID` INT(11) UNSIGNED NOT NULL,
86-
`Name` VARCHAR(100) NOT NULL,
87-
`Value` TEXT NOT NULL,
88-
PRIMARY KEY (`JobID`,`Name`),
89-
FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
90-
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
91-
9282
-- ------------------------------------------------------------------------------
9383
DROP TABLE IF EXISTS `OptimizerParameters`;
9484
CREATE TABLE `OptimizerParameters` (

src/DIRAC/WorkloadManagementSystem/Utilities/JobParameters.py

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -154,19 +154,19 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
154154
:rtype: dict
155155
"""
156156
from DIRAC.WorkloadManagementSystem.DB.JobParametersDB import JobParametersDB
157-
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
158157

159158
elasticJobParametersDB = JobParametersDB()
160-
jobDB = JobDB()
161159

162160
if vo: # a user is connecting, with a proxy
163161
res = elasticJobParametersDB.getJobParameters(jobIDs, vo, parName)
164162
if not res["OK"]:
165163
return res
166164
parameters = res["Value"]
167165
else: # a service is connecting, no proxy, e.g. StalledJobAgent
166+
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
167+
168168
q = f"SELECT JobID, VO FROM Jobs WHERE JobID IN ({','.join([str(jobID) for jobID in jobIDs])})"
169-
res = jobDB._query(q)
169+
res = JobDB()._query(q)
170170
if not res["OK"]:
171171
return res
172172
if not res["Value"]:
@@ -184,23 +184,7 @@ def getJobParameters(jobIDs: list[int], parName: str | None, vo: str = "") -> di
184184
if not res["OK"]:
185185
return res
186186
parameters.update(res["Value"])
187-
188-
# Need anyway to get also from JobDB, for those jobs with parameters registered in MySQL or in both backends
189-
res = jobDB.getJobParameters(jobIDs, parName)
190-
if not res["OK"]:
191-
return res
192-
parametersM = res["Value"]
193-
194-
# and now combine
195-
final = dict(parametersM)
196-
# if job in JobDB, update with parameters from ES if any
197-
for jobID in final:
198-
final[jobID].update(parameters.get(jobID, {}))
199-
# if job in ES and not in JobDB, take ES
200-
for jobID in parameters:
201-
if jobID not in final:
202-
final[jobID] = parameters[jobID]
203-
return S_OK(final)
187+
return S_OK(parameters)
204188

205189

206190
def getAvailableRAM(siteName=None, gridCE=None, queue=None):

0 commit comments

Comments
 (0)