Skip to content

Commit 7031c04

Browse files
thjaeckleclaude
andcommitted
Fix spurious "Unknown message" warnings in ConnectionPersistenceActor
During connection recovery, ConnectionPersistenceActor logged WARN-level "Unknown message: ReceiveTimeout" and "Unknown message: PA_RECOVERED" messages. These were harmless but added noise to logs, especially during rolling deployments when many connections recover simultaneously. Root causes: - ReceiveTimeout: The 2s receive timeout set in ConnectionSupervisorActor .preStart() could get stashed during startup and later forwarded to the persistence actor via the supervisor's catch-all matchAny handler, since activeBehaviour had no explicit ReceiveTimeout handler. - PA_RECOVERED: The NoOpEnforcerActor's blanket matchAny echoed all messages back to the sender. When the supervisor forwarded PA_RECOVERED to the enforcer with sender=PA, the NoOpEnforcerActor bounced it right back to the persistence actor. Fixes: - ConnectionSupervisorActor: Handle ReceiveTimeout in activeBehaviour - NoOpEnforcerActorPropsFactory: Consume PA_RECOVERED control signal instead of echoing it back to the persistence actor - ConnectionPersistenceActor: Defense-in-depth handlers for both messages at DEBUG level in matchAnyAfterInitialization Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e4a058a commit 7031c04

File tree

3 files changed

+20
-0
lines changed

3 files changed

+20
-0
lines changed

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/enforcement/NoOpEnforcerActorPropsFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import com.typesafe.config.Config;
1818

19+
import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor;
20+
1921
import org.apache.pekko.actor.AbstractActor;
2022
import org.apache.pekko.actor.ActorRef;
2123
import org.apache.pekko.actor.ActorSystem;
@@ -42,6 +44,9 @@ private static Props props() {
4244
@Override
4345
public Receive createReceive() {
4446
return receiveBuilder()
47+
.matchEquals(AbstractEnforcerActor.Control.PA_RECOVERED, msg -> {
48+
// PA_RECOVERED is a control signal from the supervisor — consume it, don't echo back.
49+
})
4550
.matchAny(any -> sender().tell(any, ActorRef.noSender()))
4651
.build();
4752
}

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.pekko.actor.ActorRef;
4141
import org.apache.pekko.actor.ActorSystem;
4242
import org.apache.pekko.actor.Props;
43+
import org.apache.pekko.actor.ReceiveTimeout;
4344
import org.apache.pekko.actor.Status;
4445
import org.apache.pekko.actor.SupervisorStrategy;
4546
import org.apache.pekko.cluster.Cluster;
@@ -135,6 +136,7 @@
135136
import org.eclipse.ditto.internal.utils.persistence.mongo.config.SnapshotConfig;
136137
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
137138
import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceActor;
139+
import org.eclipse.ditto.policies.enforcement.AbstractEnforcerActor;
138140
import org.eclipse.ditto.internal.utils.persistentactors.EmptyEvent;
139141
import org.eclipse.ditto.internal.utils.persistentactors.commands.CommandStrategy;
140142
import org.eclipse.ditto.internal.utils.persistentactors.commands.DefaultContext;
@@ -706,6 +708,13 @@ protected Receive matchAnyAfterInitialization() {
706708
.matchEquals(Control.TRIGGER_UPDATE_PRIORITY, this::triggerUpdatePriority)
707709
.match(UpdatePriority.class, this::updatePriority)
708710
.match(ConnectionSupervisorActor.RestartByConnectionType.class, this::initiateRestartByConnectionType)
711+
// ReceiveTimeout may arrive here during startup due to the supervisor's
712+
// stash-unstash-forward mechanism — ignore gracefully:
713+
.match(ReceiveTimeout.class, receiveTimeout ->
714+
log.debug("Ignoring ReceiveTimeout forwarded from supervisor during startup."))
715+
// PA_RECOVERED may be echoed back by NoOpEnforcerActor — ignore gracefully:
716+
.matchEquals(AbstractEnforcerActor.Control.PA_RECOVERED, paRecovered ->
717+
log.debug("Ignoring PA_RECOVERED echoed back by enforcer."))
709718
.build()
710719
.orElse(super.matchAnyAfterInitialization());
711720
}

connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionSupervisorActor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,12 @@ protected Receive activeBehaviour(final Runnable matchProcessNextTwinMessageBeha
153153
log.debug("Successfully registered for connectivity config changes.");
154154
isRegisteredForConnectivityConfigChanges = true;
155155
})
156+
.match(ReceiveTimeout.class, receiveTimeout -> {
157+
// ReceiveTimeout may still be in the stash from the initial 2s timeout set in preStart().
158+
// Cancel and ignore it — it served its purpose during startup.
159+
log.debug("Received lingering ReceiveTimeout in active state, cancelling.");
160+
getContext().cancelReceiveTimeout();
161+
})
156162
.build()
157163
.orElse(connectivityConfigModifiedBehavior(getSelf(), () -> persistenceActorChild))
158164
.orElse(super.activeBehaviour(matchProcessNextTwinMessageBehavior, matchAnyBehavior));

0 commit comments

Comments
 (0)