Skip to content

Commit c5f0f5b

Browse files
authored
HIVE-29420: Hive ACID: Cleaner mishandles retries of killed compactions (#6281)
1 parent 6ff00f3 commit c5f0f5b

File tree

23 files changed

+727
-508
lines changed

23 files changed

+727
-508
lines changed

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,35 @@
3737
import org.junit.Before;
3838
import org.junit.BeforeClass;
3939
import org.junit.Test;
40+
import org.junit.runner.RunWith;
41+
import org.junit.runners.Parameterized;
42+
import org.junit.runners.Parameterized.Parameters;
4043

4144
import java.io.IOException;
45+
import java.util.Arrays;
46+
import java.util.Collection;
4247

4348
import static org.junit.Assert.assertEquals;
4449

50+
@RunWith(Parameterized.class)
4551
public class TestCleanerWithReplication extends CompactorTest {
52+
4653
private Path cmRootDirectory;
4754
private static MiniDFSCluster miniDFSCluster;
4855
private final String dbName = "TestCleanerWithReplication";
4956

57+
private final boolean useMinHistoryWriteId;
58+
59+
public TestCleanerWithReplication(boolean useMinHistoryWriteId) {
60+
this.useMinHistoryWriteId = useMinHistoryWriteId;
61+
}
62+
63+
@Parameters(name = "useMinHistoryWriteId={0}")
64+
public static Collection<Object[]> parameters() {
65+
return Arrays.asList(
66+
new Object[][]{{true}, {false}});
67+
}
68+
5069
@Before
5170
public void setup() throws Exception {
5271
HiveConf conf = new HiveConf();
@@ -63,6 +82,11 @@ public void setup() throws Exception {
6382
ms.createDatabase(db);
6483
}
6584

85+
@Override
86+
protected boolean useMinHistoryWriteId() {
87+
return useMinHistoryWriteId;
88+
}
89+
6690
@BeforeClass
6791
public static void classLevelSetup() throws IOException {
6892
Configuration hadoopConf = new Configuration();

ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1380,7 +1380,7 @@ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDi
13801380
// Filter out all delta directories that are shadowed by others
13811381
findBestWorkingDeltas(writeIdList, directory);
13821382

1383-
if(directory.getOldestBase() != null && directory.getBase() == null &&
1383+
if (directory.getOldestBase() != null && directory.getBase() == null &&
13841384
isCompactedBase(directory.getOldestBase(), fs, dirSnapshots)) {
13851385
/*
13861386
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,9 @@ fs, path, getConf(), validWriteIdList, Ref.from(false), false,
163163

164164
// Make sure there are no leftovers below the compacted watermark
165165
boolean success = false;
166-
getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
166+
if (info.minOpenWriteId < 0) {
167+
getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
168+
}
167169

168170
dir = AcidUtils.getAcidState(
169171
fs, path, getConf(),

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.hadoop.hive.ql.txn.compactor.service;
1919

20-
import com.google.common.collect.ImmutableMap;
2120
import org.apache.commons.lang3.tuple.Pair;
2221
import org.apache.hadoop.fs.FileSystem;
2322
import org.apache.hadoop.fs.Path;
@@ -196,9 +195,6 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
196195
txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
197196
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, txnWriteIds.toString());
198197

199-
msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(),
200-
ImmutableMap.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));
201-
202198
ci.highestWriteId = tblValidWriteIds.getHighWatermark();
203199
//this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
204200
//it until after any data written by it are physically removed

ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -693,9 +693,10 @@ public void testRevokeTimedOutWorkers() throws Exception {
693693
rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
694694
txnHandler.compact(rqst);
695695

696-
assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
696+
FindNextCompactRequest nextCompactRqst = aFindNextCompactRequest("fred-193892", WORKER_VERSION);
697+
assertNotNull(txnHandler.findNextToCompact(nextCompactRqst));
697698
Thread.sleep(200);
698-
assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
699+
assertNotNull(txnHandler.findNextToCompact(nextCompactRqst));
699700
txnHandler.revokeTimedoutWorkers(100);
700701

701702
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());

ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java

Lines changed: 125 additions & 130 deletions
Large diffs are not rendered by default.
ql/src/test/org/apache/hadoop/hive/ql/testutil/TxnStoreHelper.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.hive.ql.testutil;
20+
21+
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
22+
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
23+
import org.apache.hadoop.hive.metastore.api.MetaException;
24+
import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
25+
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
26+
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
27+
import org.apache.hadoop.hive.metastore.txn.TxnStore;
28+
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
29+
30+
import java.util.Collections;
31+
32+
import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars;
33+
34+
public final class TxnStoreHelper {
35+
36+
private final TxnStore txnHandler;
37+
38+
private TxnStoreHelper(TxnStore txnHandler) {
39+
this.txnHandler = txnHandler;
40+
}
41+
42+
public static TxnStoreHelper wrap(TxnStore txnHandler) {
43+
return new TxnStoreHelper(txnHandler);
44+
}
45+
46+
/**
47+
* Allocates a new write ID for the table in the given transaction.
48+
*/
49+
public long allocateTableWriteId(String dbName, String tblName, long txnId)
50+
throws TxnAbortedException, NoSuchTxnException, MetaException {
51+
AllocateTableWriteIdsRequest request = new AllocateTableWriteIdsRequest(dbName, tblName.toLowerCase());
52+
request.setTxnIds(Collections.singletonList(txnId));
53+
54+
AllocateTableWriteIdsResponse response = txnHandler.allocateTableWriteIds(request);
55+
return response.getTxnToWriteIds().getFirst().getWriteId();
56+
}
57+
58+
/**
59+
* Registers the min open write ID for the table in the given transaction.
60+
*/
61+
public void registerMinOpenWriteId(String dbName, String tblName, long txnId) throws MetaException {
62+
if (!ConfVars.useMinHistoryWriteId()) {
63+
return;
64+
}
65+
long maxWriteId = txnHandler.getMaxAllocatedTableWriteId(
66+
new MaxAllocatedTableWriteIdRequest(dbName, tblName.toLowerCase()))
67+
.getMaxWriteId();
68+
69+
txnHandler.addWriteIdsToMinHistory(txnId,
70+
Collections.singletonMap(
71+
TxnUtils.getFullTableName(dbName, tblName), maxWriteId + 1));
72+
}
73+
}

0 commit comments

Comments (0)