Skip to content

Commit e6c8161

Browse files
committed
Core, API: Report metrics about deleted files in ExpireSnapshots
1 parent 026ec35 commit e6c8161

File tree

10 files changed

+332
-42
lines changed

10 files changed

+332
-42
lines changed

.palantir/revapi.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1369,6 +1369,9 @@ acceptedBreaks:
13691369
old: "class org.apache.iceberg.encryption.EncryptingFileIO"
13701370
new: "class org.apache.iceberg.encryption.EncryptingFileIO"
13711371
justification: "New method for Manifest List reading"
1372+
- code: "java.method.addedToInterface"
1373+
new: "method org.apache.iceberg.ExpireSnapshots org.apache.iceberg.ExpireSnapshots::metricsReporter(org.apache.iceberg.metrics.MetricsReporter)"
1374+
justification: "New method for reporting metrics from ExpireSnapshots"
13721375
org.apache.iceberg:iceberg-core:
13731376
- code: "java.class.noLongerInheritsFromClass"
13741377
old: "class org.apache.iceberg.rest.auth.OAuth2Manager"

api/src/main/java/org/apache/iceberg/ExpireSnapshots.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.concurrent.ExecutorService;
2323
import java.util.function.Consumer;
24+
import org.apache.iceberg.metrics.MetricsReporter;
2425

2526
/**
2627
* API for removing old {@link Snapshot snapshots} from a table.
@@ -161,4 +162,10 @@ default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
161162
throw new UnsupportedOperationException(
162163
this.getClass().getName() + " doesn't implement cleanExpiredMetadata");
163164
}
165+
166+
/** Report metrics about the ExpireSnapshots operation to the provided reporter */
167+
default ExpireSnapshots metricsReporter(MetricsReporter reporter) {
168+
throw new UnsupportedOperationException(
169+
this.getClass().getName() + " doesn't implement metricsReporter");
170+
}
164171
}

core/src/main/java/org/apache/iceberg/FileCleanupStrategy.java

Lines changed: 138 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,18 @@
1818
*/
1919
package org.apache.iceberg;
2020

21+
import java.util.Objects;
2122
import java.util.Set;
2223
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.atomic.AtomicLong;
2325
import java.util.function.Consumer;
2426
import org.apache.iceberg.exceptions.NotFoundException;
27+
import org.apache.iceberg.exceptions.ValidationException;
2528
import org.apache.iceberg.io.BulkDeletionFailureException;
2629
import org.apache.iceberg.io.CloseableIterable;
2730
import org.apache.iceberg.io.FileIO;
2831
import org.apache.iceberg.io.SupportsBulkOperations;
32+
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
2933
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3034
import org.apache.iceberg.util.Tasks;
3135
import org.slf4j.Logger;
@@ -42,6 +46,9 @@ public void accept(String file) {
4246
};
4347

4448
private static final Logger LOG = LoggerFactory.getLogger(FileCleanupStrategy.class);
49+
protected static final String MANIFEST = "manifest";
50+
protected static final String MANIFEST_LIST = "manifest list";
51+
protected static final String STATISTICS_FILES = "statistics files";
4552

4653
protected final FileIO fileIO;
4754
protected final ExecutorService planExecutorService;
@@ -72,7 +79,7 @@ protected FileCleanupStrategy(
7279
* @param afterExpiration table metadata after snapshot expiration
7380
* @param cleanupLevel controls which types of files are eligible for deletion
7481
*/
75-
public abstract void cleanFiles(
82+
public abstract DeleteSummary cleanFiles(
7683
TableMetadata beforeExpiration,
7784
TableMetadata afterExpiration,
7885
ExpireSnapshots.CleanupLevel cleanupLevel);
@@ -99,8 +106,9 @@ protected CloseableIterable<ManifestFile> readManifests(Snapshot snapshot) {
99106
}
100107
}
101108

102-
protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
109+
protected void deleteFiles(Set<String> pathsToDelete, String fileType, DeleteSummary summary) {
103110
if (deleteFunc == null && fileIO instanceof SupportsBulkOperations) {
111+
int failures = 0;
104112
try {
105113
((SupportsBulkOperations) fileIO).deleteFiles(pathsToDelete);
106114
} catch (BulkDeletionFailureException e) {
@@ -110,9 +118,11 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
110118
pathsToDelete.size(),
111119
fileType,
112120
e);
121+
failures = e.numberFailedObjects();
113122
} catch (RuntimeException e) {
114123
LOG.warn("Bulk deletion failed", e);
115124
}
125+
summary.deletedFiles(fileType, pathsToDelete.size() - failures);
116126
} else {
117127
Consumer<String> deleteFuncToUse = deleteFunc == null ? defaultDeleteFunc : deleteFunc;
118128

@@ -124,7 +134,92 @@ protected void deleteFiles(Set<String> pathsToDelete, String fileType) {
124134
.suppressFailureWhenFinished()
125135
.onFailure(
126136
(file, thrown) -> LOG.warn("Delete failed for {} file: {}", fileType, file, thrown))
127-
.run(deleteFuncToUse::accept);
137+
.run(
138+
file -> {
139+
deleteFuncToUse.accept(file);
140+
summary.deletedFile(fileType);
141+
});
142+
}
143+
}
144+
145+
static class DeleteSummary {
146+
private final AtomicLong dataFilesCount = new AtomicLong(0L);
147+
private final AtomicLong positionDeleteFilesCount = new AtomicLong(0L);
148+
private final AtomicLong equalityDeleteFilesCount = new AtomicLong(0L);
149+
private final AtomicLong manifestsCount = new AtomicLong(0L);
150+
private final AtomicLong manifestListsCount = new AtomicLong(0L);
151+
private final AtomicLong statisticsFilesCount = new AtomicLong(0L);
152+
153+
public void deletedFiles(String type, int numFiles) {
154+
if (FileContent.DATA.name().equalsIgnoreCase(type)) {
155+
dataFilesCount.addAndGet(numFiles);
156+
157+
} else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
158+
positionDeleteFilesCount.addAndGet(numFiles);
159+
160+
} else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
161+
equalityDeleteFilesCount.addAndGet(numFiles);
162+
163+
} else if (MANIFEST.equalsIgnoreCase(type)) {
164+
manifestsCount.addAndGet(numFiles);
165+
166+
} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
167+
manifestListsCount.addAndGet(numFiles);
168+
169+
} else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
170+
statisticsFilesCount.addAndGet(numFiles);
171+
172+
} else {
173+
throw new ValidationException("Illegal file type: %s", type);
174+
}
175+
}
176+
177+
public void deletedFile(String type) {
178+
if (FileContent.DATA.name().equalsIgnoreCase(type)) {
179+
dataFilesCount.incrementAndGet();
180+
181+
} else if (FileContent.POSITION_DELETES.name().equalsIgnoreCase(type)) {
182+
positionDeleteFilesCount.incrementAndGet();
183+
184+
} else if (FileContent.EQUALITY_DELETES.name().equalsIgnoreCase(type)) {
185+
equalityDeleteFilesCount.incrementAndGet();
186+
187+
} else if (MANIFEST.equalsIgnoreCase(type)) {
188+
manifestsCount.incrementAndGet();
189+
190+
} else if (MANIFEST_LIST.equalsIgnoreCase(type)) {
191+
manifestListsCount.incrementAndGet();
192+
193+
} else if (STATISTICS_FILES.equalsIgnoreCase(type)) {
194+
statisticsFilesCount.incrementAndGet();
195+
196+
} else {
197+
throw new ValidationException("Illegal file type: %s", type);
198+
}
199+
}
200+
201+
public long dataFilesCount() {
202+
return dataFilesCount.get();
203+
}
204+
205+
public long positionDeleteFilesCount() {
206+
return positionDeleteFilesCount.get();
207+
}
208+
209+
public long equalityDeleteFilesCount() {
210+
return equalityDeleteFilesCount.get();
211+
}
212+
213+
public long manifestsCount() {
214+
return manifestsCount.get();
215+
}
216+
217+
public long manifestListsCount() {
218+
return manifestListsCount.get();
219+
}
220+
221+
public long statisticsFilesCount() {
222+
return statisticsFilesCount.get();
128223
}
129224
}
130225

@@ -141,6 +236,46 @@ protected Set<String> expiredStatisticsFilesLocations(
141236
return Sets.difference(statsFileLocationsBeforeExpiration, statsFileLocationsAfterExpiration);
142237
}
143238

239+
protected static class FileInfo {
240+
private final FileContent content;
241+
private final String path;
242+
243+
public FileInfo(FileContent content, String path) {
244+
this.content = content;
245+
this.path = path;
246+
}
247+
248+
public FileContent getContent() {
249+
return content;
250+
}
251+
252+
public String getPath() {
253+
return path;
254+
}
255+
256+
@Override
257+
public boolean equals(Object other) {
258+
if (this == other) {
259+
return true;
260+
} else if (other == null || getClass() != other.getClass()) {
261+
return false;
262+
}
263+
264+
FileInfo fileInfo = (FileInfo) other;
265+
return Objects.equals(content, fileInfo.content) && Objects.equals(path, fileInfo.path);
266+
}
267+
268+
@Override
269+
public int hashCode() {
270+
return Objects.hash(content, path);
271+
}
272+
273+
@Override
274+
public String toString() {
275+
return MoreObjects.toStringHelper(this).add("content", content).add("path", path).toString();
276+
}
277+
}
278+
144279
private Set<String> statsFileLocations(TableMetadata tableMetadata) {
145280
Set<String> statsFileLocations = Sets.newHashSet();
146281

core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.ExecutorService;
2727
import java.util.function.Consumer;
28+
import java.util.stream.Collectors;
2829
import org.apache.iceberg.exceptions.RuntimeIOException;
2930
import org.apache.iceberg.io.CloseableIterable;
3031
import org.apache.iceberg.io.FileIO;
@@ -48,18 +49,19 @@ class IncrementalFileCleanup extends FileCleanupStrategy {
4849

4950
@Override
5051
@SuppressWarnings({"checkstyle:CyclomaticComplexity", "MethodLength"})
51-
public void cleanFiles(
52+
public DeleteSummary cleanFiles(
5253
TableMetadata beforeExpiration,
5354
TableMetadata afterExpiration,
5455
ExpireSnapshots.CleanupLevel cleanupLevel) {
56+
DeleteSummary summary = new DeleteSummary();
5557
// clean up required underlying files based on the expired snapshots
5658
// 1. Get a list of the snapshots that were removed
5759
// 2. Delete any data files that were deleted by those snapshots and are not in the table
5860
// 3. Delete any manifests that are no longer used by current snapshots
5961
// 4. Delete the manifest lists
6062
if (ExpireSnapshots.CleanupLevel.NONE == cleanupLevel) {
6163
LOG.info("Nothing to clean.");
62-
return;
64+
return summary;
6365
}
6466

6567
Set<Long> validIds = Sets.newHashSet();
@@ -79,12 +81,12 @@ public void cleanFiles(
7981

8082
if (expiredIds.isEmpty()) {
8183
// if no snapshots were expired, skip cleanup
82-
return;
84+
return summary;
8385
}
8486

8587
Snapshot latest = beforeExpiration.currentSnapshot();
8688
if (latest == null) {
87-
return;
89+
return summary;
8890
}
8991

9092
List<Snapshot> snapshots = afterExpiration.snapshots();
@@ -259,32 +261,44 @@ public void cleanFiles(
259261
});
260262

261263
if (ExpireSnapshots.CleanupLevel.ALL == cleanupLevel) {
262-
Set<String> filesToDelete =
264+
Set<FileInfo> filesToDelete =
263265
findFilesToDelete(
264266
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());
265-
LOG.debug("Deleting {} data files", filesToDelete.size());
266-
deleteFiles(filesToDelete, "data");
267+
Map<FileContent, Set<String>> groupedFilesToDelete =
268+
filesToDelete.stream()
269+
.collect(
270+
Collectors.groupingBy(
271+
FileInfo::getContent,
272+
Collectors.mapping(FileInfo::getPath, Collectors.toSet())));
273+
274+
for (Map.Entry<FileContent, Set<String>> entry : groupedFilesToDelete.entrySet()) {
275+
Set<String> filesToDeleteGroup = entry.getValue();
276+
String fileType = entry.getKey().name();
277+
LOG.debug("Deleting {} {} files", filesToDeleteGroup.size(), fileType);
278+
deleteFiles(filesToDeleteGroup, fileType, summary);
279+
}
267280
}
268281

269282
LOG.debug("Deleting {} manifest files", manifestsToDelete.size());
270-
deleteFiles(manifestsToDelete, "manifest");
283+
deleteFiles(manifestsToDelete, MANIFEST, summary);
271284
LOG.debug("Deleting {} manifest-list files", manifestListsToDelete.size());
272-
deleteFiles(manifestListsToDelete, "manifest list");
285+
deleteFiles(manifestListsToDelete, MANIFEST_LIST, summary);
273286

274287
if (hasAnyStatisticsFiles(beforeExpiration)) {
275288
Set<String> expiredStatisticsFilesLocations =
276289
expiredStatisticsFilesLocations(beforeExpiration, afterExpiration);
277290
LOG.debug("Deleting {} statistics files", expiredStatisticsFilesLocations.size());
278-
deleteFiles(expiredStatisticsFilesLocations, "statistics files");
291+
deleteFiles(expiredStatisticsFilesLocations, STATISTICS_FILES, summary);
279292
}
293+
return summary;
280294
}
281295

282-
private Set<String> findFilesToDelete(
296+
private Set<FileInfo> findFilesToDelete(
283297
Set<ManifestFile> manifestsToScan,
284298
Set<ManifestFile> manifestsToRevert,
285299
Set<Long> validIds,
286300
Map<Integer, PartitionSpec> specsById) {
287-
Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
301+
Set<FileInfo> filesToDelete = ConcurrentHashMap.newKeySet();
288302
Tasks.foreach(manifestsToScan)
289303
.retry(3)
290304
.suppressFailureWhenFinished()
@@ -295,14 +309,16 @@ private Set<String> findFilesToDelete(
295309
.run(
296310
manifest -> {
297311
// the manifest has deletes, scan it to find files to delete
298-
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
299-
for (ManifestEntry<?> entry : reader.entries()) {
312+
try (ManifestReader<? extends ContentFile<?>> reader =
313+
ManifestFiles.open(manifest, fileIO, specsById)) {
314+
for (ManifestEntry<? extends ContentFile<?>> entry : reader.entries()) {
300315
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
301316
// deleted
302317
if (entry.status() == ManifestEntry.Status.DELETED
303318
&& !validIds.contains(entry.snapshotId())) {
304319
// use toString to ensure the path will not change (Utf8 is reused)
305-
filesToDelete.add(entry.file().location());
320+
ContentFile<?> file = entry.file();
321+
filesToDelete.add(new FileInfo(file.content(), file.location()));
306322
}
307323
}
308324
} catch (IOException e) {
@@ -320,12 +336,14 @@ private Set<String> findFilesToDelete(
320336
.run(
321337
manifest -> {
322338
// the manifest has deletes, scan it to find files to delete
323-
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
324-
for (ManifestEntry<?> entry : reader.entries()) {
339+
try (ManifestReader<? extends ContentFile<?>> reader =
340+
ManifestFiles.open(manifest, fileIO, specsById)) {
341+
for (ManifestEntry<? extends ContentFile<?>> entry : reader.entries()) {
325342
// delete any ADDED file from manifests that were reverted
326343
if (entry.status() == ManifestEntry.Status.ADDED) {
327344
// use toString to ensure the path will not change (Utf8 is reused)
328-
filesToDelete.add(entry.file().location());
345+
ContentFile<?> file = entry.file();
346+
filesToDelete.add(new FileInfo(file.content(), file.location()));
329347
}
330348
}
331349
} catch (IOException e) {

core/src/main/java/org/apache/iceberg/ManifestFiles.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.github.benmanes.caffeine.cache.Cache;
2222
import com.github.benmanes.caffeine.cache.Caffeine;
2323
import java.io.IOException;
24+
import java.util.Collection;
2425
import java.util.Map;
2526
import org.apache.iceberg.ManifestReader.FileType;
2627
import org.apache.iceberg.avro.AvroEncoderUtil;
@@ -93,16 +94,16 @@ public static CacheMetricsReport contentCacheStats(FileIO io) {
9394
}
9495

9596
/**
96-
* Returns a {@link CloseableIterable} of file paths in the {@link ManifestFile}.
97+
* Returns a {@link CloseableIterable} of {@link DataFile}s in the {@link ManifestFile}.
9798
*
9899
* @param manifest a ManifestFile
99100
* @param io a FileIO
100101
* @return a manifest reader
101102
*/
102-
public static CloseableIterable<String> readPaths(ManifestFile manifest, FileIO io) {
103+
public static CloseableIterable<DataFile> readColumns(
104+
ManifestFile manifest, FileIO io, Collection<String> columns) {
103105
return CloseableIterable.transform(
104-
read(manifest, io, null).select(ImmutableList.of("file_path")).liveEntries(),
105-
entry -> entry.file().location());
106+
read(manifest, io, null).select(columns).liveEntries(), ManifestEntry::file);
106107
}
107108

108109
/**

0 commit comments

Comments
 (0)