Skip to content

Commit a5eeb9a

Browse files
committed
Sync OSS release
1 parent 437abf5 commit a5eeb9a

File tree

4 files changed

+157
-32
lines changed

4 files changed

+157
-32
lines changed

crates/runtime/src/api/server.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -353,24 +353,21 @@ impl HttpApiServer {
353353
.layer(middleware::from_fn(auth_middleware))
354354
.with_state(provider.clone());
355355

356-
// Protected routes (workflows + metrics) with authentication
356+
// Protected routes (workflows + metrics + scheduler health) with authentication.
357+
// Note: /api/v1/health (basic) remains public for load-balancer probes;
358+
// /api/v1/health/scheduler exposes job counts and run stats so it requires auth.
357359
let protected_router = Router::new()
358360
.route("/api/v1/workflows/execute", post(execute_workflow))
359361
.route("/api/v1/metrics", get(get_metrics))
360-
.layer(middleware::from_fn(auth_middleware))
361-
.with_state(provider.clone());
362-
363-
// Health routes — no auth so load-balancer probes work without credentials
364-
let health_router = Router::new()
365362
.route("/api/v1/health/scheduler", get(get_scheduler_health))
363+
.layer(middleware::from_fn(auth_middleware))
366364
.with_state(provider.clone());
367365

368366
router = router
369367
.merge(agent_router)
370368
.merge(schedule_router)
371369
.merge(channel_router)
372-
.merge(protected_router)
373-
.merge(health_router);
370+
.merge(protected_router);
374371
}
375372

376373
// Conditionally serve AGENTS.md at well-known paths (auth-gated, no provider state needed)

crates/runtime/src/lib.rs

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,10 @@ impl RuntimeApiProvider for AgentRuntime {
609609
}
610610

611611
async fn delete_agent(&self, agent_id: AgentId) -> Result<DeleteAgentResponse, RuntimeError> {
612-
// Placeholder implementation - validate input and return success
612+
self.scheduler
613+
.delete_agent(agent_id)
614+
.await
615+
.map_err(RuntimeError::Scheduler)?;
613616

614617
Ok(DeleteAgentResponse {
615618
id: agent_id.to_string(),
@@ -622,8 +625,24 @@ impl RuntimeApiProvider for AgentRuntime {
622625
agent_id: AgentId,
623626
request: ExecuteAgentRequest,
624627
) -> Result<ExecuteAgentResponse, RuntimeError> {
628+
// Ensure the agent exists in the registry
629+
if !self.scheduler.has_agent(agent_id) {
630+
return Err(RuntimeError::Internal(format!(
631+
"Agent {} not found",
632+
agent_id
633+
)));
634+
}
635+
636+
// Re-schedule from stored config if the agent isn't currently active
625637
let status = self.get_agent_status(agent_id).await?;
626-
if status.state != AgentState::Running {
638+
if status.state == AgentState::Completed {
639+
if let Some(config) = self.scheduler.get_agent_config(agent_id) {
640+
self.scheduler
641+
.schedule_agent(config)
642+
.await
643+
.map_err(RuntimeError::Scheduler)?;
644+
}
645+
} else if status.state != AgentState::Running {
627646
self.lifecycle
628647
.start_agent(agent_id)
629648
.await
@@ -654,7 +673,9 @@ impl RuntimeApiProvider for AgentRuntime {
654673
&self,
655674
_agent_id: AgentId,
656675
) -> Result<GetAgentHistoryResponse, RuntimeError> {
657-
// For now, return empty history as the model logger API is not yet implemented
676+
// TODO(T-59): Requires a persistent execution-log storage layer (SQLite
677+
// or in-memory ring buffer). The per-schedule history
678+
// (/schedules/{id}/history) already works via CronScheduler.
658679
let history = vec![];
659680
Ok(GetAgentHistoryResponse { history })
660681
}

crates/runtime/src/scheduler/mod.rs

Lines changed: 67 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ pub trait AgentScheduler {
9292
agent_id: AgentId,
9393
request: crate::api::types::UpdateAgentRequest,
9494
) -> Result<(), SchedulerError>;
95+
96+
/// Check whether an agent is registered (regardless of run state)
97+
fn has_agent(&self, agent_id: AgentId) -> bool;
98+
99+
/// Retrieve the stored config for a registered agent
100+
fn get_agent_config(&self, agent_id: AgentId) -> Option<AgentConfig>;
101+
102+
/// Remove an agent from the registry entirely
103+
async fn delete_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError>;
95104
}
96105

97106
/// Scheduler configuration
@@ -251,6 +260,10 @@ pub struct DefaultAgentScheduler {
251260
task_manager: Arc<TaskManager>,
252261
running_agents: Arc<DashMap<AgentId, ScheduledTask>>,
253262
suspended_agents: Arc<DashMap<AgentId, AgentSuspensionInfo>>,
263+
/// Persistent registry of all agents that have been scheduled. Agents
264+
/// remain here after being dequeued so that status/execute/list continue
265+
/// to work even after completion.
266+
registered_agents: Arc<DashMap<AgentId, AgentConfig>>,
254267
system_metrics: Arc<RwLock<SystemMetrics>>,
255268
shutdown_notify: Arc<Notify>,
256269
is_running: Arc<RwLock<bool>>,
@@ -274,6 +287,7 @@ impl DefaultAgentScheduler {
274287
let task_manager = Arc::new(TaskManager::new(config.task_timeout));
275288
let running_agents = Arc::new(DashMap::new());
276289
let suspended_agents = Arc::new(DashMap::new());
290+
let registered_agents = Arc::new(DashMap::new());
277291
let system_metrics = Arc::new(RwLock::new(SystemMetrics::new()));
278292
let shutdown_notify = Arc::new(Notify::new());
279293
let is_running = Arc::new(RwLock::new(true));
@@ -305,6 +319,7 @@ impl DefaultAgentScheduler {
305319
task_manager,
306320
running_agents,
307321
suspended_agents,
322+
registered_agents,
308323
system_metrics,
309324
shutdown_notify,
310325
is_running,
@@ -456,9 +471,12 @@ impl AgentScheduler for DefaultAgentScheduler {
456471
return Err(SchedulerError::ShuttingDown);
457472
}
458473

459-
let task = ScheduledTask::new(config);
474+
let task = ScheduledTask::new(config.clone());
460475
let agent_id = task.agent_id;
461476

477+
// Persist in the registry so the agent survives dequeue
478+
self.registered_agents.insert(agent_id, config);
479+
462480
// Add to priority queue
463481
self.priority_queue.write().push(task);
464482

@@ -503,13 +521,16 @@ impl AgentScheduler for DefaultAgentScheduler {
503521
reason: format!("Failed to terminate task: {}", e),
504522
})?;
505523

524+
self.registered_agents.remove(&agent_id);
506525
tracing::info!("Terminated agent {}", agent_id);
507526
return Ok(());
508527
}
509528

510529
// Remove from queue
511530
let mut queue = self.priority_queue.write();
512531
if queue.remove(&agent_id).is_some() {
532+
drop(queue);
533+
self.registered_agents.remove(&agent_id);
513534
tracing::info!("Removed agent {} from queue", agent_id);
514535
return Ok(());
515536
}
@@ -623,6 +644,17 @@ impl AgentScheduler for DefaultAgentScheduler {
623644
active_tasks: 0,
624645
scheduled_at: task.scheduled_at,
625646
})
647+
} else if self.registered_agents.contains_key(&agent_id) {
648+
// Agent was registered but already ran and was dequeued
649+
Ok(AgentStatus {
650+
agent_id,
651+
state: AgentState::Completed,
652+
last_activity: SystemTime::now(),
653+
memory_usage: 0,
654+
cpu_usage: 0.0,
655+
active_tasks: 0,
656+
scheduled_at: SystemTime::now(),
657+
})
626658
} else {
627659
// Agent not found anywhere
628660
Err(SchedulerError::AgentNotFound { agent_id })
@@ -776,21 +808,11 @@ impl AgentScheduler for DefaultAgentScheduler {
776808
}
777809

778810
async fn list_agents(&self) -> Vec<AgentId> {
779-
let mut agent_ids = Vec::new();
780-
781-
// Collect running agents
782-
for entry in self.running_agents.iter() {
783-
agent_ids.push(*entry.key());
784-
}
785-
786-
// Collect queued agents
787-
let queue = self.priority_queue.read();
788-
let queued_tasks = queue.to_vec();
789-
for task in queued_tasks {
790-
agent_ids.push(task.agent_id);
791-
}
792-
793-
agent_ids
811+
// Return all registered agents (running, queued, and completed)
812+
self.registered_agents
813+
.iter()
814+
.map(|entry| *entry.key())
815+
.collect()
794816
}
795817

796818
#[cfg(feature = "http-api")]
@@ -840,6 +862,35 @@ impl AgentScheduler for DefaultAgentScheduler {
840862

841863
Err(SchedulerError::AgentNotFound { agent_id })
842864
}
865+
866+
fn has_agent(&self, agent_id: AgentId) -> bool {
867+
self.registered_agents.contains_key(&agent_id)
868+
}
869+
870+
fn get_agent_config(&self, agent_id: AgentId) -> Option<AgentConfig> {
871+
self.registered_agents.get(&agent_id).map(|r| r.clone())
872+
}
873+
874+
async fn delete_agent(&self, agent_id: AgentId) -> Result<(), SchedulerError> {
875+
// Remove from running agents if present
876+
if let Some((_, _)) = self.running_agents.remove(&agent_id) {
877+
let _ = self.task_manager.terminate_task(agent_id).await;
878+
}
879+
880+
// Remove from queue if present
881+
{
882+
let mut queue = self.priority_queue.write();
883+
queue.remove(&agent_id);
884+
}
885+
886+
// Remove from registry
887+
if self.registered_agents.remove(&agent_id).is_some() {
888+
tracing::info!("Deleted agent {} from registry", agent_id);
889+
Ok(())
890+
} else {
891+
Err(SchedulerError::AgentNotFound { agent_id })
892+
}
893+
}
843894
}
844895

845896
impl DefaultAgentScheduler {

src/commands/up.rs

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,21 @@ pub async fn run(matches: &ArgMatches) {
143143
}
144144
};
145145

146-
let agent_id = if let Some(_agent) = agents_found.first() {
147-
// For now, just create a new agent ID
148-
// TODO: Implement proper agent creation when runtime API is stable
149-
AgentId::new()
146+
// Load agents from DSL files into the scheduler registry
147+
let first_agent_id = if let Some(ref rt) = runtime {
148+
let loaded = load_agents_into_registry(rt).await;
149+
if loaded.is_empty() {
150+
None
151+
} else {
152+
println!("✓ {} agent(s) loaded from agents/", loaded.len());
153+
Some(loaded[0])
154+
}
150155
} else {
151-
AgentId::new()
156+
None
152157
};
153158

159+
let agent_id = first_agent_id.unwrap_or_else(AgentId::new);
160+
154161
let http_port_num = match http_port.parse::<u16>() {
155162
Ok(p) => p,
156163
Err(e) => {
@@ -660,6 +667,55 @@ async fn load_dsl_schedules(cron: &symbi_runtime::CronScheduler) -> usize {
660667
count
661668
}
662669

670+
/// Scan DSL files in the agents directory, parse each one, create an
671+
/// `AgentConfig`, and register it with the runtime scheduler so that
672+
/// `/api/v1/agents` lists them and `/api/v1/agents/:id/execute` works.
673+
async fn load_agents_into_registry(runtime: &AgentRuntime) -> Vec<AgentId> {
674+
let agents_dir = Path::new("agents");
675+
if !agents_dir.exists() {
676+
return vec![];
677+
}
678+
679+
let mut ids = Vec::new();
680+
if let Ok(entries) = fs::read_dir(agents_dir) {
681+
for entry in entries.flatten() {
682+
if entry.path().extension().is_some_and(|ext| ext == "dsl") {
683+
if let Ok(source) = fs::read_to_string(entry.path()) {
684+
let name = entry
685+
.path()
686+
.file_stem()
687+
.map(|s| s.to_string_lossy().to_string())
688+
.unwrap_or_default();
689+
690+
let agent_config = symbi_runtime::types::AgentConfig {
691+
id: symbi_runtime::types::AgentId::new(),
692+
name: name.clone(),
693+
dsl_source: source,
694+
execution_mode: symbi_runtime::types::ExecutionMode::Ephemeral,
695+
security_tier: symbi_runtime::types::SecurityTier::Tier1,
696+
resource_limits: symbi_runtime::types::ResourceLimits::default(),
697+
capabilities: vec![symbi_runtime::types::Capability::Computation],
698+
policies: vec![],
699+
metadata: std::collections::HashMap::new(),
700+
priority: symbi_runtime::types::Priority::Normal,
701+
};
702+
703+
match runtime.scheduler.schedule_agent(agent_config).await {
704+
Ok(id) => {
705+
println!(" → {} [{}]", name, id);
706+
ids.push(id);
707+
}
708+
Err(e) => {
709+
eprintln!(" ⚠ Failed to register agent '{}': {}", name, e);
710+
}
711+
}
712+
}
713+
}
714+
}
715+
}
716+
ids
717+
}
718+
663719
fn scan_agents_directory() -> Vec<String> {
664720
let agents_dir = Path::new("agents");
665721
let mut agents = Vec::new();

0 commit comments

Comments
 (0)