Skip to content

Commit 09e3e3c

Browse files
jeffcrouse and claude committed
fix: prevent concurrent scans with Redis lock, add analysis start endpoint
- Add POST /api/v1/library/analysis/start endpoint to manually trigger analysis
- Use Redis-based locking to prevent concurrent scans across uvicorn workers
- Trigger analysis after scan for relocated tracks (not just new/updated)
- Check lock expiry and heartbeat staleness to avoid permanent locks

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 9b190e4 commit 09e3e3c

File tree

3 files changed

+86
-6
lines changed

3 files changed

+86
-6
lines changed

backend/app/api/routes/library.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,44 @@ async def get_analysis_status(db: DbSession) -> AnalysisStatus:
393393
)
394394

395395

396+
class AnalysisStartResponse(BaseModel):
    """Response for starting analysis via POST /analysis/start.

    ``status`` is "started" when tracks were queued, "complete" when all
    tracks were already analyzed, or "error" when queuing failed (values
    produced by the start_analysis endpoint).
    """

    # Outcome flag: "started" | "complete" | "error".
    status: str
    # Number of tracks queued for analysis; 0 for "complete" and "error".
    queued: int = 0
    # Human-readable description of the outcome.
    message: str
402+
403+
404+
@router.post("/analysis/start", response_model=AnalysisStartResponse)
async def start_analysis(limit: int = 500) -> AnalysisStartResponse:
    """Manually trigger analysis for unanalyzed tracks.

    Queues up to ``limit`` unanalyzed tracks for background analysis and
    returns immediately; poll GET /analysis/status to monitor progress.
    """
    # Imported lazily so the route module does not pull in the task
    # machinery at import time.
    from app.services.tasks import queue_unanalyzed_tracks

    try:
        count = await queue_unanalyzed_tracks(limit=limit)
    except Exception as exc:
        # Failures are reported in-band (HTTP 200 with status="error")
        # rather than as an HTTP error response.
        return AnalysisStartResponse(
            status="error",
            queued=0,
            message=f"Failed to start analysis: {exc}",
        )

    if count:
        return AnalysisStartResponse(
            status="started",
            queued=count,
            message=f"Queued {count} tracks for analysis",
        )
    return AnalysisStartResponse(
        status="complete",
        queued=0,
        message="All tracks are already analyzed",
    )
432+
433+
396434
# ============================================================================
397435
# Missing Tracks API
398436
# ============================================================================

backend/app/services/background.py

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,21 +120,54 @@ async def shutdown(self) -> None:
120120
logger.info("BackgroundManager shutdown complete")
121121

122122
def is_scan_running(self) -> bool:
    """Return True if a library scan is in progress on any worker.

    Checks, in order: this worker's local scan task, the shared Redis
    lock, and finally the Redis progress record — whose heartbeat must
    be fresh (within 2 minutes) for the scan to count as running.
    """
    # A live task in this process means a scan is definitely running.
    task = self._current_scan_task
    if task is not None and not task.done():
        return True

    # Everything below talks to Redis; any failure is treated as
    # "not running" so a Redis outage cannot wedge the scan API.
    try:
        # Cross-worker lock taken by _acquire_scan_lock.
        if self.redis.get("familiar:scan:lock"):
            return True

        # Fall back to the progress record written by a running scan.
        raw: bytes | None = self.redis.get("familiar:scan:progress")  # type: ignore[assignment]
        if not raw:
            return False
        progress = json.loads(raw)
        if progress.get("status") != "running":
            return False

        # A "running" record only counts if its heartbeat is recent;
        # a missing or stale heartbeat means the writer likely died.
        heartbeat = progress.get("last_heartbeat")
        if heartbeat:
            from datetime import datetime, timedelta

            try:
                seen = datetime.fromisoformat(heartbeat)
                # NOTE(review): assumes the heartbeat is a naive local
                # timestamp matching datetime.now(); an aware timestamp
                # raises TypeError here and the record is treated as
                # stale — confirm against the heartbeat writer.
                if datetime.now() - seen < timedelta(minutes=2):
                    return True
            except (ValueError, TypeError):
                pass
    except Exception:
        pass

    return False
137155

156+
def _acquire_scan_lock(self) -> bool:
157+
"""Try to acquire the scan lock in Redis. Returns True if acquired."""
158+
try:
159+
# SET NX (only if not exists) with 1 hour expiry
160+
return bool(self.redis.set("familiar:scan:lock", "1", nx=True, ex=3600))
161+
except Exception:
162+
return False
163+
164+
def _release_scan_lock(self) -> None:
165+
"""Release the scan lock in Redis."""
166+
try:
167+
self.redis.delete("familiar:scan:lock")
168+
except Exception:
169+
pass
170+
138171
async def run_cpu_bound(self, func: Callable, *args: Any) -> Any:
139172
"""Run CPU-bound function in process pool (spawned, not forked).
140173
@@ -148,11 +181,17 @@ async def run_scan(self, full_scan: bool = False) -> dict[str, Any]:
148181
"""Start a library scan in the background.
149182
150183
Returns immediately with status. Progress is reported via Redis.
184+
Uses Redis-based locking to prevent concurrent scans across workers.
151185
"""
152186
async with self._lock:
187+
# Check if already running (local or other worker)
153188
if self.is_scan_running():
154189
return {"status": "already_running"}
155190

191+
# Try to acquire Redis lock
192+
if not self._acquire_scan_lock():
193+
return {"status": "already_running"}
194+
156195
# Create task and store reference
157196
self._current_scan_task = asyncio.create_task(
158197
self._do_scan(full_scan)
@@ -178,6 +217,7 @@ async def _do_scan(self, full_scan: bool) -> dict[str, Any]:
178217
return {"status": "error", "error": str(e)}
179218
finally:
180219
self._current_scan_task = None
220+
self._release_scan_lock()
181221

182222
async def run_analysis(self, track_id: str) -> dict[str, Any]:
183223
"""Queue a track for analysis.

backend/app/services/tasks.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,9 +355,11 @@ async def run_library_scan(full_scan: bool = False) -> dict[str, Any]:
355355
recovered=results.get("recovered", 0),
356356
)
357357

358-
# Queue analysis for new/updated tracks
359-
if results["new"] > 0 or results["updated"] > 0:
360-
queued_count = await queue_unanalyzed_tracks(limit=results["new"] + results["updated"])
358+
# Queue analysis for unanalyzed tracks (new, updated, or relocated)
359+
# Also catches any tracks that were missed in previous scans
360+
unanalyzed_count = results["new"] + results["updated"] + results.get("relocated", 0)
361+
if unanalyzed_count > 0:
362+
queued_count = await queue_unanalyzed_tracks(limit=max(unanalyzed_count, 500))
361363
logger.info(f"Queued {queued_count} tracks for analysis")
362364

363365
logger.info(f"Scan complete: {results}")

0 commit comments

Comments
 (0)