Skip to content

Commit 2ca502d

Browse files
authored
Merge pull request systeminit#5263 from systeminit/brit/bubble-up-veritech-errors
fix(veritech): When errors occur that don't cause us to retry the function, inform subscribers
2 parents 37f7783 + 746f098 commit 2ca502d

File tree

2 files changed

+79
-26
lines changed

2 files changed

+79
-26
lines changed

lib/dal/tests/integration_test/func/kill_execution.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use dal_test::helpers::{
1111
};
1212
use dal_test::test;
1313
use pretty_assertions_sorted::assert_eq;
14-
use si_events::FuncRunState;
14+
use si_events::{ActionResultState, FuncRunState};
1515

1616
#[test]
1717
async fn kill_execution_works(ctx: &mut DalContext) {
@@ -96,6 +96,18 @@ async fn kill_execution_works(ctx: &mut DalContext) {
9696
FuncRunner::kill_execution(ctx, func_run_id)
9797
.await
9898
.expect("could not kill execution");
99+
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
100+
.await
101+
.expect("could not commit and update snapshot to visibility");
102+
let action_result_state = ctx
103+
.layer_db()
104+
.func_run()
105+
.read(func_run_id)
106+
.await
107+
.expect("could not get func run")
108+
.expect("no func run found")
109+
.action_result_state()
110+
.expect("action result state found");
99111
let func_run_state = ctx
100112
.layer_db()
101113
.func_run()
@@ -104,8 +116,21 @@ async fn kill_execution_works(ctx: &mut DalContext) {
104116
.expect("could not get func run")
105117
.expect("no func run found")
106118
.state();
119+
120+
// This should be [`FuncRunState::Killed`]
121+
// but because this is getting bubbled back to the Pinga Job running the
122+
// action, and the error is now embedded in a successful response,
123+
// (because we did get a response from Veritech)
124+
// Pinga is setting the func run state to success and the
125+
// action result state to failure.
126+
// TODO: make the madness end, please: a killed execution should end up as [`FuncRunState::Killed`].
127+
assert_eq!(
128+
FuncRunState::Success, // expected
129+
func_run_state // actual
130+
);
131+
107132
assert_eq!(
108-
FuncRunState::Killed, // expected
109-
func_run_state // actual
133+
ActionResultState::Failure, // expected
134+
action_result_state // actual
110135
);
111136
}

lib/veritech-server/src/handlers.rs

Lines changed: 51 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ use si_data_nats::{InnerMessage, Subject};
1010
// seems strange to get these cyclone_core types from si_pool_noodle?
1111
use si_pool_noodle::{
1212
ActionRunResultSuccess, CycloneClient, CycloneRequest, CycloneRequestable, ExecutionError,
13-
ManagementResultSuccess, ProgressMessage, ResolverFunctionResultSuccess,
14-
SchemaVariantDefinitionResultSuccess, SensitiveStrings, ValidationResultSuccess,
13+
FunctionResultFailure, FunctionResultFailureError, ManagementResultSuccess, ProgressMessage,
14+
ResolverFunctionResultSuccess, SchemaVariantDefinitionResultSuccess, SensitiveStrings,
15+
ValidationResultSuccess,
1516
};
1617
use std::{collections::HashMap, result, str::Utf8Error, sync::Arc, time::Duration};
1718
use telemetry::prelude::*;
18-
use telemetry_utils::metric;
1919
use thiserror::Error;
2020
use tokio::sync::{oneshot, Mutex};
2121
use veritech_core::{
@@ -186,7 +186,6 @@ where
186186
let nats_for_publisher = state.nats.clone();
187187
let publisher = Publisher::new(&nats_for_publisher, &reply_mailbox);
188188
let execution_id = request.execution_id().to_owned();
189-
190189
let cyclone_request = CycloneRequest::from_parts(request.clone(), sensitive_strings);
191190

192191
let (kill_sender, kill_receiver) = oneshot::channel::<()>();
@@ -224,7 +223,7 @@ where
224223
trace!("received heartbeat message");
225224
}
226225
Err(err) => {
227-
warn!(error = ?err, "next progress message was an error, bailing out");
226+
warn!(si.error.message = ?err, "next progress message was an error, bailing out");
228227
break;
229228
}
230229
}
@@ -242,47 +241,76 @@ where
242241
HandlerResult::Ok(function_result)
243242
};
244243

245-
// we do not want to return errors at this point as it will retry functions that may have
246-
// failed for legitimate reasons and should not be retried
244+
// we do not want to return errors at this point as it will Nack the message and end up auto-retrying
245+
// functions that may have failed for legitimate reasons and should not be retried
247246
let timeout = state.cyclone_client_execution_timeout;
248247
let result = tokio::select! {
249248
_ = tokio::time::sleep(timeout) => {
250-
error!("hit timeout for communicating with cyclone server");
251-
kill_sender_remove_blocking(&state.kill_senders, execution_id).await?;
249+
error!("hit timeout for communicating with cyclone server:{:?}", &timeout);
250+
kill_sender_remove_blocking(&state.kill_senders, execution_id.to_owned()).await?;
252251
Err(HandlerError::CycloneTimeout(
253252
timeout,
254253
))
255254
},
256255
Ok(_) = kill_receiver => {
257-
Err(HandlerError::Killed(execution_id))
256+
Err(HandlerError::Killed(execution_id.clone()))
258257
}
259258
func_result = progress_loop => {
260-
kill_sender_remove_blocking(&state.kill_senders, execution_id).await?;
259+
kill_sender_remove_blocking(&state.kill_senders, execution_id.to_owned()).await?;
261260
func_result
262261
},
263262
};
264263

265264
match result {
265+
// Got an Ok - let anyone subscribing to a reply know
266266
Ok(function_result) => {
267267
if let Err(err) = publisher.publish_result(&function_result).await {
268-
metric!(counter.function_run.action = -1);
269-
error!(error = ?err, "failed to publish errored result");
268+
error!(si.error.message = ?err, "failed to publish errored result");
270269
}
271270

272271
request.dec_run_metric();
273272
span.record_ok();
274273
}
275-
Err(HandlerError::CycloneTimeout(timeout)) => {
276-
request.dec_run_metric();
277-
warn!(error = ?timeout, "timed out trying to run function to completion");
278-
}
279-
Err(HandlerError::Killed(execution_id)) => {
280-
request.dec_run_metric();
281-
info!(error = ?execution_id, "function killed during execution via signal");
282-
}
283-
Err(err) => {
274+
// Got an error that we don't want to recover from here - need to let anyone subscribing know we're done
275+
// so they're not waiting forever and can decide how to proceed
276+
// Construct the Error result to propagate to subscribers
277+
Err(ref err) => {
278+
let func_result_error = match err {
279+
HandlerError::CycloneTimeout(ref timeout) => {
280+
warn!(si.error.message = ?err, "timed out trying to run function to completion: {:?}", timeout);
281+
let func_res_failure = FunctionResultFailure::new_for_veritech_server_error(
282+
execution_id.to_owned(),
283+
"timed out trying to run function to completion",
284+
timestamp(),
285+
);
286+
si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
287+
}
288+
HandlerError::Killed(ref execution_id) => {
289+
warn!(si.error.message = ?err, si.func_run.id = ?execution_id, "function killed during execution: {:?} via signal", execution_id);
290+
let func_res_failure = FunctionResultFailure::new(
291+
execution_id.to_owned(),
292+
FunctionResultFailureError {
293+
kind: si_pool_noodle::FunctionResultFailureErrorKind::KilledExecution,
294+
message: "function execution terminated".to_owned(),
295+
},
296+
timestamp(),
297+
);
298+
si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
299+
}
300+
err => {
301+
error!(si.error.message = ?err, si.func_run.id = ?execution_id.to_owned(), "failure trying to run function to completion");
302+
let func_res_failure = FunctionResultFailure::new_for_veritech_server_error(
303+
execution_id.to_owned(),
304+
"timed out trying to run function to completion",
305+
timestamp(),
306+
);
307+
si_pool_noodle::FunctionResult::Failure::<Request>(func_res_failure)
308+
}
309+
};
284310
request.dec_run_metric();
285-
error!(error = ?err, "failure trying to run function to completion");
311+
if let Err(err) = publisher.publish_result(&func_result_error).await {
312+
error!(error = ?err, "failed to publish errored result");
313+
}
286314
}
287315
}
288316

0 commit comments

Comments
 (0)