diff --git a/org.eclipse.paho.mqttv5.client.test/src/test/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionStateTest.java b/org.eclipse.paho.mqttv5.client.test/src/test/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionStateTest.java new file mode 100644 index 000000000..1f383b272 --- /dev/null +++ b/org.eclipse.paho.mqttv5.client.test/src/test/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionStateTest.java @@ -0,0 +1,28 @@ +package org.eclipse.paho.mqttv5.client.internal; + +import org.junit.Test; + +import static org.junit.Assert.*; + +public class MqttSessionStateTest { + + @Test + public void testClearSessionState() { + MqttSessionState state = new MqttSessionState(); + state.clearSessionState(); + assertTrue("Clear session state resets subscription identifier", 1 == state.getNextSubscriptionIdentifier()); + } + + /** + * Test that the subscription identifier is bounded between 1 and 268,435,455 + */ + @Test + public void testSubscriptionIdIsBounded() { + MqttSessionState state = new MqttSessionState(); + for (int i = 1; i <= 268_435_456; i++) { + assertTrue("Subscription identifier minimum bound", state.getNextSubscriptionIdentifier()>=1); + assertTrue("Subscription identifier maximum bound", state.getNextSubscriptionIdentifier()<=268_435_455); + } + } + +} diff --git a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionState.java b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionState.java index 63eb7f650..9df1fb4c4 100644 --- a/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionState.java +++ b/org.eclipse.paho.mqttv5.client/src/main/java/org/eclipse/paho/mqttv5/client/internal/MqttSessionState.java @@ -14,18 +14,34 @@ *
  • Next Subscription Identifier - The next subscription Identifier available * to use.
  • * + * + * Subscription identifier can take values from 1 to 268,435,455 according to MQTTv5 specification + * @see MQTTv5 Specification 3.8.2.1.2 Subscription Identifier */ public class MqttSessionState { // ******* Session Specific Properties and counters ******// private AtomicInteger nextSubscriptionIdentifier = new AtomicInteger(1); private String clientId; + private Integer SUBSCRIPTION_IDENTIFIER_MAX_LIMIT = 268_435_455; public void clearSessionState() { nextSubscriptionIdentifier.set(1); } public Integer getNextSubscriptionIdentifier() { + Integer nextValue = nextSubscriptionIdentifier.getAndIncrement(); + if (nextValue <= SUBSCRIPTION_IDENTIFIER_MAX_LIMIT) { + return nextValue; + } + + // nextValue > SUBSCRIPTION_IDENTIFIER_MAX_LIMIT, so we need to restart the identifier from 1 + synchronized(nextSubscriptionIdentifier) { + // read again to make sure no other thread has updated the value + if (nextSubscriptionIdentifier.get() > SUBSCRIPTION_IDENTIFIER_MAX_LIMIT) { + clearSessionState(); + } + } return nextSubscriptionIdentifier.getAndIncrement(); }