|
19 | 19 |
|
20 | 20 | import static org.junit.Assert.assertEquals; |
21 | 21 |
|
22 | | -import java.util.Collections; |
23 | 22 | import java.util.UUID; |
24 | 23 | import java.util.concurrent.atomic.AtomicInteger; |
25 | 24 | import org.apache.hadoop.conf.Configuration; |
| 25 | +import org.apache.hadoop.fs.Path; |
26 | 26 | import org.apache.hadoop.hbase.HBaseConfiguration; |
27 | 27 | import org.apache.hadoop.hbase.Waiter; |
28 | 28 | import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint; |
29 | | -import org.apache.hadoop.hbase.wal.WAL; |
| 29 | +import org.apache.hadoop.hbase.testclassification.MediumTests; |
| 30 | +import org.apache.hadoop.hbase.testclassification.ReplicationTests; |
30 | 31 | import org.junit.Test; |
| 32 | +import org.junit.experimental.categories.Category; |
31 | 33 | import org.mockito.Mockito; |
32 | 34 |
|
33 | 35 | /** |
34 | | - * Tests for staged WAL flush behavior in ReplicationSourceShipper. These tests validate that |
35 | | - * beforePersistingReplicationOffset() is invoked only when there is staged WAL data, and is not |
36 | | - * invoked for empty batches. |
| 36 | + * Tests staged WAL flush behavior in ReplicationSourceShipper. |
37 | 37 | */ |
| 38 | +@Category({ ReplicationTests.class, MediumTests.class }) |
38 | 39 | public class TestReplicationSourceShipperBufferedFlush { |
39 | 40 |
|
40 | | - /** |
41 | | - * ReplicationEndpoint implementation used for testing. Counts how many times |
42 | | - * beforePersistingReplicationOffset() is called. |
43 | | - */ |
44 | 41 | static class CountingReplicationEndpoint extends BaseReplicationEndpoint { |
45 | 42 |
|
46 | 43 | private final AtomicInteger beforePersistCalls = new AtomicInteger(); |
47 | 44 |
|
48 | 45 | @Override |
49 | | - protected void doStart() { |
50 | | - notifyStarted(); |
51 | | - } |
52 | | - |
53 | | - @Override |
54 | | - protected void doStop() { |
55 | | - notifyStopped(); |
| 46 | + public void start() { |
| 47 | + startAsync().awaitRunning(); |
56 | 48 | } |
57 | 49 |
|
58 | 50 | @Override |
59 | | - public UUID getPeerUUID() { |
60 | | - return null; |
| 51 | + public void stop() { |
| 52 | + stopAsync().awaitTerminated(); |
61 | 53 | } |
62 | 54 |
|
63 | 55 | @Override |
64 | | - public boolean replicate(ReplicateContext ctx) { |
65 | | - return true; |
| 56 | + protected void doStart() { |
| 57 | + notifyStarted(); |
66 | 58 | } |
67 | 59 |
|
68 | 60 | @Override |
69 | | - public void start() { |
70 | | - |
| 61 | + protected void doStop() { |
| 62 | + notifyStopped(); |
71 | 63 | } |
72 | 64 |
|
73 | 65 | @Override |
74 | | - public void stop() { |
75 | | - |
| 66 | + public boolean replicate(ReplicateContext ctx) { |
| 67 | + return true; |
76 | 68 | } |
77 | 69 |
|
78 | 70 | @Override |
79 | 71 | public void beforePersistingReplicationOffset() { |
80 | 72 | beforePersistCalls.incrementAndGet(); |
81 | 73 | } |
82 | 74 |
|
83 | | - int getBeforePersistCalls() { |
84 | | - return beforePersistCalls.get(); |
85 | | - } |
86 | | - |
87 | 75 | @Override |
88 | 76 | public long getMaxBufferSize() { |
89 | | - // Force size-based flush after any non-empty batch |
90 | | - return 1L; |
| 77 | + return 1L; // force immediate flush |
91 | 78 | } |
92 | 79 |
|
93 | 80 | @Override |
94 | 81 | public long maxFlushInterval() { |
95 | 82 | return Long.MAX_VALUE; |
96 | 83 | } |
| 84 | + |
| 85 | + @Override |
| 86 | + public UUID getPeerUUID() { |
| 87 | + return null; |
| 88 | + } |
| 89 | + |
| 90 | + int getBeforePersistCalls() { |
| 91 | + return beforePersistCalls.get(); |
| 92 | + } |
97 | 93 | } |
98 | 94 |
|
99 | 95 | @Test |
100 | 96 | public void testBeforePersistNotCalledForEmptyBatch() throws Exception { |
101 | 97 | Configuration conf = HBaseConfiguration.create(); |
102 | 98 |
|
103 | 99 | CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint(); |
| 100 | + endpoint.start(); |
104 | 101 |
|
105 | 102 | ReplicationSource source = Mockito.mock(ReplicationSource.class); |
106 | 103 | ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class); |
107 | 104 |
|
108 | | - WALEntryBatch emptyBatch = Mockito.mock(WALEntryBatch.class); |
109 | | - Mockito.when(emptyBatch.getWalEntries()).thenReturn(Collections.emptyList()); |
110 | | - |
111 | | - Mockito.when(walReader.take()).thenReturn(emptyBatch).thenReturn(null); |
112 | | - |
113 | 105 | Mockito.when(source.isPeerEnabled()).thenReturn(true); |
| 106 | + Mockito.when(source.isSourceActive()).thenReturn(true); |
114 | 107 | Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint); |
115 | 108 | Mockito.when(source.getPeerId()).thenReturn("1"); |
| 109 | + Mockito.when(source.getSourceMetrics()).thenReturn(Mockito.mock(MetricsSource.class)); |
116 | 110 |
|
117 | | - ReplicationSourceShipper shipper = |
118 | | - new ReplicationSourceShipper(conf, "wal-group", source, walReader); |
119 | | - |
120 | | - shipper.start(); |
121 | | - |
122 | | - // Give the shipper thread time to process the empty batch |
123 | | - Waiter.waitFor(conf, 3000, () -> true); |
124 | | - |
125 | | - shipper.interrupt(); |
126 | | - shipper.join(); |
| 111 | + WALEntryBatch batch = new WALEntryBatch(1, null); |
| 112 | + batch.setLastWalPath(new Path("wal")); |
| 113 | + batch.setLastWalPosition(1L); |
| 114 | + // no entries, no heap size |
127 | 115 |
|
128 | | - assertEquals("beforePersistingReplicationOffset should not be called for empty batch", 0, |
129 | | - endpoint.getBeforePersistCalls()); |
130 | | - } |
131 | | - |
132 | | - @Test |
133 | | - public void testBeforePersistCalledForNonEmptyBatch() throws Exception { |
134 | | - Configuration conf = HBaseConfiguration.create(); |
135 | | - |
136 | | - CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint(); |
137 | | - |
138 | | - ReplicationSource source = Mockito.mock(ReplicationSource.class); |
139 | | - ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class); |
140 | | - |
141 | | - WALEntryBatch batch = Mockito.mock(WALEntryBatch.class); |
142 | | - WAL.Entry entry = Mockito.mock(WAL.Entry.class); |
143 | | - |
144 | | - Mockito.when(batch.getWalEntries()).thenReturn(Collections.singletonList(entry)); |
145 | | - Mockito.when(batch.getHeapSize()).thenReturn(10L); |
146 | | - Mockito.when(batch.isEndOfFile()).thenReturn(false); |
147 | 116 | Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null); |
148 | 117 |
|
149 | | - Mockito.when(source.isPeerEnabled()).thenReturn(true); |
150 | | - Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint); |
151 | | - Mockito.when(source.getPeerId()).thenReturn("1"); |
152 | | - |
153 | 118 | ReplicationSourceShipper shipper = |
154 | 119 | new ReplicationSourceShipper(conf, "wal-group", source, walReader); |
155 | 120 |
|
156 | 121 | shipper.start(); |
157 | 122 |
|
158 | | - // Wait until beforePersistingReplicationOffset() is invoked once |
159 | | - Waiter.waitFor(conf, 5000, () -> endpoint.getBeforePersistCalls() == 1); |
| 123 | + // Allow loop to run |
| 124 | + Waiter.waitFor(conf, 3000, () -> true); |
160 | 125 |
|
161 | 126 | shipper.interrupt(); |
162 | 127 | shipper.join(); |
163 | 128 |
|
164 | | - assertEquals("beforePersistingReplicationOffset should be called exactly once", 1, |
165 | | - endpoint.getBeforePersistCalls()); |
| 129 | + assertEquals(0, endpoint.getBeforePersistCalls()); |
166 | 130 | } |
167 | 131 | } |
0 commit comments