Description
Summary
The DelegateServiceScheduler (created via Schedulers.fromExecutorService) delegates periodic scheduling to Schedulers.directSchedulePeriodically. This method wraps the task in a PeriodicSchedulerTask but does not defensively catch exceptions thrown by the user's Runnable.
According to the JDK ScheduledExecutorService contract, if a task execution encounters an exception, any subsequent executions are suppressed. Because the exception is not caught within the wrapper, a single transient failure in the user logic causes the periodic schedule to silently terminate forever without any default logging.
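The suppression rule can be observed without Reactor at all. The following is a minimal sketch against a plain JDK executor; the class and method names (SuppressionDemo, countRuns) are illustrative and not part of Reactor or of this report.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class SuppressionDemo {

    // Schedules a task that always throws and returns how many times it actually ran.
    static int countRuns() {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            exec.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new IllegalStateException("boom");
            }, 0, 50, TimeUnit.MILLISECONDS);
            // Leave room for several periods; only the first one executes.
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs = " + countRuns());
    }
}
```

Although the period is 50 ms and the demo waits 500 ms, the task body runs exactly once, because the first exception cancels all later executions.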
Expected Behavior
The infrastructure should be resilient to user-level exceptions in periodic tasks. It should:
- Catch Throwable inside the execution loop.
- Handle it (e.g., via Schedulers.handleError(t) or internal logging).
- Ensure the Runnable completes normally so that the underlying JDK ScheduledExecutorService continues to schedule future runs.
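The three points above can be sketched as a small wrapper. This is an illustration on a plain JDK executor, not Reactor's actual code: the helper resilient(...) and the error callback are hypothetical names, and in Reactor the callback would presumably be whatever error hook the maintainers choose.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class ResilientPeriodicDemo {

    // Hypothetical helper: wraps a task so that no Throwable escapes run().
    static Runnable resilient(Runnable task, Consumer<Throwable> onError) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                // Report the failure, then return normally so the executor keeps rescheduling.
                onError.accept(t);
            }
        };
    }

    // Runs a task that fails only on its first execution; returns {runs, errors}.
    static int[] runWithTransientFailure() {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        AtomicInteger errors = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            exec.scheduleAtFixedRate(resilient(() -> {
                int current = runs.incrementAndGet();
                threeRuns.countDown();
                if (current == 1) {
                    throw new IllegalStateException("transient");
                }
            }, t -> errors.incrementAndGet()), 0, 50, TimeUnit.MILLISECONDS);
            threeRuns.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdownNow();
        }
        return new int[] { runs.get(), errors.get() };
    }

    public static void main(String[] args) {
        int[] result = runWithTransientFailure();
        System.out.println("runs = " + result[0] + ", errors = " + result[1]);
    }
}
```

Catching Throwable mirrors the wording of this report. Whether fatal errors such as OutOfMemoryError should be rethrown instead of swallowed is a separate design decision that the sketch does not address.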
Actual Behavior
The exception propagates out of the wrapper. The underlying JDK ScheduledExecutorService catches the exception and stops rescheduling the task. No error logs are produced by default, leading to a "Silent Failure" (Exception Not Caught - ENC pattern).
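The failure is silent because, with a plain JDK executor, the exception is stored in the ScheduledFuture returned by scheduleAtFixedRate and only surfaces when someone calls get() on it. The sketch below illustrates this on the JDK alone; SilentFailureDemo and failureMessage are illustrative names, and the sketch makes no claim about what Reactor does with the future internally.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SilentFailureDemo {

    // Returns the message of the exception that ended the periodic task,
    // or null if no failure surfaced within the timeout.
    static String failureMessage() {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
        try {
            ScheduledFuture<?> future = exec.scheduleAtFixedRate(() -> {
                throw new IllegalStateException("Transient Error");
            }, 0, 50, TimeUnit.MILLISECONDS);
            // For a periodic task, get() returns only via cancellation or failure.
            future.get(2, TimeUnit.SECONDS);
            return null;
        } catch (ExecutionException e) {
            // The task's exception is wrapped as the cause.
            return e.getCause().getMessage();
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("failure = " + failureMessage());
    }
}
```

If nothing ever inspects the future, the exception is never reported to any uncaught-exception handler, which matches the absence of log output described above.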
Steps to Reproduce
import org.junit.jupiter.api.Test;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

class SchedulerBugTest {

    @Test
    void periodicTaskSilentlyDiesOnException() throws InterruptedException {
        ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
        Scheduler scheduler = Schedulers.fromExecutorService(exec);
        AtomicInteger runCount = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(2); // We expect at least 2 runs

        scheduler.schedulePeriodically(() -> {
            int current = runCount.incrementAndGet();
            System.out.println("Execution: " + current);
            latch.countDown();
            // Simulate a transient error on the first run
            if (current == 1) {
                throw new RuntimeException("Transient Error");
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        // Wait to see if the second execution happens
        boolean secondRunHappened = latch.await(1, TimeUnit.SECONDS);

        // ACTUAL: false (The scheduler died after the first run)
        // EXPECTED: true (The scheduler should survive the exception)
        assertThat(secondRunHappened).as("Scheduler should continue after exception").isFalse();
        assertThat(runCount.get()).as("Task ran only once").isEqualTo(1);

        exec.shutdown();
    }
}
Possible Solution
In Schedulers.java, the PeriodicSchedulerTask (or the Runnable passed to it) needs to wrap the user's task execution in a try-catch block.
Root Cause Analysis: In DelegateServiceScheduler.java:
    @Override
    public Disposable schedulePeriodically(Runnable task, long initialDelay, long period, TimeUnit unit) {
        return Schedulers.directSchedulePeriodically(getOrCreate(), task, initialDelay, period, unit);
    }
In Schedulers.java, directSchedulePeriodically creates a PeriodicSchedulerTask which submits directly to exec.scheduleAtFixedRate. The run() method of the task (or its wrapper) lacks exception trapping.
Environment
Reactor Core version: Latest (Main)
JDK: 1.8