Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.eclipse.paho.mqttv5.client.internal;

import org.junit.Test;

import static org.junit.Assert.*;

public class MqttSessionStateTest {

@Test
public void testClearSessionState() {
MqttSessionState state = new MqttSessionState();
state.clearSessionState();
assertTrue("Clear session state resets subscription identifier", 1 == state.getNextSubscriptionIdentifier());
}

/**
* Test that the subscription identifier is bounded between 1 and 268,435,455
*/
@Test
public void testSubscriptionIdIsBounded() {
MqttSessionState state = new MqttSessionState();
for (int i = 1; i <= 268_435_456; i++) {
assertTrue("Subscription identifier minimum bound", state.getNextSubscriptionIdentifier()>=1);
assertTrue("Subscription identifier maximum bound", state.getNextSubscriptionIdentifier()<=268_435_455);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,34 @@
* <li>Next Subscription Identifier - The next subscription Identifier available
* to use.</li>
* </ul>
*
* Subscription identifier can take values from 1 to 268,435,455 according to MQTTv5 specification
* @see <a href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901117">MQTTv5 Specification 3.8.2.1.2 Subscription Identifier</a>
*/
public class MqttSessionState {

// ******* Session Specific Properties and counters ******//
private AtomicInteger nextSubscriptionIdentifier = new AtomicInteger(1);
private String clientId;
private Integer SUBSCRIPTION_IDENTIFIER_MAX_LIMIT = 268_435_455;

public void clearSessionState() {
nextSubscriptionIdentifier.set(1);
}

public Integer getNextSubscriptionIdentifier() {
Integer nextValue = nextSubscriptionIdentifier.getAndIncrement();
if (nextValue <= SUBSCRIPTION_IDENTIFIER_MAX_LIMIT) {
return nextValue;
}

// nextValue > SUBSCRIPTION_IDENTIFIER_MAX_LIMIT, so we need to restart the identifier from 1
synchronized(nextSubscriptionIdentifier) {
// read again to make sure no other thread has updated the value
if (nextSubscriptionIdentifier.get() > SUBSCRIPTION_IDENTIFIER_MAX_LIMIT) {
clearSessionState();
}
}
return nextSubscriptionIdentifier.getAndIncrement();
}

Expand Down