Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 415e2fd

Browse files
committed Mar 10, 2025
Add number option to preserve in Iceberg expire_snapshots
1 parent 58ecb8f commit 415e2fd

File tree

4 files changed

+66
-7
lines changed

4 files changed

+66
-7
lines changed
 

‎plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

+8 -5
Original file line number | Diff line number | Diff line change
@@ -142,6 +142,7 @@
142142
import org.apache.iceberg.DataFiles;
143143
import org.apache.iceberg.DeleteFile;
144144
import org.apache.iceberg.DeleteFiles;
145+
import org.apache.iceberg.ExpireSnapshots;
145146
import org.apache.iceberg.FileFormat;
146147
import org.apache.iceberg.FileMetadata;
147148
import org.apache.iceberg.FileScanTask;
@@ -1707,12 +1708,13 @@ private Optional<ConnectorTableExecuteHandle> getTableHandleForDropExtendedStats
17071708
private Optional<ConnectorTableExecuteHandle> getTableHandleForExpireSnapshots(ConnectorSession session, IcebergTableHandle tableHandle, Map<String, Object> executeProperties)
17081709
{
17091710
Duration retentionThreshold = (Duration) executeProperties.get(RETENTION_THRESHOLD);
1711+
Optional<Integer> retainLast = Optional.ofNullable((Integer) executeProperties.get("retain_last"));
17101712
Table icebergTable = catalog.loadTable(session, tableHandle.getSchemaTableName());
17111713

17121714
return Optional.of(new IcebergTableExecuteHandle(
17131715
tableHandle.getSchemaTableName(),
17141716
EXPIRE_SNAPSHOTS,
1715-
new IcebergExpireSnapshotsHandle(retentionThreshold),
1717+
new IcebergExpireSnapshotsHandle(retentionThreshold, retainLast),
17161718
icebergTable.location(),
17171719
icebergTable.io().properties()));
17181720
}
@@ -2174,11 +2176,12 @@ private void executeExpireSnapshots(ConnectorSession session, IcebergTableExecut
21742176
}
21752177
};
21762178

2179+
ExpireSnapshots expireSnapshots = table.expireSnapshots()
2180+
.expireOlderThan(expireTimestampMillis)
2181+
.deleteWith(deleteFunction);
2182+
expireSnapshotsHandle.retainLast().ifPresent(expireSnapshots::retainLast);
21772183
try {
2178-
table.expireSnapshots()
2179-
.expireOlderThan(expireTimestampMillis)
2180-
.deleteWith(deleteFunction)
2181-
.commit();
2184+
expireSnapshots.commit();
21822185

21832186
fileSystem.deleteFiles(pathsToDelete);
21842187
}

‎plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/ExpireSnapshotsTableProcedure.java

+13
Original file line number | Diff line number | Diff line change
@@ -16,11 +16,14 @@
1616
import com.google.common.collect.ImmutableList;
1717
import com.google.inject.Provider;
1818
import io.airlift.units.Duration;
19+
import io.trino.spi.TrinoException;
1920
import io.trino.spi.connector.TableProcedureMetadata;
2021

2122
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
2223
import static io.trino.plugin.iceberg.procedure.IcebergTableProcedureId.EXPIRE_SNAPSHOTS;
24+
import static io.trino.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
2325
import static io.trino.spi.connector.TableProcedureExecutionMode.coordinatorOnly;
26+
import static io.trino.spi.session.PropertyMetadata.integerProperty;
2427

2528
public class ExpireSnapshotsTableProcedure
2629
implements Provider<TableProcedureMetadata>
@@ -36,6 +39,16 @@ public TableProcedureMetadata get()
3639
"retention_threshold",
3740
"Only snapshots older than threshold should be removed",
3841
Duration.valueOf("7d"),
42+
false),
43+
integerProperty(
44+
"retain_last",
45+
"Number of ancestor snapshots to preserve regardless of retention_threshold (defaults to 1)",
46+
null,
47+
value -> {
48+
if (value < 1) {
49+
throw new TrinoException(INVALID_PROCEDURE_ARGUMENT, "retain_last must be at least 1, cannot be: %s".formatted(value));
50+
}
51+
},
3952
false)));
4053
}
4154
}

‎plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/procedure/IcebergExpireSnapshotsHandle.java

+15 -1
Original file line number | Diff line number | Diff line change
@@ -15,13 +15,27 @@
1515

1616
import io.airlift.units.Duration;
1717

18+
import java.util.Optional;
19+
20+
import static com.google.common.base.MoreObjects.toStringHelper;
1821
import static java.util.Objects.requireNonNull;
1922

20-
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold)
23+
public record IcebergExpireSnapshotsHandle(Duration retentionThreshold, Optional<Integer> retainLast)
2124
implements IcebergProcedureHandle
2225
{
2326
public IcebergExpireSnapshotsHandle
2427
{
2528
requireNonNull(retentionThreshold, "retentionThreshold is null");
29+
requireNonNull(retainLast, "retainLast is null");
30+
}
31+
32+
@Override
33+
public String toString()
34+
{
35+
return toStringHelper(this)
36+
.omitEmptyValues()
37+
.add("retentionThreshold", retentionThreshold)
38+
.add("retainLast", retainLast)
39+
.toString();
2640
}
2741
}

‎plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java

+30 -1
Original file line number | Diff line number | Diff line change
@@ -6473,6 +6473,32 @@ public void testExpireSnapshotsPartitionedTable()
64736473
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
64746474
}
64756475

6476+
@Test
6477+
public void testExpireSnapshotsRetainLast()
6478+
throws Exception
6479+
{
6480+
String tableName = "test_expiring_snapshots_" + randomNameSuffix();
6481+
Session sessionWithShortRetentionUnlocked = prepareCleanUpSession();
6482+
assertUpdate("CREATE TABLE " + tableName + " (key varchar, value integer)");
6483+
assertUpdate("INSERT INTO " + tableName + " VALUES ('one', 1)", 1);
6484+
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
6485+
assertUpdate("INSERT INTO " + tableName + " VALUES ('three', 3)", 1);
6486+
assertThat(query("SELECT count(*) FROM " + tableName)).matches("VALUES BIGINT '3'");
6487+
6488+
List<Long> initialSnapshots = getSnapshotIds(tableName);
6489+
String tableLocation = getTableLocation(tableName);
6490+
List<String> initialFiles = getAllMetadataFilesFromTableDirectory(tableLocation);
6491+
assertQuerySucceeds(sessionWithShortRetentionUnlocked, "ALTER TABLE " + tableName + " EXECUTE expire_snapshots(retention_threshold => '0s', retain_last => 2)");
6492+
6493+
assertThat(query("SELECT count(*) FROM " + tableName)).matches("VALUES BIGINT '3'");
6494+
List<String> updatedFiles = getAllMetadataFilesFromTableDirectory(tableLocation);
6495+
List<Long> updatedSnapshots = getSnapshotIds(tableName);
6496+
assertThat(updatedFiles).hasSize(initialFiles.size() - 2);
6497+
assertThat(updatedSnapshots.size()).isLessThan(initialSnapshots.size());
6498+
assertThat(updatedSnapshots).hasSize(2);
6499+
assertThat(initialSnapshots).containsAll(updatedSnapshots);
6500+
}
6501+
64766502
@Test
64776503
public void testExpireSnapshotsOnSnapshot()
64786504
{
@@ -6507,7 +6533,7 @@ public void testExplainExpireSnapshotOutput()
65076533
assertUpdate("INSERT INTO " + tableName + " VALUES ('two', 2)", 1);
65086534

65096535
assertExplain("EXPLAIN ALTER TABLE " + tableName + " EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '0s')",
6510-
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\[retentionThreshold=0\\.00s].*");
6536+
"SimpleTableExecute\\[table = iceberg:schemaTableName:tpch.test_expiring_snapshots.*\\{retentionThreshold=0\\.00s}.*");
65116537
}
65126538

65136539
@Test
@@ -6525,6 +6551,9 @@ public void testExpireSnapshotsParameterValidation()
65256551
assertQueryFails(
65266552
"ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '33s')",
65276553
"\\QRetention specified (33.00s) is shorter than the minimum retention configured in the system (7.00d). Minimum retention can be changed with iceberg.expire-snapshots.min-retention configuration property or iceberg.expire_snapshots_min_retention session property");
6554+
assertQueryFails(
6555+
"ALTER TABLE nation EXECUTE EXPIRE_SNAPSHOTS (retention_threshold => '10d', retain_last => 0)",
6556+
".* retain_last must be at least 1, cannot be: 0");
65286557
}
65296558

65306559
@Test

0 commit comments

Comments
 (0)
Please sign in to comment.