Skip to content

Commit 2300517

Browse files
committed
fix(#384): Ignore local racks for LWT statements in DefaultLoadBalancingPolicy.newQUeryPlan().
1 parent bbf89a2 commit 2300517

File tree

1 file changed

+17
-7
lines changed

1 file changed

+17
-7
lines changed

core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/DefaultLoadBalancingPolicy.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,9 @@ protected Optional<String> discoverLocalDc(@NonNull Map<UUID, Node> nodes) {
135135
@Override
136136
public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session session) {
137137
List<Node> replicas = getReplicas(request, session);
138-
boolean isLWT = Objects.nonNull(request) && request.getRequestType() == RequestRoutingType.LWT;
138+
RequestRoutingType requestType =
139+
Objects.nonNull(request) ? request.getRequestType() : RequestRoutingType.REGULAR;
140+
boolean isLWT = requestType == RequestRoutingType.LWT;
139141
Object[] currentNodes =
140142
isLWT ? replicas.toArray() : getLiveNodes().dc(getLocalDatacenter()).toArray();
141143
if (Objects.nonNull(request)
@@ -145,12 +147,13 @@ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session ses
145147

146148
int replicaCount = 0; // in currentNodes
147149
if (!replicas.isEmpty()) {
148-
Pair<Integer, Integer> counts = moveReplicasToFront(currentNodes, replicas);
150+
Pair<Integer, Integer> counts = moveReplicasToFront(requestType, currentNodes, replicas);
149151
replicaCount = counts.getLeft();
150152
int localRackReplicaCount = counts.getRight(); // in currentNodes
151153

152154
if (replicaCount > 1) {
153-
shuffleLocalRackReplicasAndReplicas(currentNodes, replicaCount, localRackReplicaCount);
155+
shuffleLocalRackReplicasAndReplicas(
156+
requestType, currentNodes, replicaCount, localRackReplicaCount);
154157

155158
if (replicaCount > 2 && avoidSlowReplicas) {
156159
avoidSlowReplicas(Objects.requireNonNull(session), currentNodes, replicaCount);
@@ -171,13 +174,15 @@ public Queue<Node> newQueryPlan(@Nullable Request request, @Nullable Session ses
171174
return maybeAddDcFailover(request, plan);
172175
}
173176

174-
private Pair<Integer, Integer> moveReplicasToFront(Object[] currentNodes, Set<Node> allReplicas) {
177+
private Pair<Integer, Integer> moveReplicasToFront(
178+
RequestRoutingType routingType, Object[] currentNodes, List<Node> allReplicas) {
175179
int replicaCount = 0, localRackReplicaCount = 0;
176180
for (int i = 0; i < currentNodes.length; i++) {
177181
Node node = (Node) currentNodes[i];
178182
if (allReplicas.contains(node)) {
179183
if (Objects.equals(node.getRack(), getLocalRack())
180-
&& Objects.equals(node.getDatacenter(), getLocalDatacenter())) {
184+
&& Objects.equals(node.getDatacenter(), getLocalDatacenter())
185+
&& routingType != RequestRoutingType.LWT) {
181186
ArrayUtils.bubbleUp(currentNodes, i, localRackReplicaCount);
182187
localRackReplicaCount++;
183188
} else {
@@ -190,8 +195,13 @@ private Pair<Integer, Integer> moveReplicasToFront(Object[] currentNodes, Set<No
190195
}
191196

192197
private void shuffleLocalRackReplicasAndReplicas(
193-
Object[] currentNodes, int replicaCount, int localRackReplicaCount) {
194-
if (getLocalRack() != null && localRackReplicaCount > 0) {
198+
RequestRoutingType routingType,
199+
Object[] currentNodes,
200+
int replicaCount,
201+
int localRackReplicaCount) {
202+
if (routingType != RequestRoutingType.LWT
203+
&& getLocalRack() != null
204+
&& localRackReplicaCount > 0) {
195205
// Shuffle only replicas that are in the local rack
196206
shuffleHead(currentNodes, localRackReplicaCount);
197207
// Shuffles only replicas that are not in local rack

0 commit comments

Comments
 (0)