Skip to content
This repository was archived by the owner on Jan 27, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ edition = "2021"
match_same_arms = "warn"
unused_async = "warn"
uninlined_format_args = "warn"
manual_let_else = "warn"

[workspace.lints.rust]
unreachable_pub = "warn"
manual_let_else = "warn"
11 changes: 6 additions & 5 deletions crates/orchestrator/src/api/routes/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ mod tests {
use std::sync::Arc;

use super::*;
use crate::plugins::StatusUpdatePlugin;
use crate::{
api::tests::helper::{create_test_app_state, create_test_app_state_with_nodegroups},
models::node::{NodeStatus, OrchestratorNode},
Expand Down Expand Up @@ -575,12 +574,14 @@ mod tests {
None,
None,
));
let _ = plugin.clone().register_observer().await;

let _ = plugin
.handle_status_change(&node, &NodeStatus::Healthy)
let _ = app_state
.store_context
.task_store
.add_observer(plugin.clone())
.await;

let _ = plugin.handle_status_change(&node).await;

let task = Task {
id: Uuid::new_v4(),
image: "test-image".to_string(),
Expand Down
5 changes: 3 additions & 2 deletions crates/orchestrator/src/api/routes/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::api::server::AppState;
use crate::plugins::node_groups::get_task_topologies;
use actix_web::{
web::{self, delete, get, post, Data},
HttpResponse, Scope,
Expand Down Expand Up @@ -64,8 +65,8 @@ async fn create_task(task: web::Json<TaskRequest>, app_state: Data<AppState>) ->
}
};

if let Some(group_plugin) = &app_state.node_groups_plugin {
match group_plugin.get_task_topologies(&task) {
if app_state.node_groups_plugin.is_some() {
match get_task_topologies(&task) {
Ok(topology) => {
if topology.is_empty() {
return HttpResponse::BadRequest().json(json!({"success": false, "error": "No topology found for task but grouping plugin is active."}));
Expand Down
4 changes: 2 additions & 2 deletions crates/orchestrator/src/discovery/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct DiscoveryMonitor {
heartbeats: Arc<LoopHeartbeats>,
http_client: reqwest::Client,
max_healthy_nodes_with_same_endpoint: u32,
status_change_handlers: Vec<Box<dyn StatusUpdatePlugin>>,
status_change_handlers: Vec<StatusUpdatePlugin>,
}

impl DiscoveryMonitor {
Expand All @@ -38,7 +38,7 @@ impl DiscoveryMonitor {
store_context: Arc<StoreContext>,
heartbeats: Arc<LoopHeartbeats>,
max_healthy_nodes_with_same_endpoint: u32,
status_change_handlers: Vec<Box<dyn StatusUpdatePlugin>>,
status_change_handlers: Vec<StatusUpdatePlugin>,
) -> Self {
Self {
coordinator_wallet,
Expand Down
7 changes: 0 additions & 7 deletions crates/orchestrator/src/events/mod.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/orchestrator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mod api;
mod discovery;
mod events;
mod metrics;
mod models;
mod node;
Expand Down
41 changes: 21 additions & 20 deletions crates/orchestrator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ async fn main() -> Result<()> {
.unwrap();

let group_store_context = store_context.clone();
let mut scheduler_plugins: Vec<Box<dyn SchedulerPlugin>> = Vec::new();
let mut status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
let mut scheduler_plugins: Vec<SchedulerPlugin> = Vec::new();
let mut status_update_plugins: Vec<StatusUpdatePlugin> = vec![];
let mut node_groups_plugin: Option<Arc<NodeGroupsPlugin>> = None;
let mut webhook_plugins: Vec<WebhookPlugin> = vec![];

Expand All @@ -167,7 +167,7 @@ async fn main() -> Result<()> {
let plugin = WebhookPlugin::new(config);
let plugin_clone = plugin.clone();
webhook_plugins.push(plugin_clone);
status_update_plugins.push(Box::new(plugin));
status_update_plugins.push(plugin.into());
info!("Plugin: Webhook plugin initialized");
}
}
Expand Down Expand Up @@ -201,26 +201,27 @@ async fn main() -> Result<()> {
match serde_json::from_str::<Vec<NodeGroupConfiguration>>(&node_group_configs) {
Ok(configs) if !configs.is_empty() => {
let node_groups_heartbeats = heartbeats.clone();
let group_plugin = NodeGroupsPlugin::new(

let group_plugin = Arc::new(NodeGroupsPlugin::new(
configs,
store.clone(),
group_store_context.clone(),
Some(node_groups_heartbeats.clone()),
Some(webhook_plugins.clone()),
);
));

// Register the plugin as a task observer
group_store_context
.task_store
.add_observer(group_plugin.clone())
.await;

let status_group_plugin = group_plugin.clone();
let group_plugin_for_server = group_plugin.clone();
let group_plugin_arc = Arc::new(group_plugin_for_server);

// Register the plugin as a task observer
if let Err(e) = group_plugin_arc.clone().register_observer().await {
error!("Failed to register node groups plugin as observer: {e}");
}

node_groups_plugin = Some(group_plugin_arc);
scheduler_plugins.push(Box::new(group_plugin));
status_update_plugins.push(Box::new(status_group_plugin));
node_groups_plugin = Some(group_plugin_for_server);
scheduler_plugins.push(group_plugin.into());
status_update_plugins.push(status_group_plugin.into());
info!("Plugin: Node group plugin initialized");
}
Ok(_) => {
Expand Down Expand Up @@ -263,16 +264,16 @@ async fn main() -> Result<()> {
}

// Create status_update_plugins for discovery monitor
let mut discovery_status_update_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
let mut discovery_status_update_plugins: Vec<StatusUpdatePlugin> = vec![];

// Add webhook plugins to discovery status update plugins
for plugin in &webhook_plugins {
discovery_status_update_plugins.push(Box::new(plugin.clone()));
discovery_status_update_plugins.push(plugin.into());
}

// Add node groups plugin if available
if let Some(group_plugin) = node_groups_plugin.clone() {
discovery_status_update_plugins.push(Box::new(group_plugin.as_ref().clone()));
discovery_status_update_plugins.push(group_plugin.into());
}

let discovery_store_context = store_context.clone();
Expand Down Expand Up @@ -316,16 +317,16 @@ async fn main() -> Result<()> {
});

// Create status_update_plugins for status updater
let mut status_updater_plugins: Vec<Box<dyn StatusUpdatePlugin>> = vec![];
let mut status_updater_plugins: Vec<StatusUpdatePlugin> = vec![];

// Add webhook plugins to status updater plugins
for plugin in &webhook_plugins {
status_updater_plugins.push(Box::new(plugin.clone()));
status_updater_plugins.push(plugin.into());
}

// Add node groups plugin if available
if let Some(group_plugin) = node_groups_plugin.clone() {
status_updater_plugins.push(Box::new(group_plugin.as_ref().clone()));
status_updater_plugins.push(group_plugin.into());
}

let status_update_store_context = store_context.clone();
Expand Down
103 changes: 99 additions & 4 deletions crates/orchestrator/src/plugins/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,103 @@
mod traits;
pub use traits::*;
use crate::plugins::newest_task::NewestTaskPlugin;
use alloy::primitives::Address;
use anyhow::Result;
use shared::models::task::Task;
use std::sync::Arc;

pub(crate) mod node_groups;
use crate::{
models::node::{NodeStatus, OrchestratorNode},
plugins::node_groups::NodeGroupsPlugin,
plugins::webhook::WebhookPlugin,
};

pub(crate) mod newest_task;

pub(crate) mod node_groups;
pub(crate) mod webhook;

#[derive(Clone)]
pub enum StatusUpdatePlugin {
NodeGroupsPlugin(Arc<NodeGroupsPlugin>),
WebhookPlugin(WebhookPlugin),
}

impl StatusUpdatePlugin {
pub(crate) async fn handle_status_change(
&self,
node: &OrchestratorNode,
status: &NodeStatus,
) -> Result<()> {
match self {
StatusUpdatePlugin::NodeGroupsPlugin(plugin) => plugin.handle_status_change(node).await,
StatusUpdatePlugin::WebhookPlugin(plugin) => plugin.handle_status_change(node, status),
}
}
}

impl From<Arc<NodeGroupsPlugin>> for StatusUpdatePlugin {
fn from(plugin: Arc<NodeGroupsPlugin>) -> Self {
StatusUpdatePlugin::NodeGroupsPlugin(plugin)
}
}

impl From<&Arc<NodeGroupsPlugin>> for StatusUpdatePlugin {
fn from(plugin: &Arc<NodeGroupsPlugin>) -> Self {
StatusUpdatePlugin::NodeGroupsPlugin(plugin.clone())
}
}

impl From<WebhookPlugin> for StatusUpdatePlugin {
fn from(plugin: WebhookPlugin) -> Self {
StatusUpdatePlugin::WebhookPlugin(plugin)
}
}

impl From<&WebhookPlugin> for StatusUpdatePlugin {
fn from(plugin: &WebhookPlugin) -> Self {
StatusUpdatePlugin::WebhookPlugin(plugin.clone())
}
}

#[derive(Clone)]
pub enum SchedulerPlugin {
NodeGroupsPlugin(Arc<NodeGroupsPlugin>),
NewestTaskPlugin(NewestTaskPlugin),
}

impl SchedulerPlugin {
pub(crate) async fn filter_tasks(
&self,
tasks: &[Task],
node_address: &Address,
) -> Result<Vec<Task>> {
match self {
SchedulerPlugin::NodeGroupsPlugin(plugin) => {
plugin.filter_tasks(tasks, node_address).await
}
SchedulerPlugin::NewestTaskPlugin(plugin) => plugin.filter_tasks(tasks),
}
}
}

impl From<Arc<NodeGroupsPlugin>> for SchedulerPlugin {
fn from(plugin: Arc<NodeGroupsPlugin>) -> Self {
SchedulerPlugin::NodeGroupsPlugin(plugin)
}
}

impl From<&Arc<NodeGroupsPlugin>> for SchedulerPlugin {
fn from(plugin: &Arc<NodeGroupsPlugin>) -> Self {
SchedulerPlugin::NodeGroupsPlugin(plugin.clone())
}
}

impl From<NewestTaskPlugin> for SchedulerPlugin {
fn from(plugin: NewestTaskPlugin) -> Self {
SchedulerPlugin::NewestTaskPlugin(plugin)
}
}

impl From<&NewestTaskPlugin> for SchedulerPlugin {
fn from(plugin: &NewestTaskPlugin) -> Self {
SchedulerPlugin::NewestTaskPlugin(plugin.clone())
}
}
20 changes: 7 additions & 13 deletions crates/orchestrator/src/plugins/newest_task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use alloy::primitives::Address;
use anyhow::Result;
use async_trait::async_trait;
use shared::models::task::Task;

use super::{Plugin, SchedulerPlugin};
#[derive(Clone)]
pub struct NewestTaskPlugin;

pub(crate) struct NewestTaskPlugin;

impl Plugin for NewestTaskPlugin {}

#[async_trait]
impl SchedulerPlugin for NewestTaskPlugin {
async fn filter_tasks(&self, tasks: &[Task], _node_address: &Address) -> Result<Vec<Task>> {
impl NewestTaskPlugin {
pub(crate) fn filter_tasks(&self, tasks: &[Task]) -> Result<Vec<Task>> {
if tasks.is_empty() {
return Ok(vec![]);
}
Expand All @@ -32,8 +26,8 @@ mod tests {

use super::*;

#[tokio::test]
async fn test_filter_tasks() {
#[test]
fn test_filter_tasks() {
let plugin = NewestTaskPlugin;
let tasks = vec![
Task {
Expand All @@ -54,7 +48,7 @@ mod tests {
},
];

let filtered_tasks = plugin.filter_tasks(&tasks, &Address::ZERO).await.unwrap();
let filtered_tasks = plugin.filter_tasks(&tasks).unwrap();
assert_eq!(filtered_tasks.len(), 1);
assert_eq!(filtered_tasks[0].id, tasks[1].id);
}
Expand Down
Loading