Skip to content

Commit 8d7af35

Browse files
authored
feat(iceberg): improve concurrency control and resilience (#12664)
1 parent a19edde commit 8d7af35

12 files changed

+2111 -513 lines changed

metadata-service/iceberg-catalog/build.gradle

+1
Original file line number | Diff line number | Diff line change
@@ -13,6 +13,7 @@ dependencies {
1313
implementation project(':metadata-models')
1414
implementation project(':metadata-utils')
1515
implementation project(':metadata-operation-context')
16+
implementation project(':metadata-io')
1617
implementation project(':metadata-integration:java:datahub-schematron:lib')
1718
implementation 'org.apache.iceberg:iceberg-core:1.6.1'
1819
implementation 'org.apache.iceberg:iceberg-aws:1.6.1'
Original file line number | Diff line number | Diff line change
@@ -1,21 +1,22 @@
11
package io.datahubproject.iceberg.catalog;
22

33
import static com.linkedin.metadata.Constants.*;
4-
import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
54
import static io.datahubproject.iceberg.catalog.Utils.*;
65

7-
import com.google.common.util.concurrent.Striped;
8-
import com.linkedin.common.AuditStamp;
6+
import com.google.common.annotations.VisibleForTesting;
97
import com.linkedin.common.FabricType;
8+
import com.linkedin.common.Status;
109
import com.linkedin.common.urn.DatasetUrn;
1110
import com.linkedin.common.urn.Urn;
11+
import com.linkedin.container.Container;
1212
import com.linkedin.data.template.RecordTemplate;
1313
import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;
14+
import com.linkedin.dataset.DatasetProperties;
1415
import com.linkedin.dataset.IcebergCatalogInfo;
1516
import com.linkedin.entity.EnvelopedAspect;
16-
import com.linkedin.events.metadata.ChangeType;
17+
import com.linkedin.metadata.aspect.batch.AspectsBatch;
1718
import com.linkedin.metadata.entity.EntityService;
18-
import com.linkedin.mxe.MetadataChangeProposal;
19+
import com.linkedin.metadata.entity.IngestResult;
1920
import com.linkedin.platformresource.PlatformResourceInfo;
2021
import com.linkedin.secret.DataHubSecretValue;
2122
import com.linkedin.util.Pair;
@@ -24,13 +25,13 @@
2425
import io.datahubproject.metadata.services.SecretService;
2526
import java.net.URISyntaxException;
2627
import java.util.*;
27-
import java.util.concurrent.locks.Lock;
2828
import lombok.Getter;
2929
import lombok.SneakyThrows;
30-
import org.apache.iceberg.CatalogUtil;
30+
import lombok.extern.slf4j.Slf4j;
3131
import org.apache.iceberg.catalog.TableIdentifier;
3232
import org.apache.iceberg.exceptions.*;
3333

34+
@Slf4j
3435
public class DataHubIcebergWarehouse {
3536

3637
public static final String DATASET_ICEBERG_METADATA_ASPECT_NAME = "icebergCatalogInfo";
@@ -47,11 +48,8 @@ public class DataHubIcebergWarehouse {
4748

4849
@Getter private final String platformInstance;
4950

50-
// TODO: Need to handle locks for deployments with multiple GMS replicas.
51-
private static final Striped<Lock> resourceLocks =
52-
Striped.lazyWeakLock(Runtime.getRuntime().availableProcessors() * 2);
53-
54-
private DataHubIcebergWarehouse(
51+
@VisibleForTesting
52+
DataHubIcebergWarehouse(
5553
String platformInstance,
5654
IcebergWarehouseInfo icebergWarehouse,
5755
EntityService entityService,
@@ -121,39 +119,96 @@ public String getDataRoot() {
121119
return icebergWarehouse.getDataRoot();
122120
}
123121

122+
@SneakyThrows
124123
public Optional<DatasetUrn> getDatasetUrn(TableIdentifier tableIdentifier) {
125124
Urn resourceUrn = resourceUrn(tableIdentifier);
126-
PlatformResourceInfo platformResourceInfo =
127-
(PlatformResourceInfo)
128-
entityService.getLatestAspect(
129-
operationContext, resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);
130-
if (platformResourceInfo == null) {
125+
Optional<PlatformResourceInfo> platformResourceInfo =
126+
getLatestAspectNonRemoved(resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);
127+
128+
if (platformResourceInfo.isEmpty()) {
131129
return Optional.empty();
132130
}
133-
try {
134-
return Optional.of(DatasetUrn.createFromString(platformResourceInfo.getPrimaryKey()));
135-
} catch (URISyntaxException e) {
136-
throw new RuntimeException("Invalid dataset urn " + platformResourceInfo.getPrimaryKey(), e);
131+
132+
return Optional.of(DatasetUrn.createFromString(platformResourceInfo.get().getPrimaryKey()));
133+
}
134+
135+
private <T extends RecordTemplate> Optional<T> getLatestAspectNonRemoved(
136+
Urn urn, String aspectName) {
137+
Map<Urn, List<RecordTemplate>> aspectsMap =
138+
entityService.getLatestAspects(
139+
operationContext, Set.of(urn), Set.of(STATUS_ASPECT_NAME, aspectName), false);
140+
141+
if (aspectsMap == null || aspectsMap.isEmpty()) {
142+
return Optional.empty();
143+
}
144+
List<RecordTemplate> aspects = aspectsMap.get(urn);
145+
if (aspects == null || aspects.isEmpty()) {
146+
return Optional.empty();
137147
}
148+
149+
T result = null;
150+
151+
for (RecordTemplate aspect : aspects) {
152+
if (aspect instanceof Status status) {
153+
if (status.isRemoved()) {
154+
return Optional.empty();
155+
}
156+
} else {
157+
result = (T) aspect;
158+
}
159+
}
160+
161+
return Optional.ofNullable(result);
138162
}
139163

140-
public IcebergCatalogInfo getIcebergMetadata(TableIdentifier tableIdentifier) {
164+
private Optional<EnvelopedAspect> getLatestEnvelopedAspectNonRemoved(Urn urn, String aspectName)
165+
throws URISyntaxException {
166+
167+
Map<Urn, List<EnvelopedAspect>> aspectsMap =
168+
entityService.getLatestEnvelopedAspects(
169+
operationContext, Set.of(urn), Set.of(STATUS_ASPECT_NAME, aspectName), false);
170+
171+
if (aspectsMap == null || aspectsMap.isEmpty()) {
172+
return Optional.empty();
173+
}
174+
List<EnvelopedAspect> aspects = aspectsMap.get(urn);
175+
if (aspects == null || aspects.isEmpty()) {
176+
return Optional.empty();
177+
}
178+
179+
EnvelopedAspect result = null;
180+
181+
for (EnvelopedAspect aspect : aspects) {
182+
if (STATUS_ASPECT_NAME.equals(aspect.getName())) {
183+
Status status = new Status(aspect.getValue().data());
184+
if (status.isRemoved()) {
185+
return Optional.empty();
186+
}
187+
} else {
188+
result = aspect;
189+
}
190+
}
191+
192+
return Optional.ofNullable(result);
193+
}
194+
195+
public Optional<IcebergCatalogInfo> getIcebergMetadata(TableIdentifier tableIdentifier) {
141196
Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
142197
if (datasetUrn.isEmpty()) {
143-
return null;
198+
return Optional.empty();
144199
}
145200

146-
IcebergCatalogInfo icebergMeta =
147-
(IcebergCatalogInfo)
148-
entityService.getLatestAspect(
149-
operationContext, datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
201+
Optional<IcebergCatalogInfo> icebergMeta =
202+
getLatestAspectNonRemoved(datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
150203

151-
if (icebergMeta == null) {
152-
throw new IllegalStateException(
204+
if (icebergMeta.isEmpty()) {
205+
// possibly some deletion cleanup is pending; log error & return as if dataset doesn't exist.
206+
log.error(
153207
String.format(
154208
"IcebergMetadata not found for resource %s, dataset %s",
155209
resourceUrn(tableIdentifier), datasetUrn.get()));
156210
}
211+
157212
return icebergMeta;
158213
}
159214

@@ -165,19 +220,19 @@ public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(
165220
}
166221

167222
try {
168-
EnvelopedAspect existingEnveloped =
169-
entityService.getLatestEnvelopedAspect(
170-
operationContext,
171-
DATASET_ENTITY_NAME,
172-
datasetUrn.get(),
173-
DATASET_ICEBERG_METADATA_ASPECT_NAME);
174-
if (existingEnveloped == null) {
175-
throw new IllegalStateException(
223+
Optional<EnvelopedAspect> existingEnveloped =
224+
getLatestEnvelopedAspectNonRemoved(
225+
datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
226+
if (existingEnveloped.isEmpty()) {
227+
// possibly some deletion cleanup is pending; log error & return as if dataset doesn't
228+
// exist.
229+
log.error(
176230
String.format(
177231
"IcebergMetadata not found for resource %s, dataset %s",
178232
resourceUrn(tableIdentifier), datasetUrn.get()));
233+
return null;
179234
}
180-
return Pair.of(existingEnveloped, datasetUrn.get());
235+
return Pair.of(existingEnveloped.get(), datasetUrn.get());
181236
} catch (Exception e) {
182237
throw new RuntimeException(
183238
"Error fetching IcebergMetadata aspect for dataset " + datasetUrn.get(), e);
@@ -186,79 +241,121 @@ public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(
186241

187242
public boolean deleteDataset(TableIdentifier tableIdentifier) {
188243
Urn resourceUrn = resourceUrn(tableIdentifier);
244+
if (!entityService.exists(operationContext, resourceUrn)) {
245+
return false;
246+
}
189247

190-
// guard against concurrent modifications that depend on the resource (rename table/view)
191-
Lock lock = resourceLocks.get(resourceUrn);
192-
lock.lock();
193-
try {
194-
if (!entityService.exists(operationContext, resourceUrn)) {
195-
return false;
196-
}
197-
Optional<DatasetUrn> urn = getDatasetUrn(tableIdentifier);
248+
Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
249+
if (datasetUrn.isEmpty()) {
250+
log.warn("Dataset urn not found for platform resource {}; cleaning up resource", resourceUrn);
198251
entityService.deleteUrn(operationContext, resourceUrn);
199-
urn.ifPresent(x -> entityService.deleteUrn(operationContext, x));
200-
return true;
201-
} finally {
202-
lock.unlock();
252+
return false;
203253
}
254+
255+
IcebergBatch icebergBatch = newIcebergBatch(operationContext);
256+
icebergBatch.softDeleteEntity(resourceUrn, PLATFORM_RESOURCE_ENTITY_NAME);
257+
icebergBatch.softDeleteEntity(datasetUrn.get(), DATASET_ENTITY_NAME);
258+
259+
AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
260+
List<IngestResult> ingestResults =
261+
entityService.ingestProposal(operationContext, aspectsBatch, false);
262+
263+
boolean result = true;
264+
for (IngestResult ingestResult : ingestResults) {
265+
if (ingestResult.getResult().isNoOp()) {
266+
result = false;
267+
break;
268+
}
269+
}
270+
271+
entityService.deleteUrn(operationContext, resourceUrn);
272+
entityService.deleteUrn(operationContext, datasetUrn.get());
273+
274+
return result;
204275
}
205276

206277
public DatasetUrn createDataset(
207-
TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
278+
TableIdentifier tableIdentifier, boolean view, IcebergBatch icebergBatch) {
208279
String datasetName = platformInstance + "." + UUID.randomUUID();
209280
DatasetUrn datasetUrn = new DatasetUrn(platformUrn(), datasetName, fabricType());
210-
createResource(datasetUrn, tableIdentifier, view, auditStamp);
281+
282+
createResource(datasetUrn, tableIdentifier, view, icebergBatch);
283+
211284
return datasetUrn;
212285
}
213286

214-
public DatasetUrn renameDataset(
215-
TableIdentifier fromTableId, TableIdentifier toTableId, boolean view, AuditStamp auditStamp) {
287+
public void renameDataset(TableIdentifier fromTableId, TableIdentifier toTableId, boolean view) {
288+
289+
Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
290+
if (optDatasetUrn.isEmpty()) {
291+
throw noSuchEntity(view, fromTableId);
292+
}
293+
294+
DatasetUrn datasetUrn = optDatasetUrn.get();
295+
296+
IcebergBatch icebergBatch = newIcebergBatch(operationContext);
297+
icebergBatch.softDeleteEntity(resourceUrn(fromTableId), PLATFORM_RESOURCE_ENTITY_NAME);
298+
createResource(datasetUrn, toTableId, view, icebergBatch);
299+
300+
DatasetProperties datasetProperties =
301+
new DatasetProperties()
302+
.setName(toTableId.name())
303+
.setQualifiedName(fullTableName(platformInstance, toTableId));
216304

217-
// guard against concurrent modifications to the resource (other renames, deletion)
218-
Lock lock = resourceLocks.get(resourceUrn(fromTableId));
219-
lock.lock();
305+
IcebergBatch.EntityBatch datasetBatch =
306+
icebergBatch.updateEntity(datasetUrn, DATASET_ENTITY_NAME);
307+
datasetBatch.aspect(DATASET_PROPERTIES_ASPECT_NAME, datasetProperties);
308+
309+
if (!fromTableId.namespace().equals(toTableId.namespace())) {
310+
Container container =
311+
new Container().setContainer(containerUrn(platformInstance, toTableId.namespace()));
312+
datasetBatch.aspect(CONTAINER_ASPECT_NAME, container);
313+
}
220314

221315
try {
222-
Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
223-
if (optDatasetUrn.isEmpty()) {
224-
if (view) {
225-
throw new NoSuchViewException(
226-
"No such view %s", fullTableName(platformInstance, fromTableId));
227-
} else {
228-
throw new NoSuchTableException(
229-
"No such table %s", fullTableName(platformInstance, fromTableId));
230-
}
316+
AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
317+
entityService.ingestProposal(operationContext, aspectsBatch, false);
318+
} catch (ValidationException e) {
319+
if (!entityService.exists(operationContext, resourceUrn(fromTableId), false)) {
320+
// someone else deleted "fromTable" before we could get through
321+
throw noSuchEntity(view, fromTableId);
231322
}
232-
233-
DatasetUrn datasetUrn = optDatasetUrn.get();
234-
try {
235-
createResource(datasetUrn, toTableId, view, auditStamp);
236-
} catch (ValidationException e) {
323+
if (entityService.exists(operationContext, resourceUrn(toTableId), true)) {
237324
throw new AlreadyExistsException(
238325
"%s already exists: %s",
239326
view ? "View" : "Table", fullTableName(platformInstance, toTableId));
240327
}
241-
entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
242-
return datasetUrn;
243-
} finally {
244-
lock.unlock();
328+
throw new IllegalStateException(
329+
String.format(
330+
"Rename operation failed inexplicably, from %s to %s in warehouse %s",
331+
fromTableId, toTableId, platformInstance));
245332
}
333+
334+
entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
335+
}
336+
337+
private RuntimeException noSuchEntity(boolean view, TableIdentifier tableIdentifier) {
338+
return view
339+
? new NoSuchViewException(
340+
"No such view %s", fullTableName(platformInstance, tableIdentifier))
341+
: new NoSuchTableException(
342+
"No such table %s", fullTableName(platformInstance, tableIdentifier));
246343
}
247344

248345
private void createResource(
249-
DatasetUrn datasetUrn, TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
346+
DatasetUrn datasetUrn,
347+
TableIdentifier tableIdentifier,
348+
boolean view,
349+
IcebergBatch icebergBatch) {
250350
PlatformResourceInfo resourceInfo =
251351
new PlatformResourceInfo().setPrimaryKey(datasetUrn.toString());
252352
resourceInfo.setResourceType(view ? "icebergView" : "icebergTable");
253353

254-
MetadataChangeProposal mcp = new MetadataChangeProposal();
255-
mcp.setEntityUrn(resourceUrn(tableIdentifier));
256-
mcp.setEntityType(PLATFORM_RESOURCE_ENTITY_NAME);
257-
mcp.setAspectName(PLATFORM_RESOURCE_INFO_ASPECT_NAME);
258-
mcp.setChangeType(ChangeType.CREATE_ENTITY);
259-
mcp.setAspect(serializeAspect(resourceInfo));
260-
261-
entityService.ingestProposal(operationContext, mcp, auditStamp, false);
354+
icebergBatch.createEntity(
355+
resourceUrn(tableIdentifier),
356+
PLATFORM_RESOURCE_ENTITY_NAME,
357+
PLATFORM_RESOURCE_INFO_ASPECT_NAME,
358+
resourceInfo);
262359
}
263360

264361
private FabricType fabricType() {
@@ -268,8 +365,15 @@ private FabricType fabricType() {
268365
@SneakyThrows
269366
private Urn resourceUrn(TableIdentifier tableIdentifier) {
270367
return Urn.createFromString(
271-
String.format(
272-
"urn:li:platformResource:%s.%s",
273-
PLATFORM_NAME, CatalogUtil.fullTableName(platformInstance, tableIdentifier)));
368+
String.format("urn:li:platformResource:%s.%s", PLATFORM_NAME, tableName(tableIdentifier)));
369+
}
370+
371+
private String tableName(TableIdentifier tableIdentifier) {
372+
return fullTableName(platformInstance, tableIdentifier);
373+
}
374+
375+
@VisibleForTesting
376+
IcebergBatch newIcebergBatch(OperationContext operationContext) {
377+
return new IcebergBatch(operationContext);
274378
}
275379
}

0 commit comments

Comments (0)