Skip to content

Commit 6bad11a

Browse files
author
Bo Peng
committed
Fix slot manager #884
1 parent c5efce6 commit 6bad11a

File tree

3 files changed

+18
-14
lines changed

3 files changed

+18
-14
lines changed

src/sos/step_executor.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,11 +1256,13 @@ def run(self):
12561256
else:
12571257
sm = SlotManager()
12581258
# because the master process pool will count one worker in (step)
1259-
requested = sm.acquire(len(self._groups) - 1, env.config.get('max_procs', max(int(os.cpu_count() / 2), 1)))
1260-
if requested == 0:
1259+
gotten = sm.acquire(len(self._groups) - 1, env.config.get('max_procs', max(int(os.cpu_count() / 2), 1)))
1260+
if gotten == 0:
1261+
env.logger.debug(f'Input group executed sequencially due to -j constraint')
12611262
self.concurrent_input_group = False
12621263
else:
1263-
self.worker_pool = Pool(requested + 1)
1264+
env.logger.debug(f'Using process pool with size {gotten+1}')
1265+
self.worker_pool = Pool(gotten + 1)
12641266

12651267
try:
12661268
for idx, (g, v) in enumerate(zip(self._groups, self._vars)):

src/sos/utils.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -917,14 +917,16 @@ def sos_handle_parameter_(key, defvalue):
917917
# return True
918918

919919
class SlotManager(object):
920-
def __init__(self):
920+
def __init__(self, reset=False):
921921
manager_id = env.config.get('master_md5', 'general')
922922
self.lock_file = os.path.join(os.path.expanduser('~'), '.sos', f'manager_{manager_id}.lck')
923923
self.slot_file = os.path.join(os.path.expanduser('~'), '.sos', f'manager_{manager_id}.slot')
924+
if reset and os.path.isfile(self.slot_file):
925+
os.remove(self.slot_file)
924926

925927
def acquire(self, num=None, max_slots=10, force=False):
926928
# if num == None, request as many as possible slots
927-
if not num:
929+
if num is None:
928930
num = max_slots
929931
with fasteners.InterProcessLock(self.lock_file) as lock:
930932
if not os.path.isfile(self.slot_file):
@@ -934,14 +936,14 @@ def acquire(self, num=None, max_slots=10, force=False):
934936
with open(self.slot_file, 'r') as slot:
935937
slots = int(slot.read())
936938
except Exception as e:
937-
print(e)
939+
env.logger.error(e)
938940
slots = 0
939941
# return all available slots
940942
avail = max_slots - slots
941-
ret = num if force else min(num, avail)
943+
ret = num if force else max(min(num, avail), 0)
942944
with open(self.slot_file, 'w') as slot:
943945
slot.write(str(ret + slots))
944-
env.logger.debug(f'{ret} slots requested from {avail} avail')
946+
env.logger.debug(f'{num} slots requested {ret} returned ({slots} active, force={force})')
945947
return ret
946948

947949
def release(self, num):
@@ -951,7 +953,7 @@ def release(self, num):
951953
# return all available slots
952954
with open(self.slot_file, 'w') as slot:
953955
slot.write(str(max(0, slots - num)))
954-
env.logger.debug(f'{num} slots released')
956+
env.logger.debug(f'{num} slots released from {slots} active, {slots - num} remain')
955957
return max(0, slots - num)
956958

957959

src/sos/workflow_executor.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ def is_pending(self):
238238

239239
class ExecutionManager(object):
240240
# this class managers workers and their status ...
241-
def __init__(self, max_workers):
241+
def __init__(self, max_workers, master=True):
242242
#
243243
# running processes. It consisists of
244244
#
@@ -255,7 +255,7 @@ def __init__(self, max_workers):
255255
# process pool that is used to pool temporarily unused processed.
256256
self.pool = []
257257

258-
self.slot_manager = SlotManager()
258+
self.slot_manager = SlotManager(reset=master)
259259
self.last_num_procs = None
260260

261261
self.max_workers = max_workers
@@ -281,8 +281,8 @@ def add_placeholder_worker(self, runnable, pipe):
281281
def all_busy(self):
282282
n = len([x for x in self.procs if x and not x.is_pending()])
283283
if self.last_num_procs is None:
284-
# we force the increase of numbers because `n` is observed
285-
self.slot_manager.acquire(n, self.max_workers, force=True)
284+
# clear counter file if already exists
285+
self.slot_manager.acquire(n, self.max_workers)
286286
self.last_num_procs = n
287287
elif n != self.last_num_procs:
288288
if self.last_num_procs > n:
@@ -832,7 +832,7 @@ def i_am():
832832
# node: node that is being executed, which is a dummy node
833833
# created on the fly for steps passed from nested workflow
834834
#
835-
manager = ExecutionManager(env.config['max_procs'])
835+
manager = ExecutionManager(env.config['max_procs'], master=not nested)
836836
#
837837
wf_result = {'__workflow_id__': my_workflow_id, 'shared': {}}
838838
#

0 commit comments

Comments
 (0)