1919
2020package org .apache .iceberg ;
2121
22+ import java .util .Set ;
23+ import java .util .stream .Collectors ;
2224import org .apache .iceberg .hive .HiveTableOperations ;
2325import org .apache .iceberg .hive .StagingTableOperations ;
26+ import org .apache .iceberg .io .FileIO ;
27+ import org .apache .iceberg .relocated .com .google .common .collect .Sets ;
28+ import org .apache .iceberg .util .Tasks ;
29+ import org .slf4j .Logger ;
30+ import org .slf4j .LoggerFactory ;
2431
2532/**
2633 * Transaction implementation that stages metadata changes for atomic batch HMS updates across
3138 */
3239public class HiveTransaction extends BaseTransaction {
3340
41+ private static final Logger LOG = LoggerFactory .getLogger (HiveTransaction .class );
42+
3443 private final HiveTableOperations hiveOps ;
3544 private final StagingTableOperations stagingOps ;
3645
@@ -51,4 +60,92 @@ public HiveTableOperations ops() {
5160 public StagingTableOperations stagingOps () {
5261 return stagingOps ;
5362 }
63+
64+ /**
65+ * Cleans up all artifacts produced by the staged commit: new manifests, manifest lists,
66+ * the metadata JSON file, and uncommitted files.
67+ *
68+ * <p>Called by the coordinator when the batch HMS update fails after staging succeeded.
69+ */
70+ public void cleanUpOnCommitFailure () {
71+ // clean up manifests and manifest lists from new snapshots
72+ cleanAllUpdates ();
73+
74+ // delete the staged metadata JSON file
75+ deleteMetadataFile ();
76+
77+ // delete uncommitted files tracked by the base transaction
78+ deleteUncommittedFiles ();
79+ }
80+
81+ /**
82+ * Deletes manifest files and manifest lists produced by new snapshots in this transaction.
83+ * Uses metadata diff (current vs start) to identify new snapshots and their artifacts.
84+ */
85+ private void cleanAllUpdates () {
86+ FileIO io = stagingOps .io ();
87+
88+ // Collect all manifest paths from the base metadata — these must NOT be deleted
89+ Set <String > baseManifestPaths = Sets .newHashSet ();
90+ for (Snapshot snapshot : startMetadata ().snapshots ()) {
91+ try {
92+ snapshot .allManifests (io ).forEach (m -> baseManifestPaths .add (m .path ()));
93+ } catch (RuntimeException e ) {
94+ LOG .warn ("Failed to read base manifests for cleanup" , e );
95+ }
96+ }
97+
98+ // Find new snapshots added by this transaction and clean their artifacts
99+ Set <Long > baseSnapshotIds = startMetadata ().snapshots ().stream ()
100+ .map (Snapshot ::snapshotId )
101+ .collect (Collectors .toSet ());
102+
103+ for (Snapshot snapshot : currentMetadata ().snapshots ()) {
104+ if (baseSnapshotIds .contains (snapshot .snapshotId ())) {
105+ continue ;
106+ }
107+
108+ // Delete new manifest files (not from base)
109+ try {
110+ for (ManifestFile manifest : snapshot .allManifests (io )) {
111+ if (!baseManifestPaths .contains (manifest .path ())) {
112+ io .deleteFile (manifest .path ());
113+ }
114+ }
115+ } catch (RuntimeException e ) {
116+ LOG .warn ("Failed to clean manifests for snapshot {}" , snapshot .snapshotId (), e );
117+ }
118+
119+ // Delete the manifest list
120+ try {
121+ io .deleteFile (snapshot .manifestListLocation ());
122+ } catch (RuntimeException e ) {
123+ LOG .warn ("Failed to clean manifest list {}" , snapshot .manifestListLocation (), e );
124+ }
125+ }
126+ }
127+
128+ /**
129+ * Deletes the staged metadata JSON file written by StagingTableOperations.doCommit().
130+ */
131+ private void deleteMetadataFile () {
132+ String metadataLocation = stagingOps .metadataLocation ();
133+ if (metadataLocation != null ) {
134+ try {
135+ stagingOps .io ().deleteFile (metadataLocation );
136+ } catch (RuntimeException e ) {
137+ LOG .warn ("Failed to clean metadata file {}" , metadataLocation , e );
138+ }
139+ }
140+ }
141+
142+ /**
143+ * Deletes uncommitted files tracked during the transaction (e.g. replaced data files).
144+ */
145+ private void deleteUncommittedFiles () {
146+ Tasks .foreach (deletedFiles ())
147+ .suppressFailureWhenFinished ()
148+ .onFailure ((file , exc ) -> LOG .warn ("Failed to delete uncommitted file: {}" , file , exc ))
149+ .run (stagingOps .io ()::deleteFile );
150+ }
54151}
0 commit comments