Skip to content

Commit b7f29b8

Browse files
committed
feat(gas): add automatic workflow pruning (#4169)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 8a9aecd commit b7f29b8

File tree

31 files changed

+884
-251
lines changed

31 files changed

+884
-251
lines changed

Cargo.lock

Lines changed: 18 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ members = [
2020
"engine/packages/error-macros",
2121
"engine/packages/gasoline",
2222
"engine/packages/gasoline-macros",
23+
"engine/packages/gasoline-runtime",
2324
"engine/packages/guard",
2425
"engine/packages/guard-core",
2526
"engine/packages/logs",
2627
"engine/packages/metrics",
27-
"engine/packages/metrics-aggregator",
2828
"engine/packages/namespace",
2929
"engine/packages/pegboard",
3030
"engine/packages/pegboard-gateway",
@@ -395,6 +395,9 @@ members = [
395395
[workspace.dependencies.gasoline-macros]
396396
path = "engine/packages/gasoline-macros"
397397

398+
[workspace.dependencies.gasoline-runtime]
399+
path = "engine/packages/gasoline-runtime"
400+
398401
[workspace.dependencies.rivet-guard]
399402
path = "engine/packages/guard"
400403

@@ -407,9 +410,6 @@ members = [
407410
[workspace.dependencies.rivet-metrics]
408411
path = "engine/packages/metrics"
409412

410-
[workspace.dependencies.rivet-metrics-aggregator]
411-
path = "engine/packages/metrics-aggregator"
412-
413413
[workspace.dependencies.namespace]
414414
path = "engine/packages/namespace"
415415

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Workflow Pruning
2+
3+
TODO

engine/artifacts/config-schema.json

Lines changed: 39 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

engine/packages/bootstrap/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ license.workspace = true
88
[dependencies]
99
epoxy.workspace = true
1010
gas.workspace = true
11+
gasoline-runtime.workspace = true
1112
namespace.workspace = true
1213
pegboard.workspace = true
1314
rivet-config.workspace = true
@@ -18,4 +19,4 @@ serde.workspace = true
1819
tokio-tungstenite.workspace = true
1920
tracing.workspace = true
2021
universaldb.workspace = true
21-
url.workspace = true
22+
url.workspace = true

engine/packages/bootstrap/src/lib.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
2222
},
2323
create_default_namespace(&ctx),
2424
backfill::run(&ctx),
25+
setup_pegboard_metrics_aggregator(&ctx),
26+
setup_gas_pruner(&ctx),
2527
)?;
2628

2729
Ok(())
@@ -103,3 +105,27 @@ async fn create_default_namespace(ctx: &StandaloneCtx) -> Result<()> {
103105

104106
Ok(())
105107
}
108+
109+
async fn setup_pegboard_metrics_aggregator(ctx: &StandaloneCtx) -> Result<()> {
110+
// Create metrics aggregator if does not exist
111+
let workflow_id = ctx
112+
.workflow(pegboard::workflows::metrics_aggregator::Input {})
113+
.unique()
114+
.dispatch()
115+
.await?;
116+
tracing::info!(%workflow_id, "created pegboard metrics aggregator");
117+
118+
Ok(())
119+
}
120+
121+
async fn setup_gas_pruner(ctx: &StandaloneCtx) -> Result<()> {
122+
// Create gas pruner if does not exist
123+
let workflow_id = ctx
124+
.workflow(gasoline_runtime::workflows::pruner::Input {})
125+
.unique()
126+
.dispatch()
127+
.await?;
128+
tracing::info!(%workflow_id, "created gasoline pruner");
129+
130+
Ok(())
131+
}

engine/packages/config/src/config/runtime.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use serde::{Deserialize, Serialize};
88
pub struct Runtime {
99
#[serde(default)]
1010
pub worker: Worker,
11+
#[serde(default)]
12+
pub gasoline: Gasoline,
1113
/// Time (in seconds) to allow for guard to wait for pending requests after receiving SIGTERM. Defaults
1214
/// to 1 hour.
1315
guard_shutdown_duration: Option<u32>,
@@ -71,3 +73,31 @@ impl Worker {
7173
Duration::from_secs(self.shutdown_duration.unwrap_or(30) as u64)
7274
}
7375
}
76+
77+
#[derive(Debug, Clone, Serialize, Deserialize, Default, JsonSchema)]
78+
#[serde(deny_unknown_fields)]
79+
pub struct Gasoline {
80+
/// Time (in seconds) after completion before considering a workflow eligible for pruning. Defaults to 7
81+
/// days. Set to 0 to never prune workflow data.
82+
prune_eligibility_duration: Option<u64>,
83+
/// Time (in seconds) to periodically check for workflows to prune. Defaults to 12 hours.
84+
prune_interval_duration: Option<u64>,
85+
}
86+
87+
impl Gasoline {
88+
pub fn prune_eligibility_duration(&self) -> Option<Duration> {
89+
if let Some(prune_eligibility_duration) = self.prune_eligibility_duration {
90+
if prune_eligibility_duration == 0 {
91+
None
92+
} else {
93+
Some(Duration::from_secs(prune_eligibility_duration))
94+
}
95+
} else {
96+
Some(Duration::from_secs(60 * 60 * 24 * 7))
97+
}
98+
}
99+
100+
pub fn prune_interval_duration(&self) -> Duration {
101+
Duration::from_secs(self.prune_interval_duration.unwrap_or(60 * 60 * 12))
102+
}
103+
}

engine/packages/engine/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@ rivet-cache.workspace = true
3232
rivet-config.workspace = true
3333
rivet-guard.workspace = true
3434
rivet-logs.workspace = true
35-
rivet-metrics-aggregator.workspace = true
3635
rivet-pools.workspace = true
3736
rivet-runtime.workspace = true
3837
rivet-service-manager.workspace = true

engine/packages/engine/src/run_config.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,6 @@ pub fn config(_rivet_config: rivet_config::Config) -> Result<RunConfigData> {
2121
|config, pools| Box::pin(rivet_workflow_worker::start(config, pools)),
2222
true,
2323
),
24-
Service::new(
25-
"metrics_aggregator",
26-
ServiceKind::Standalone,
27-
|config, pools| Box::pin(rivet_metrics_aggregator::start(config, pools)),
28-
false,
29-
),
3024
Service::new(
3125
"bootstrap",
3226
ServiceKind::Oneshot,

0 commit comments

Comments
 (0)