
Commit ea8ed6a

HBASE-29823 Control WAL flush and offset persistence from ReplicationSourceShipper
1 parent f9e3536 commit ea8ed6a

3 files changed (+244 −7 lines)


hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java

Lines changed: 25 additions & 0 deletions
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
+  // ContinuousBackupReplicationEndpoint
+  // and -1 for other ReplicationEndpoints since they don't buffer.
+  // For other ReplicationEndpoints, every time a WALEntryBatch is shipped, we update the
+  // replication offset. Please check ReplicationSourceShipper#shouldPersistLogPosition()
+  default long getMaxBufferSize() {
+    return -1;
+  }
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before flushing to WAL backup
+  // file. So we return config value CONF_STAGED_WAL_FLUSH_INTERVAL for
+  // ContinuousBackupReplicationEndpoint
+  // and Long.MAX_VALUE for other ReplicationEndpoints since they don't buffer.
+  // For other ReplicationEndpoints, every time a WALEntryBatch is shipped, we update the
+  // replication offset. Please check ReplicationSourceShipper#shouldPersistLogPosition()
+  default long maxFlushInterval() {
+    return Long.MAX_VALUE;
+  }
+
+  // Used in ContinuousBackupReplicationEndpoint to flush/close WAL backup files
+  default void beforePersistingReplicationOffset() {
+
+  }
 }
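For context, the sketch below shows how a buffering endpoint in the spirit of ContinuousBackupReplicationEndpoint could override the three new hooks. It is a minimal illustration, not the actual implementation: the class name, the hard-coded limits (which the comments above say would come from CONF_BACKUP_MAX_WAL_SIZE and CONF_STAGED_WAL_FLUSH_INTERVAL), and the empty flush body are assumptions; only the three hook signatures come from this commit, and the remaining overrides mirror the minimal set used by the new test class below.

import java.util.UUID;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;

/** Hypothetical sketch of a buffering endpoint; not the real ContinuousBackupReplicationEndpoint. */
public class BufferingBackupEndpointSketch extends BaseReplicationEndpoint {

  // In a real endpoint these would come from configuration
  // (CONF_BACKUP_MAX_WAL_SIZE / CONF_STAGED_WAL_FLUSH_INTERVAL); the defaults here are made up.
  private final long maxStagedBytes = 128L * 1024 * 1024;
  private final long maxStagedAgeMs = 5L * 60 * 1000;

  @Override
  public long getMaxBufferSize() {
    // Non-negative value: the shipper may stage shipped batches up to this many bytes.
    return maxStagedBytes;
  }

  @Override
  public long maxFlushInterval() {
    // Staged data older than this forces a flush even if the size threshold is not reached.
    return maxStagedAgeMs;
  }

  @Override
  public void beforePersistingReplicationOffset() {
    // A real buffering endpoint would flush/close its staged WAL backup files here, so the
    // offset persisted right after this call never points past data that is not yet durable.
  }

  @Override
  public boolean replicate(ReplicateContext context) {
    // A real implementation would append context.getEntries() to a staged backup WAL file.
    return true;
  }

  @Override
  public UUID getPeerUUID() {
    return UUID.randomUUID();
  }

  @Override
  public void start() {
  }

  @Override
  public void stop() {
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }
}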

hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java

Lines changed: 52 additions & 7 deletions
@@ -21,6 +21,7 @@
 import static org.apache.hadoop.hbase.replication.ReplicationUtils.sleepForRetries;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -74,6 +75,10 @@ public enum WorkerState {
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
   private final int shipEditsTimeout;
+  private long stagedWalSize = 0L;
+  private long lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+  private WALEntryBatch lastShippedBatch;
+  private final List<Entry> entriesForCleanUpHFileRefs = new ArrayList<>();

   public ReplicationSourceShipper(Configuration conf, String walGroupId, ReplicationSource source,
     ReplicationSourceWALReader walReader) {
@@ -98,6 +103,10 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // Check whether a flush is needed for WAL backup; this is needed for the timeout-based flush
+      if (shouldPersistLogPosition()) {
+        persistLogPosition();
+      }
       // Sleep until replication is enabled again
       if (!source.isPeerEnabled()) {
         // The peer enabled check is in memory, not expensive, so do not need to increase the
@@ -155,7 +164,8 @@ private void shipEdits(WALEntryBatch entryBatch) {
     List<Entry> entries = entryBatch.getWalEntries();
     int sleepMultiplier = 0;
     if (entries.isEmpty()) {
-      updateLogPosition(entryBatch);
+      lastShippedBatch = entryBatch;
+      persistLogPosition();
       return;
     }
     int currentSize = (int) entryBatch.getHeapSize();
@@ -190,13 +200,13 @@ private void shipEdits(WALEntryBatch entryBatch) {
       } else {
         sleepMultiplier = Math.max(sleepMultiplier - 1, 0);
       }
-      // Clean up hfile references
-      for (Entry entry : entries) {
-        cleanUpHFileRefs(entry.getEdit());
-        LOG.trace("shipped entry {}: ", entry);
+
+      stagedWalSize += currentSize;
+      entriesForCleanUpHFileRefs.addAll(entries);
+      lastShippedBatch = entryBatch;
+      if (shouldPersistLogPosition()) {
+        persistLogPosition();
       }
-      // Log and clean up WAL logs
-      updateLogPosition(entryBatch);

       // offsets totalBufferUsed by deducting shipped batchSize (excludes bulk load size)
       // this sizeExcludeBulkLoad has to use same calculation that when calling
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }

+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= source.getReplicationEndpoint().getMaxBufferSize())
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs
+        >= source.getReplicationEndpoint().maxFlushInterval());
+  }
+
+  private void persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return;
+    }
+    if (stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      try {
+        cleanUpHFileRefs(entry.getEdit());
+      } catch (IOException e) {
+        LOG.warn("{} threw unknown exception:",
+          source.getReplicationEndpoint().getClass().getName(), e);
+      }
+      LOG.trace("shipped entry {}: ", entry);
+    }
+    entriesForCleanUpHFileRefs.clear();
+
+    // Log and clean up WAL logs
+    updateLogPosition(lastShippedBatch);
+  }
+
   private void cleanUpHFileRefs(WALEdit edit) throws IOException {
     String peerId = source.getPeerId();
     if (peerId.contains("-")) {
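To make the new bookkeeping concrete, here is a small self-contained model of the flush decision, separate from the HBase classes; StagedFlushPolicy and its method names are invented for illustration. With the non-buffering defaults from ReplicationEndpoint (getMaxBufferSize() == -1 and maxFlushInterval() == Long.MAX_VALUE), any staged data triggers an immediate persist, which reproduces the previous per-batch updateLogPosition() behavior, while a buffering endpoint delays persistence until a size or age threshold is crossed.

/** Hypothetical standalone model of the shipper's flush decision; names are illustrative only. */
final class StagedFlushPolicy {

  private long stagedBytes = 0L;
  private long lastFlushTs = System.currentTimeMillis();

  private final long maxBufferSize;    // models ReplicationEndpoint#getMaxBufferSize()
  private final long maxFlushInterval; // models ReplicationEndpoint#maxFlushInterval()

  StagedFlushPolicy(long maxBufferSize, long maxFlushInterval) {
    this.maxBufferSize = maxBufferSize;
    this.maxFlushInterval = maxFlushInterval;
  }

  /** Called after a batch has been shipped. */
  void onBatchShipped(long batchBytes) {
    stagedBytes += batchBytes;
  }

  /** Mirrors the size-or-time check in ReplicationSourceShipper#shouldPersistLogPosition(). */
  boolean shouldPersist(long nowMs) {
    if (stagedBytes == 0) {
      return false;
    }
    return stagedBytes >= maxBufferSize || (nowMs - lastFlushTs) >= maxFlushInterval;
  }

  /** Mirrors the bookkeeping reset in ReplicationSourceShipper#persistLogPosition(). */
  void onPersisted(long nowMs) {
    stagedBytes = 0;
    lastFlushTs = nowMs;
  }

  public static void main(String[] args) {
    // Non-buffering defaults (-1 / Long.MAX_VALUE): persist after every shipped batch.
    StagedFlushPolicy defaults = new StagedFlushPolicy(-1L, Long.MAX_VALUE);
    defaults.onBatchShipped(10);
    System.out.println(defaults.shouldPersist(System.currentTimeMillis())); // true

    // Buffering endpoint: persist only once 64 MB is staged or 5 minutes have passed.
    StagedFlushPolicy buffering = new StagedFlushPolicy(64L * 1024 * 1024, 5L * 60 * 1000);
    buffering.onBatchShipped(10);
    System.out.println(buffering.shouldPersist(System.currentTimeMillis())); // false
  }
}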
TestReplicationSourceShipperBufferedFlush.java (new test file)

Lines changed: 167 additions & 0 deletions

@@ -0,0 +1,167 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for staged WAL flush behavior in ReplicationSourceShipper. These tests validate that
 * beforePersistingReplicationOffset() is invoked only when there is staged WAL data, and is not
 * invoked for empty batches.
 */
public class TestReplicationSourceShipperBufferedFlush {

  /**
   * ReplicationEndpoint implementation used for testing. Counts how many times
   * beforePersistingReplicationOffset() is called.
   */
  static class CountingReplicationEndpoint extends BaseReplicationEndpoint {

    private final AtomicInteger beforePersistCalls = new AtomicInteger();

    @Override
    protected void doStart() {
      notifyStarted();
    }

    @Override
    protected void doStop() {
      notifyStopped();
    }

    @Override
    public UUID getPeerUUID() {
      return null;
    }

    @Override
    public boolean replicate(ReplicateContext ctx) {
      return true;
    }

    @Override
    public void start() {
    }

    @Override
    public void stop() {
    }

    @Override
    public void beforePersistingReplicationOffset() {
      beforePersistCalls.incrementAndGet();
    }

    int getBeforePersistCalls() {
      return beforePersistCalls.get();
    }

    @Override
    public long getMaxBufferSize() {
      // Force size-based flush after any non-empty batch
      return 1L;
    }

    @Override
    public long maxFlushInterval() {
      return Long.MAX_VALUE;
    }
  }

  @Test
  public void testBeforePersistNotCalledForEmptyBatch() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();

    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

    WALEntryBatch emptyBatch = Mockito.mock(WALEntryBatch.class);
    Mockito.when(emptyBatch.getWalEntries()).thenReturn(Collections.emptyList());

    Mockito.when(walReader.take()).thenReturn(emptyBatch).thenReturn(null);

    Mockito.when(source.isPeerEnabled()).thenReturn(true);
    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
    Mockito.when(source.getPeerId()).thenReturn("1");

    ReplicationSourceShipper shipper =
      new ReplicationSourceShipper(conf, "wal-group", source, walReader);

    shipper.start();

    // Give the shipper thread time to process the empty batch
    Thread.sleep(1000);

    shipper.interrupt();
    shipper.join();

    assertEquals("beforePersistingReplicationOffset should not be called for empty batch", 0,
      endpoint.getBeforePersistCalls());
  }

  @Test
  public void testBeforePersistCalledForNonEmptyBatch() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint();

    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

    WALEntryBatch batch = Mockito.mock(WALEntryBatch.class);
    WAL.Entry entry = Mockito.mock(WAL.Entry.class);

    Mockito.when(batch.getWalEntries()).thenReturn(Collections.singletonList(entry));
    Mockito.when(batch.getHeapSize()).thenReturn(10L);
    Mockito.when(batch.isEndOfFile()).thenReturn(false);
    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);

    Mockito.when(source.isPeerEnabled()).thenReturn(true);
    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
    Mockito.when(source.getPeerId()).thenReturn("1");

    ReplicationSourceShipper shipper =
      new ReplicationSourceShipper(conf, "wal-group", source, walReader);

    shipper.start();

    // Wait until beforePersistingReplicationOffset() is invoked once
    Waiter.waitFor(conf, 5000, () -> endpoint.getBeforePersistCalls() == 1);

    shipper.interrupt();
    shipper.join();

    assertEquals("beforePersistingReplicationOffset should be called exactly once", 1,
      endpoint.getBeforePersistCalls());
  }
}
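The two committed tests cover the empty-batch path and the size-triggered flush. A time-triggered companion test is sketched below; it is not part of this commit. It reuses the same mocks, assumes the shipper's run() loop keeps polling (and therefore keeps hitting the top-of-loop flush check) after the reader stops returning batches, and would be added inside TestReplicationSourceShipperBufferedFlush.

  // Hypothetical companion test (not part of this commit): exercises the interval-based flush.
  @Test
  public void testBeforePersistCalledAfterFlushInterval() throws Exception {
    Configuration conf = HBaseConfiguration.create();

    // Large buffer so the size check never fires; a tiny interval so the time check does.
    CountingReplicationEndpoint endpoint = new CountingReplicationEndpoint() {
      @Override
      public long getMaxBufferSize() {
        return Long.MAX_VALUE;
      }

      @Override
      public long maxFlushInterval() {
        return 100L; // milliseconds; illustrative value
      }
    };

    ReplicationSource source = Mockito.mock(ReplicationSource.class);
    ReplicationSourceWALReader walReader = Mockito.mock(ReplicationSourceWALReader.class);

    WALEntryBatch batch = Mockito.mock(WALEntryBatch.class);
    WAL.Entry entry = Mockito.mock(WAL.Entry.class);
    Mockito.when(batch.getWalEntries()).thenReturn(Collections.singletonList(entry));
    Mockito.when(batch.getHeapSize()).thenReturn(10L);
    Mockito.when(walReader.take()).thenReturn(batch).thenReturn(null);

    Mockito.when(source.isPeerEnabled()).thenReturn(true);
    Mockito.when(source.getReplicationEndpoint()).thenReturn(endpoint);
    Mockito.when(source.getPeerId()).thenReturn("1");

    ReplicationSourceShipper shipper =
      new ReplicationSourceShipper(conf, "wal-group", source, walReader);
    shipper.start();

    // The staged batch should be persisted once the (assumed) 100 ms interval has elapsed.
    Waiter.waitFor(conf, 5000, () -> endpoint.getBeforePersistCalls() >= 1);

    shipper.interrupt();
    shipper.join();
  }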
