Skip to content

Commit dd6748b

Browse files
committed
fix(bay): prevent duplicate startup runs and GC deletes
Delay the first background scheduler cycle when startup execution has already run so GC and browser learning do not execute twice immediately. Also skip orphan cargo deletion when a managed runtime still references the cargo, avoiding premature cleanup during runtime teardown. Add regression tests for scheduler startup timing, orphan cargo protection, and warm pool reconciliation with a live runtime.
1 parent 4efda16 commit dd6748b

File tree

7 files changed

+220
-12
lines changed

7 files changed

+220
-12
lines changed

pkgs/bay/app/services/gc/scheduler.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -182,15 +182,27 @@ async def stop(self) -> None:
182182
self._log.info("gc.scheduler.stopped")
183183

184184
async def _background_loop(self) -> None:
185-
"""Internal background loop."""
185+
"""Internal background loop.
186+
187+
Note:
188+
- If run_on_startup is enabled, lifecycle already executed one cycle.
189+
Sleep before the first loop cycle to avoid immediate duplicate execution.
190+
"""
191+
first_iteration = True
192+
186193
while self._running:
194+
should_sleep = (first_iteration and self._config.run_on_startup) or (
195+
not first_iteration
196+
)
197+
if should_sleep:
198+
try:
199+
await asyncio.sleep(self._config.interval_seconds)
200+
except asyncio.CancelledError:
201+
break
202+
203+
first_iteration = False
204+
187205
try:
188206
await self.run_once()
189207
except Exception as e:
190208
self._log.exception("gc.scheduler.cycle_error", error=str(e))
191-
192-
# Sleep until next cycle (or until cancelled)
193-
try:
194-
await asyncio.sleep(self._config.interval_seconds)
195-
except asyncio.CancelledError:
196-
break

pkgs/bay/app/services/gc/tasks/orphan_cargo.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ async def run(self) -> GCResult:
6363

6464
for cargo_id in orphans:
6565
try:
66+
if await self._has_runtime_references(cargo_id):
67+
result.skipped_count += 1
68+
self._log.info(
69+
"gc.orphan_cargo.skip.runtime_in_use",
70+
cargo_id=cargo_id,
71+
)
72+
continue
73+
6674
await self._cargo_mgr.delete_internal_by_id(cargo_id)
6775
result.cleaned_count += 1
6876
self._log.info(
@@ -79,6 +87,20 @@ async def run(self) -> GCResult:
7987

8088
return result
8189

90+
async def _has_runtime_references(self, cargo_id: str) -> bool:
91+
"""Check whether any runtime instance still references the cargo.
92+
93+
Conservative by design: if any runtime instance still carries the cargo label,
94+
skip deletion for this GC cycle and let a later cycle retry after runtime cleanup.
95+
"""
96+
instances = await self._driver.list_runtime_instances(
97+
labels={
98+
"bay.cargo_id": cargo_id,
99+
"bay.managed": "true",
100+
}
101+
)
102+
return len(instances) > 0
103+
82104
async def _find_orphans(self) -> list[str]:
83105
"""Find orphan managed cargo IDs."""
84106
orphan_ids: list[str] = []

pkgs/bay/app/services/skills/scheduler.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -466,12 +466,27 @@ async def stop(self) -> None:
466466
self._log.info("skills.browser.scheduler.stopped")
467467

468468
async def _background_loop(self) -> None:
469+
"""Internal background loop.
470+
471+
Note:
472+
- If run_on_startup is enabled, lifecycle already executed one cycle.
473+
Sleep before the first loop cycle to avoid immediate duplicate execution.
474+
"""
475+
first_iteration = True
476+
469477
while self._running:
478+
should_sleep = (first_iteration and self._config.run_on_startup) or (
479+
not first_iteration
480+
)
481+
if should_sleep:
482+
try:
483+
await asyncio.sleep(self._config.interval_seconds)
484+
except asyncio.CancelledError:
485+
break
486+
487+
first_iteration = False
488+
470489
try:
471490
await self.run_once()
472491
except Exception as exc:
473492
self._log.exception("skills.browser.scheduler.cycle_failed", error=str(exc))
474-
try:
475-
await asyncio.sleep(self._config.interval_seconds)
476-
except asyncio.CancelledError:
477-
break

pkgs/bay/tests/unit/gc/test_gc_scheduler.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,31 @@ async def test_stop_idempotent(self, gc_config):
213213

214214
assert not scheduler.is_running
215215

216+
@pytest.mark.asyncio
217+
async def test_background_loop_sleeps_first_when_startup_cycle_already_ran(self, gc_config):
218+
"""When run_on_startup is enabled, background loop should not immediately run again."""
219+
task = FakeGCTask("task1")
220+
gc_config.run_on_startup = True
221+
gc_config.interval_seconds = 0.2
222+
223+
scheduler = GCScheduler(
224+
tasks=[task],
225+
config=gc_config,
226+
)
227+
228+
# Simulate lifecycle startup run happening before background loop starts.
229+
await scheduler.run_once()
230+
assert task.run_count == 1
231+
232+
await scheduler.start()
233+
await asyncio.sleep(0.05)
234+
235+
# Background loop should still be in initial sleep window.
236+
assert task.run_count == 1
237+
238+
await scheduler.stop()
239+
assert not scheduler.is_running
240+
216241

217242
class TestNoopCoordinator:
218243
"""Tests for NoopCoordinator."""

pkgs/bay/tests/unit/gc/test_gc_tasks.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,11 +413,12 @@ class TestOrphanCargoGC:
413413

414414
@pytest.mark.asyncio
415415
async def test_finds_orphan_cargos(self):
416-
"""Should find managed cargos without a valid sandbox."""
416+
"""Should delete orphan cargos when no runtime still references them."""
417417
from app.services.gc.tasks.orphan_cargo import OrphanCargoGC
418418
from tests.fakes import FakeDriver
419419

420420
driver = FakeDriver()
421+
driver.list_runtime_instances = AsyncMock(return_value=[])
421422
db_session = AsyncMock()
422423

423424
# Mock cargo manager
@@ -429,14 +430,49 @@ async def test_finds_orphan_cargos(self):
429430
result = await task.run()
430431

431432
assert result.cleaned_count == 2
433+
assert result.skipped_count == 0
432434
assert task._cargo_mgr.delete_internal_by_id.call_count == 2
433435

436+
@pytest.mark.asyncio
437+
async def test_orphan_cargo_skips_when_runtime_still_references_volume(self):
438+
"""Should skip deleting orphan cargo when runtime instances still reference it."""
439+
from app.services.gc.tasks.orphan_cargo import OrphanCargoGC
440+
from tests.fakes import FakeDriver
441+
442+
driver = FakeDriver()
443+
driver.list_runtime_instances = AsyncMock(
444+
return_value=[
445+
RuntimeInstance(
446+
id="container-1",
447+
name="bay-session-sess-1",
448+
labels={
449+
"bay.cargo_id": "ws-orphan-1",
450+
"bay.managed": "true",
451+
},
452+
state="running",
453+
)
454+
]
455+
)
456+
db_session = AsyncMock()
457+
458+
task = OrphanCargoGC(driver, db_session)
459+
task._find_orphans = AsyncMock(return_value=["ws-orphan-1"])
460+
task._cargo_mgr = MagicMock()
461+
task._cargo_mgr.delete_internal_by_id = AsyncMock()
462+
463+
result = await task.run()
464+
465+
assert result.cleaned_count == 0
466+
assert result.skipped_count == 1
467+
task._cargo_mgr.delete_internal_by_id.assert_not_called()
468+
434469
@pytest.mark.asyncio
435470
async def test_orphan_cargo_no_orphans(self):
436471
"""Should handle case when no orphan cargos are found."""
437472
from tests.fakes import FakeDriver
438473

439474
driver = FakeDriver()
475+
driver.list_runtime_instances = AsyncMock(return_value=[])
440476
db_session = AsyncMock()
441477

442478
from app.services.gc.tasks.orphan_cargo import OrphanCargoGC

pkgs/bay/tests/unit/managers/test_browser_learning_scheduler.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -602,3 +602,34 @@ async def fake_background_loop() -> None:
602602

603603
# stop again should be noop
604604
await scheduler.stop()
605+
606+
607+
@pytest.mark.asyncio
608+
async def test_scheduler_background_loop_sleeps_first_when_startup_cycle_already_ran(
609+
monkeypatch: pytest.MonkeyPatch,
610+
learning_config: BrowserLearningConfig,
611+
):
612+
config = learning_config.model_copy(update={"run_on_startup": True, "interval_seconds": 0.2})
613+
scheduler = scheduler_module.BrowserLearningScheduler(config=config)
614+
615+
run_once_calls = 0
616+
617+
async def fake_run_once():
618+
nonlocal run_once_calls
619+
run_once_calls += 1
620+
return scheduler_module.BrowserLearningCycleResult()
621+
622+
monkeypatch.setattr(scheduler, "run_once", fake_run_once)
623+
624+
# Simulate lifecycle startup run happening before background loop starts.
625+
await scheduler.run_once()
626+
assert run_once_calls == 1
627+
628+
await scheduler.start()
629+
await asyncio.sleep(0.05)
630+
631+
# Background loop should still be in its initial sleep window.
632+
assert run_once_calls == 1
633+
634+
await scheduler.stop()
635+
assert scheduler.is_running is False

pkgs/bay/tests/unit/warm_pool/test_warm_pool_manager.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,3 +448,70 @@ async def __aexit__(self, exc_type, exc, tb):
448448
updated_sandbox = sandbox_result.scalars().first()
449449
assert updated_sandbox is not None
450450
assert updated_sandbox.warm_state == WarmState.AVAILABLE.value
451+
452+
@pytest.mark.asyncio
453+
async def test_reconcile_keeps_available_sandbox_when_single_runtime_alive(
454+
self,
455+
db_session,
456+
driver,
457+
monkeypatch,
458+
):
459+
sandbox_mgr = SandboxManager(driver=driver, db_session=db_session)
460+
sandbox = await sandbox_mgr.create_warm_sandbox(profile_id="python-default")
461+
await sandbox_mgr.mark_warm_available(sandbox.id)
462+
463+
session = Session(
464+
id="sess-warm-single",
465+
sandbox_id=sandbox.id,
466+
profile_id="python-default",
467+
container_id="live-container",
468+
endpoint="http://live-runtime",
469+
observed_state=SessionStatus.RUNNING,
470+
desired_state=SessionStatus.RUNNING,
471+
)
472+
db_session.add(session)
473+
sandbox.current_session_id = session.id
474+
await db_session.commit()
475+
476+
queue = _FakeWarmupQueue()
477+
scheduler = WarmPoolScheduler(
478+
config=SimpleNamespace(interval_seconds=60, run_on_startup=False),
479+
warmup_queue=queue,
480+
)
481+
482+
driver.set_status_override(
483+
"live-container",
484+
ContainerInfo(
485+
container_id="live-container",
486+
status=ContainerStatus.RUNNING,
487+
endpoint="http://live-runtime",
488+
),
489+
)
490+
491+
class _SessionFactory:
492+
async def __aenter__(self):
493+
return db_session
494+
495+
async def __aexit__(self, exc_type, exc, tb):
496+
return False
497+
498+
monkeypatch.setattr(
499+
"app.db.session.get_async_session",
500+
lambda: _SessionFactory(),
501+
)
502+
monkeypatch.setattr("app.api.dependencies.get_driver", lambda: driver)
503+
504+
reconciled = await scheduler._reconcile_profile_runtime_state("python-default")
505+
assert reconciled == 0
506+
assert queue.enqueued == []
507+
508+
sandbox_result = await db_session.execute(select(Sandbox).where(Sandbox.id == sandbox.id))
509+
updated_sandbox = sandbox_result.scalars().first()
510+
assert updated_sandbox is not None
511+
assert updated_sandbox.warm_state == WarmState.AVAILABLE.value
512+
513+
session_result = await db_session.execute(select(Session).where(Session.id == session.id))
514+
updated_session = session_result.scalars().first()
515+
assert updated_session is not None
516+
assert updated_session.observed_state == SessionStatus.RUNNING
517+
assert updated_session.endpoint == "http://live-runtime"

0 commit comments

Comments
 (0)