2424package com .datastax .oss .driver .core .resolver ;
2525
2626import static org .assertj .core .api .Assertions .assertThat ;
27+ import static org .awaitility .Awaitility .await ;
2728import static org .junit .Assert .assertFalse ;
2829import static org .junit .Assert .assertTrue ;
2930import static org .junit .Assert .fail ;
3435import com .datastax .oss .driver .api .core .config .TypedDriverOption ;
3536import com .datastax .oss .driver .api .core .cql .ResultSet ;
3637import com .datastax .oss .driver .api .core .cql .Row ;
37- import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
38- import com .datastax .oss .driver .api .core .cql .SimpleStatementBuilder ;
3938import com .datastax .oss .driver .api .core .metadata .Node ;
4039import com .datastax .oss .driver .api .testinfra .ccm .CcmBridge ;
4140import com .datastax .oss .driver .categories .IsolatedTests ;
@@ -130,30 +129,16 @@ public void replace_cluster_test() {
130129 ccmBridge .create ();
131130 ccmBridge .start ();
132131 session = builder .build ();
133- boolean allNodesUp = false ;
134- int nodesUp = 0 ;
135- for (int i = 0 ; i < CLUSTER_WAIT_SECONDS ; i ++) {
136- try {
137- Collection <Node > nodes = session .getMetadata ().getNodes ().values ();
138- nodesUp = 0 ;
139- for (Node node : nodes ) {
140- if (node .getUpSinceMillis () > 0 ) {
141- nodesUp ++;
142- }
143- }
144- if (nodesUp == numberOfNodes ) {
145- allNodesUp = true ;
146- break ;
147- }
148- Thread .sleep (1000 );
149- } catch (InterruptedException e ) {
150- break ;
151- }
152- }
153- if (!allNodesUp ) {
132+ final CqlSession firstSession = session ;
133+ try {
134+ await ()
135+ .atMost (Duration .ofSeconds (CLUSTER_WAIT_SECONDS ))
136+ .pollInterval (Duration .ofSeconds (1 ))
137+ .until (() -> countUpNodes (firstSession ) == numberOfNodes );
138+ } catch (org .awaitility .core .ConditionTimeoutException e ) {
154139 LOG .error (
155140 "Driver sees only {} nodes UP instead of {} after waiting {}s" ,
156- nodesUp ,
141+ countUpNodes ( firstSession ) ,
157142 numberOfNodes ,
158143 CLUSTER_WAIT_SECONDS );
159144 }
@@ -178,30 +163,15 @@ public void replace_cluster_test() {
178163 CcmBridge .builder ().withNodes (numberOfNodes ).withIpPrefix ("127.0.1." ).build ()) {
179164 ccmBridge .create ();
180165 ccmBridge .start ();
181- boolean allNodesUp = false ;
182- int nodesUp = 0 ;
183- for (int i = 0 ; i < CLUSTER_WAIT_SECONDS ; i ++) {
184- try {
185- Collection <Node > nodes = session .getMetadata ().getNodes ().values ();
186- nodesUp = 0 ;
187- for (Node node : nodes ) {
188- if (node .getUpSinceMillis () > 0 ) {
189- nodesUp ++;
190- }
191- }
192- if (nodesUp == numberOfNodes ) {
193- allNodesUp = true ;
194- break ;
195- }
196- Thread .sleep (1000 );
197- } catch (InterruptedException e ) {
198- break ;
199- }
200- }
201- if (!allNodesUp ) {
166+ try {
167+ await ()
168+ .atMost (Duration .ofSeconds (CLUSTER_WAIT_SECONDS ))
169+ .pollInterval (Duration .ofSeconds (1 ))
170+ .until (() -> countUpNodes (session ) == numberOfNodes );
171+ } catch (org .awaitility .core .ConditionTimeoutException e ) {
202172 LOG .error (
203173 "Driver sees only {} nodes UP instead of {} after waiting {}s" ,
204- nodesUp ,
174+ countUpNodes ( session ) ,
205175 numberOfNodes ,
206176 CLUSTER_WAIT_SECONDS );
207177 }
@@ -269,30 +239,8 @@ public void cannot_reconnect_with_resolved_socket() {
269239 ccmBridge .create ();
270240 ccmBridge .start ();
271241 session = builder .build ();
272- long endTime = System .currentTimeMillis () + CLUSTER_WAIT_SECONDS * 1000 ;
273- while (System .currentTimeMillis () < endTime ) {
274- try {
275- nodes = session .getMetadata ().getNodes ().values ();
276- int upNodes = 0 ;
277- for (Node node : nodes ) {
278- if (node .getUpSinceMillis () > 0 ) {
279- upNodes ++;
280- }
281- }
282- if (upNodes == 3 ) {
283- break ;
284- }
285- // session.refreshSchema();
286- SimpleStatement statement =
287- new SimpleStatementBuilder ("select * from system.local where key='local'" )
288- .setTimeout (Duration .ofSeconds (3 ))
289- .build ();
290- session .executeAsync (statement );
291- Thread .sleep (3000 );
292- } catch (InterruptedException e ) {
293- break ;
294- }
295- }
242+ final CqlSession firstUnusedSession = session ;
243+ awaitAllNodesUp (firstUnusedSession , 3 );
296244 ResultSet rs = session .execute ("select * from system.local where key='local'" );
297245 assertThat (rs ).isNotNull ();
298246 Row row = rs .one ();
@@ -329,29 +277,7 @@ public void cannot_reconnect_with_resolved_socket() {
329277 "test.cluster.fake" , ccmBridge .getNodeIpAddress (3 ));
330278 ccmBridge .create ();
331279 ccmBridge .start ();
332- long endTime = System .currentTimeMillis () + CLUSTER_WAIT_SECONDS * 1000 ;
333- while (System .currentTimeMillis () < endTime ) {
334- try {
335- nodes = session .getMetadata ().getNodes ().values ();
336- int upNodes = 0 ;
337- for (Node node : nodes ) {
338- if (node .getUpSinceMillis () > 0 ) {
339- upNodes ++;
340- }
341- }
342- if (upNodes == 3 ) {
343- break ;
344- }
345- SimpleStatement statement =
346- new SimpleStatementBuilder ("select * from system.local where key='local'" )
347- .setTimeout (Duration .ofSeconds (3 ))
348- .build ();
349- session .executeAsync (statement );
350- Thread .sleep (3000 );
351- } catch (InterruptedException e ) {
352- break ;
353- }
354- }
280+ awaitAllNodesUp (session , 3 );
355281 nodes = session .getMetadata ().getNodes ().values ();
356282 assertThat (nodes ).hasSize (3 );
357283 Iterator <Node > iterator = nodes .iterator ();
@@ -384,32 +310,34 @@ public void cannot_reconnect_with_resolved_socket() {
384310 // Now the driver should fail to reconnect since unresolved hostname is gone.
385311 ccmBridge .create ();
386312 ccmBridge .start ();
387- long endTime = System .currentTimeMillis () + CLUSTER_WAIT_SECONDS * 1000 ;
388- while (System .currentTimeMillis () < endTime ) {
389- try {
390- nodes = session .getMetadata ().getNodes ().values ();
391- int upNodes = 0 ;
392- for (Node node : nodes ) {
393- if (node .getUpSinceMillis () > 0 ) {
394- upNodes ++;
395- }
396- }
397- if (upNodes == 3 ) {
398- break ;
399- }
400- // session.refreshSchema();
401- SimpleStatement statement =
402- new SimpleStatementBuilder ("select * from system.local where key='local'" )
403- .setTimeout (Duration .ofSeconds (3 ))
404- .build ();
405- session .executeAsync (statement );
406- Thread .sleep (3000 );
407- } catch (InterruptedException e ) {
408- break ;
409- }
410- }
313+ awaitAllNodesUp (session , 3 );
411314 session .execute ("select * from system.local where key='local'" );
412315 }
413316 session .close ();
414317 }
318+
319+ private static int countUpNodes (CqlSession session ) {
320+ int count = 0 ;
321+ for (Node node : session .getMetadata ().getNodes ().values ()) {
322+ if (node .getUpSinceMillis () > 0 ) {
323+ count ++;
324+ }
325+ }
326+ return count ;
327+ }
328+
329+ private static void awaitAllNodesUp (CqlSession session , int expectedNodes ) {
330+ try {
331+ await ()
332+ .atMost (Duration .ofSeconds (CLUSTER_WAIT_SECONDS ))
333+ .pollInterval (Duration .ofSeconds (1 ))
334+ .until (() -> countUpNodes (session ) == expectedNodes );
335+ } catch (org .awaitility .core .ConditionTimeoutException e ) {
336+ LOG .error (
337+ "Driver sees only {} nodes UP instead of {} after waiting {}s" ,
338+ countUpNodes (session ),
339+ expectedNodes ,
340+ CLUSTER_WAIT_SECONDS );
341+ }
342+ }
415343}
0 commit comments