Skip to content

Commit 886ee8f

Browse files
committed
[IMP] queue_job: refactor job acquisition
In this commit we cleanly separate the job acquisition (i.e. verifying the job is in the exepected state, marking it started and locking it) from job execution. We also avoid trying to start the job if it is already locked by using SKIP LOCKED and exiting early. Indeed in such situations the job is likely already being handled by another worker so there is no point trying to start it, so we exit early and let it be handled either by the other worker or the dead job requeuer.
1 parent 6792545 commit 886ee8f

File tree

2 files changed

+45
-31
lines changed

2 files changed

+45
-31
lines changed

queue_job/controllers/main.py

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,47 @@
2727

2828

2929
class RunJobController(http.Controller):
30-
def _try_perform_job(self, env, job):
31-
"""Try to perform the job."""
30+
@staticmethod
31+
def _acquire_job(env: api.Environment, job_uuid: str) -> Job | None:
32+
"""Acquire a job for execution.
33+
34+
- make sure it is in ENQUEUED state
35+
- mark it as STARTED and commit the state change
36+
- acquire the job lock
37+
38+
If successful, return the Job instance, otherwise return None. This
39+
function may fail to acquire the job is not in the expected state or is
40+
already locked by another worker.
41+
"""
42+
env.cr.execute(
43+
"SELECT 1 FROM queue_job WHERE uuid=%s AND state=%s "
44+
"FOR UPDATE SKIP LOCKED",
45+
(job_uuid, ENQUEUED),
46+
)
47+
if not env.cr.fetchone():
48+
_logger.warning(
49+
"was requested to run job %s, but it does not exist, "
50+
"or is not in state %s, or is being handled by another worker",
51+
job_uuid,
52+
ENQUEUED,
53+
)
54+
return None
55+
job = Job.load(env, job_uuid)
56+
assert job and job.state == ENQUEUED
3257
job.set_started()
3358
job.store()
3459
env.cr.commit()
35-
job.lock()
60+
if not job.lock():
61+
_logger.warning(
62+
"was requested to run job %s, but it could not be locked",
63+
job_uuid,
64+
)
65+
return None
66+
return job
3667

68+
def _try_perform_job(self, env, job):
69+
"""Try to perform the job, mark it done and commit if successful."""
3770
_logger.debug("%s started", job)
38-
3971
job.perform()
4072
# Triggers any stored computed fields before calling 'set_done'
4173
# so that will be part of the 'exec_time'
@@ -94,23 +126,10 @@ def retry_postpone(job, message, seconds=None):
94126
job.set_pending(reset_retry=False)
95127
job.store()
96128

97-
# ensure the job to run is in the correct state and lock the record
98-
env.cr.execute(
99-
"SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE",
100-
(job_uuid, ENQUEUED),
101-
)
102-
if not env.cr.fetchone():
103-
_logger.warning(
104-
"was requested to run job %s, but it does not exist, "
105-
"or is not in state %s",
106-
job_uuid,
107-
ENQUEUED,
108-
)
129+
job = self._acquire_job(env, job_uuid)
130+
if not job:
109131
return ""
110132

111-
job = Job.load(env, job_uuid)
112-
assert job and job.state == ENQUEUED
113-
114133
try:
115134
try:
116135
self._try_perform_job(env, job)

queue_job/job.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ def load_many(cls, env, job_uuids):
221221
recordset = cls.db_records_from_uuids(env, job_uuids)
222222
return {cls._load_from_db_record(record) for record in recordset}
223223

224-
def add_lock_record(self):
224+
def add_lock_record(self) -> None:
225225
"""
226226
Create row in db to be locked while the job is being performed.
227227
"""
@@ -241,13 +241,11 @@ def add_lock_record(self):
241241
[self.uuid],
242242
)
243243

244-
def lock(self):
245-
"""
246-
Lock row of job that is being performed
244+
def lock(self) -> bool:
245+
"""Lock row of job that is being performed.
247246
248-
If a job cannot be locked,
249-
it means that the job wasn't started,
250-
a RetryableJobError is thrown.
247+
Return False if a job cannot be locked: it means that the job is not in
248+
STARTED state or is already locked by another worker.
251249
"""
252250
self.env.cr.execute(
253251
"""
@@ -265,16 +263,13 @@ def lock(self):
265263
uuid = %s
266264
AND state='started'
267265
)
268-
FOR UPDATE;
266+
FOR UPDATE SKIP LOCKED;
269267
""",
270268
[self.uuid],
271269
)
272270

273271
# 1 job should be locked
274-
if 1 != len(self.env.cr.fetchall()):
275-
raise RetryableJobError(
276-
f"Trying to lock job that wasn't started, uuid: {self.uuid}"
277-
)
272+
return bool(self.env.cr.fetchall())
278273

279274
@classmethod
280275
def _load_from_db_record(cls, job_db_record):

0 commit comments

Comments
 (0)