Skip to content

Commit 4f88964

Browse files
authored
TINKERPOP-3213 Add SessionedChildClient (#3258)
Added a new SessionedChildClient which reuses connection from parent Client instead of creating new one
1 parent 3bdb5bd commit 4f88964

File tree

6 files changed

+124
-4
lines changed

6 files changed

+124
-4
lines changed

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Client.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,80 @@ public Client alias(final Map<String, String> aliases) {
773773
}
774774
}
775775

776+
/**
777+
* A {@code Client} implementation that operates in the context of a session. {@code ChildSessionClient} is tied to
778+
* another client as a child, it borrows the connection from the parent client's pool for the transaction. Requests are
779+
* sent to a single server, where each request is bound to the same thread and same connection with the same set of
780+
* bindings across requests.
781+
* Transaction are not automatically committed. It is up the client to issue commit/rollback commands.
782+
*/
783+
public final static class SessionedChildClient extends Client {
784+
private final String sessionId;
785+
private final boolean manageTransactions;
786+
private final boolean maintainStateAfterException;
787+
private final Client parentClient;
788+
private Connection borrowedConnection;
789+
private boolean closed;
790+
791+
public SessionedChildClient(final Client parentClient, String sessionId) {
792+
super(parentClient.cluster, parentClient.settings);
793+
this.parentClient = parentClient;
794+
this.sessionId = sessionId;
795+
this.closed = false;
796+
this.manageTransactions = parentClient.settings.getSession().map(s -> s.manageTransactions).orElse(false);
797+
this.maintainStateAfterException = parentClient.settings.getSession().map(s -> s.maintainStateAfterException).orElse(false);
798+
}
799+
800+
public String getSessionId() {
801+
return sessionId;
802+
}
803+
804+
@Override
805+
public RequestMessage.Builder buildMessage(final RequestMessage.Builder builder) {
806+
builder.processor("session");
807+
builder.addArg(Tokens.ARGS_SESSION, sessionId);
808+
builder.addArg(Tokens.ARGS_MANAGE_TRANSACTION, manageTransactions);
809+
builder.addArg(Tokens.ARGS_MAINTAIN_STATE_AFTER_EXCEPTION, maintainStateAfterException);
810+
return builder;
811+
}
812+
813+
@Override
814+
protected void initializeImplementation() {
815+
// do nothing, parentClient is already initialized
816+
}
817+
818+
@Override
819+
protected synchronized Connection chooseConnection(RequestMessage msg) throws TimeoutException, ConnectionException {
820+
if (borrowedConnection == null) {
821+
//Borrow from parentClient's pool instead of creating new connection
822+
borrowedConnection = parentClient.chooseConnection(msg);
823+
}
824+
//Increment everytime, the connection is chosen, all these will be decremented when transaction is commited/rolledback
825+
borrowedConnection.borrowed.incrementAndGet();
826+
return borrowedConnection;
827+
}
828+
829+
@Override
830+
public synchronized CompletableFuture<Void> closeAsync() {
831+
if (borrowedConnection != null && !borrowedConnection.isDead()) {
832+
833+
//Decrement borrowed one last time which was incremented by parentClient when the connection is borrowed initially
834+
//returnToPool() does this
835+
borrowedConnection.returnToPool();
836+
837+
borrowedConnection = null;
838+
}
839+
closed = true;
840+
return CompletableFuture.completedFuture(null);
841+
}
842+
843+
@Override
844+
public boolean isClosing() {
845+
return parentClient.isClosing() || closed;
846+
}
847+
}
848+
849+
776850
/**
777851
* A {@code Client} implementation that operates in the context of a session. Requests are sent to a single
778852
* server, where each request is bound to the same thread with the same set of bindings across requests.

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Cluster.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,13 @@ public int getPort() {
459459
return manager.port;
460460
}
461461

462+
/**
463+
* Determines whether to reuse connections for transactions or create new ones.
464+
*/
465+
public boolean isReuseConnectionsForSessions() {
466+
return manager.reuseConnectionsForSessions;
467+
}
468+
462469
/**
463470
* Gets a list of all the configured hosts.
464471
*/
@@ -624,6 +631,7 @@ public final static class Builder {
624631
private long connectionSetupTimeoutMillis = Connection.CONNECTION_SETUP_TIMEOUT_MILLIS;
625632
private boolean enableUserAgentOnConnect = true;
626633
private boolean enableCompression = true;
634+
private boolean reuseConnectionsForSessions = false;
627635

628636
private Builder() {
629637
// empty to prevent direct instantiation
@@ -872,6 +880,14 @@ public Builder maxWaitForConnection(final int maxWait) {
872880
return this;
873881
}
874882

883+
/**
884+
* If true, reuses the connections for transactions
885+
*/
886+
public Builder reuseConnectionsForSessions(final boolean reuseConnectionsForSessions) {
887+
this.reuseConnectionsForSessions = reuseConnectionsForSessions;
888+
return this;
889+
}
890+
875891
/**
876892
* The amount of time in milliseconds to wait the connection to close before timing out where the default
877893
* value is 3000. This timeout allows for a delay to occur in waiting for remaining messages that may still
@@ -1118,6 +1134,7 @@ class Manager {
11181134
private final String path;
11191135
private final boolean enableUserAgentOnConnect;
11201136
private final boolean enableCompression;
1137+
private final boolean reuseConnectionsForSessions;
11211138

11221139
private final AtomicReference<CompletableFuture<Void>> closeFuture = new AtomicReference<>();
11231140

@@ -1132,6 +1149,7 @@ private Manager(final Builder builder) {
11321149
this.interceptor = builder.interceptor;
11331150
this.enableUserAgentOnConnect = builder.enableUserAgentOnConnect;
11341151
this.enableCompression = builder.enableCompression;
1152+
this.reuseConnectionsForSessions = builder.reuseConnectionsForSessions;
11351153

11361154
connectionPoolSettings = new Settings.ConnectionPoolSettings();
11371155
connectionPoolSettings.maxInProcessPerConnection = builder.maxInProcessPerConnection;

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Connection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public ChannelPromise write(final RequestMessage requestMessage, final Completab
267267
return requestPromise;
268268
}
269269

270-
private void returnToPool() {
270+
public void returnToPool() {
271271
try {
272272
if (pool != null) pool.returnConnection(this);
273273
} catch (ConnectionException ce) {

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/Settings.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ final class Settings {
110110
*/
111111
public boolean enableCompression = true;
112112

113+
/**
114+
* Determines whether to use SessionedChildClient (true) or SessionedClient (false) for transactions.
115+
* SessionedChildClient reuses the existing connections whereas SessoinedClient creates a new one for every transaction.
116+
* Defaults to false for backward compatibility.
117+
*/
118+
public boolean reuseConnectionsForSessions = false;
119+
113120
/**
114121
* Read configuration from a file into a new {@link Settings} object.
115122
*
@@ -162,6 +169,9 @@ public static Settings from(final Configuration conf) {
162169
if (conf.containsKey("enableCompression"))
163170
settings.enableCompression = conf.getBoolean("enableCompression");
164171

172+
if (conf.containsKey("reuseConnectionsForSessions"))
173+
settings.reuseConnectionsForSessions = conf.getBoolean("reuseConnectionsForSessions");
174+
165175
if (conf.containsKey("hosts"))
166176
settings.hosts = conf.getList("hosts").stream().map(Object::toString).collect(Collectors.toList());
167177

gremlin-driver/src/main/java/org/apache/tinkerpop/gremlin/driver/remote/DriverRemoteConnection.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.tinkerpop.gremlin.driver.remote;
2020

2121
import org.apache.commons.configuration2.Configuration;
22+
import org.apache.tinkerpop.gremlin.driver.Channelizer;
2223
import org.apache.tinkerpop.gremlin.driver.Client;
2324
import org.apache.tinkerpop.gremlin.driver.Cluster;
2425
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
@@ -259,9 +260,24 @@ public void close() throws Exception {
259260
*/
260261
@Override
261262
public Transaction tx() {
262-
final DriverRemoteConnection session = new DriverRemoteConnection(
263-
client.getCluster().connect(UUID.randomUUID().toString()), remoteTraversalSourceName, true);
264-
return new DriverRemoteTransaction(session);
263+
if (client.getCluster().getChannelizer().equalsIgnoreCase(Channelizer.HttpChannelizer.class.getName())) {
264+
throw new IllegalStateException(String.format("Cannot use sessions or tx() with %s", Channelizer.HttpChannelizer.class.getSimpleName()));
265+
}
266+
267+
final boolean reuseConnections = client.getCluster().isReuseConnectionsForSessions();
268+
final String sessionId = UUID.randomUUID().toString();
269+
final DriverRemoteConnection connection;
270+
271+
if (reuseConnections) {
272+
client.init();
273+
final Client.SessionedChildClient childClient = new Client.SessionedChildClient(client, sessionId);
274+
connection = new DriverRemoteConnection(
275+
childClient, remoteTraversalSourceName, true);
276+
} else {
277+
connection = new DriverRemoteConnection(
278+
client.getCluster().connect(sessionId), remoteTraversalSourceName, true);
279+
}
280+
return new DriverRemoteTransaction(connection);
265281
}
266282

267283
@Override

gremlin-driver/src/test/java/org/apache/tinkerpop/gremlin/driver/SettingsTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public void shouldCreateFromConfiguration() {
4848
conf.setProperty("serializer.config.any", "thing");
4949
conf.setProperty("enableUserAgentOnConnect", false);
5050
conf.setProperty("enableCompression", false);
51+
conf.setProperty("reuseConnectionsForSessions", true);
5152
conf.setProperty("connectionPool.enableSsl", true);
5253
conf.setProperty("connectionPool.keyStore", "server.jks");
5354
conf.setProperty("connectionPool.keyStorePassword", "password2");
@@ -87,6 +88,7 @@ public void shouldCreateFromConfiguration() {
8788
assertEquals(false, settings.enableUserAgentOnConnect);
8889
assertEquals(false, settings.enableCompression);
8990
assertThat(settings.connectionPool.enableSsl, is(true));
91+
assertEquals(true, settings.reuseConnectionsForSessions);
9092
assertEquals("server.jks", settings.connectionPool.keyStore);
9193
assertEquals("password2", settings.connectionPool.keyStorePassword);
9294
assertEquals("pkcs12", settings.connectionPool.keyStoreType);

0 commit comments

Comments
 (0)