Skip to content

Commit 58f81e4

Browse files
committed
fix(openapi): fix openapi timeseries async ingestion
* ensure key aspects are created when a timeseries aspect is encountered when using async mode
1 parent a646185 commit 58f81e4

File tree

2 files changed

+148
-28
lines changed

2 files changed

+148
-28
lines changed

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+37-28
Original file line numberDiff line numberDiff line change
@@ -1361,7 +1361,8 @@ public List<IngestResult> ingestProposal(
13611361
* @param aspectsBatch timeseries upserts batch
13621362
* @return returns ingest proposal result, however was never in the MCP topic
13631363
*/
1364-
private Stream<IngestResult> ingestTimeseriesProposal(
1364+
@VisibleForTesting
1365+
Stream<IngestResult> ingestTimeseriesProposal(
13651366
@Nonnull OperationContext opContext, AspectsBatch aspectsBatch, final boolean async) {
13661367

13671368
List<? extends BatchItem> unsupported =
@@ -1381,31 +1382,38 @@ private Stream<IngestResult> ingestTimeseriesProposal(
13811382
return opContext.withSpan(
13821383
"ingestTimeseriesProposal",
13831384
() -> {
1384-
if (!async) {
1385-
// Handle throttling
1386-
APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true);
1387-
1388-
// Create default non-timeseries aspects for timeseries aspects
1389-
List<MCPItem> timeseriesKeyAspects =
1390-
aspectsBatch.getMCPItems().stream()
1391-
.filter(
1392-
item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries())
1393-
.map(
1394-
item ->
1395-
ChangeItemImpl.builder()
1396-
.urn(item.getUrn())
1397-
.aspectName(item.getEntitySpec().getKeyAspectName())
1398-
.changeType(ChangeType.UPSERT)
1399-
.entitySpec(item.getEntitySpec())
1400-
.aspectSpec(item.getEntitySpec().getKeyAspectSpec())
1401-
.auditStamp(item.getAuditStamp())
1402-
.systemMetadata(item.getSystemMetadata())
1403-
.recordTemplate(
1404-
EntityApiUtils.buildKeyAspect(
1405-
opContext.getEntityRegistry(), item.getUrn()))
1406-
.build(opContext.getAspectRetriever()))
1407-
.collect(Collectors.toList());
1385+
// Handle throttling
1386+
APIThrottle.evaluate(opContext, new HashSet<>(throttleEvents.values()), true);
1387+
1388+
// Create default non-timeseries aspects for timeseries aspects
1389+
List<MCPItem> timeseriesKeyAspects =
1390+
aspectsBatch.getMCPItems().stream()
1391+
.filter(
1392+
item -> item.getAspectSpec() != null && item.getAspectSpec().isTimeseries())
1393+
.map(
1394+
item ->
1395+
ChangeItemImpl.builder()
1396+
.urn(item.getUrn())
1397+
.aspectName(item.getEntitySpec().getKeyAspectName())
1398+
.changeType(ChangeType.UPSERT)
1399+
.entitySpec(item.getEntitySpec())
1400+
.aspectSpec(item.getEntitySpec().getKeyAspectSpec())
1401+
.auditStamp(item.getAuditStamp())
1402+
.systemMetadata(item.getSystemMetadata())
1403+
.recordTemplate(
1404+
EntityApiUtils.buildKeyAspect(
1405+
opContext.getEntityRegistry(), item.getUrn()))
1406+
.build(opContext.getAspectRetriever()))
1407+
.collect(Collectors.toList());
14081408

1409+
if (async) {
1410+
ingestProposalAsync(
1411+
opContext,
1412+
AspectsBatchImpl.builder()
1413+
.retrieverContext(aspectsBatch.getRetrieverContext())
1414+
.items(timeseriesKeyAspects)
1415+
.build());
1416+
} else {
14091417
ingestProposalSync(
14101418
opContext,
14111419
AspectsBatchImpl.builder()
@@ -1479,8 +1487,8 @@ private Stream<IngestResult> ingestTimeseriesProposal(
14791487
* @param aspectsBatch non-timeseries ingest aspects
14801488
* @return produced items to the MCP topic
14811489
*/
1482-
private Stream<IngestResult> ingestProposalAsync(
1483-
OperationContext opContext, AspectsBatch aspectsBatch) {
1490+
@VisibleForTesting
1491+
Stream<IngestResult> ingestProposalAsync(OperationContext opContext, AspectsBatch aspectsBatch) {
14841492
return opContext.withSpan(
14851493
"ingestProposalAsync",
14861494
() -> {
@@ -1524,7 +1532,8 @@ private Stream<IngestResult> ingestProposalAsync(
15241532
String.valueOf(aspectsBatch.getItems().size()));
15251533
}
15261534

1527-
private Stream<IngestResult> ingestProposalSync(
1535+
@VisibleForTesting
1536+
Stream<IngestResult> ingestProposalSync(
15281537
@Nonnull OperationContext opContext, AspectsBatch aspectsBatch) {
15291538

15301539
return opContext.withSpan(

metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceImplTest.java

+111
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.linkedin.metadata.entity;
22

3+
import static com.linkedin.metadata.Constants.DATASET_PROFILE_ASPECT_NAME;
34
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
45
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
6+
import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP;
57
import static org.mockito.ArgumentMatchers.any;
8+
import static org.mockito.Mockito.doReturn;
69
import static org.mockito.Mockito.mock;
710
import static org.mockito.Mockito.never;
811
import static org.mockito.Mockito.times;
@@ -13,6 +16,7 @@
1316
import static org.testng.Assert.assertNotNull;
1417
import static org.testng.Assert.assertNull;
1518
import static org.testng.Assert.assertTrue;
19+
import static org.testng.Assert.fail;
1620

1721
import com.datahub.util.RecordUtils;
1822
import com.linkedin.common.AuditStamp;
@@ -21,15 +25,20 @@
2125
import com.linkedin.common.urn.UrnUtils;
2226
import com.linkedin.data.template.DataTemplateUtil;
2327
import com.linkedin.data.template.RecordTemplate;
28+
import com.linkedin.dataset.DatasetProfile;
2429
import com.linkedin.dataset.UpstreamLineage;
30+
import com.linkedin.events.metadata.ChangeType;
2531
import com.linkedin.identity.CorpUserInfo;
2632
import com.linkedin.metadata.AspectGenerationUtils;
2733
import com.linkedin.metadata.aspect.SystemAspect;
34+
import com.linkedin.metadata.aspect.batch.AspectsBatch;
2835
import com.linkedin.metadata.aspect.batch.ChangeMCP;
2936
import com.linkedin.metadata.config.PreProcessHooks;
3037
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
3138
import com.linkedin.metadata.entity.ebean.EbeanSystemAspect;
39+
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
3240
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
41+
import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl;
3342
import com.linkedin.metadata.event.EventProducer;
3443
import com.linkedin.metadata.models.registry.EntityRegistry;
3544
import com.linkedin.metadata.utils.GenericRecordUtils;
@@ -40,9 +49,11 @@
4049
import io.datahubproject.metadata.context.OperationContext;
4150
import io.datahubproject.test.metadata.context.TestOperationContexts;
4251
import java.sql.Timestamp;
52+
import java.util.List;
4353
import java.util.Optional;
4454
import java.util.concurrent.CompletableFuture;
4555
import java.util.concurrent.Future;
56+
import java.util.stream.Stream;
4657
import org.testng.annotations.BeforeMethod;
4758
import org.testng.annotations.Test;
4859

@@ -505,4 +516,104 @@ public void testAspectWithLineageRelationship() {
505516
verify(mockEventProducer, times(1))
506517
.produceMetadataChangeLog(any(OperationContext.class), any(), any(), any());
507518
}
519+
520+
@Test
521+
public void testIngestTimeseriesProposal() {
522+
// Create a spy of the EntityServiceImpl to track method calls
523+
EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService);
524+
525+
Urn timeseriesUrn =
526+
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesTest,PROD)");
527+
DatasetProfile datasetProfile = new DatasetProfile();
528+
datasetProfile.setRowCount(1000);
529+
datasetProfile.setColumnCount(15);
530+
datasetProfile.setTimestampMillis(0L);
531+
532+
// Create a mock AspectsBatch with timeseries aspects
533+
AspectsBatch mockBatch =
534+
AspectsBatchImpl.builder()
535+
.retrieverContext(opContext.getRetrieverContext())
536+
.items(
537+
List.of(
538+
ChangeItemImpl.builder()
539+
.urn(timeseriesUrn)
540+
.aspectName(DATASET_PROFILE_ASPECT_NAME)
541+
.recordTemplate(datasetProfile)
542+
.changeType(ChangeType.UPSERT)
543+
.auditStamp(TEST_AUDIT_STAMP)
544+
.build(opContext.getAspectRetriever()),
545+
ChangeItemImpl.builder()
546+
.urn(timeseriesUrn)
547+
.aspectName(DATASET_PROFILE_ASPECT_NAME)
548+
.recordTemplate(datasetProfile)
549+
.changeType(ChangeType.UPSERT)
550+
.auditStamp(TEST_AUDIT_STAMP)
551+
.build(opContext.getAspectRetriever())))
552+
.build();
553+
554+
// Test case 1: async = true path
555+
// Arrange
556+
doReturn(Stream.empty())
557+
.when(entityServiceSpy)
558+
.ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class));
559+
560+
// Act
561+
entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true);
562+
563+
// Verify
564+
verify(entityServiceSpy, times(1))
565+
.ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class));
566+
verify(entityServiceSpy, never())
567+
.ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class));
568+
569+
// Test case 2: async = false path
570+
// Arrange
571+
org.mockito.Mockito.reset(entityServiceSpy);
572+
doReturn(Stream.empty())
573+
.when(entityServiceSpy)
574+
.ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class));
575+
576+
// Act
577+
entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, false);
578+
579+
// Verify
580+
verify(entityServiceSpy, never())
581+
.ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class));
582+
verify(entityServiceSpy, times(1))
583+
.ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class));
584+
}
585+
586+
@Test
587+
public void testIngestTimeseriesProposalUnsupported() {
588+
// Create a spy of the EntityServiceImpl to track method calls
589+
EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService);
590+
591+
Urn timeseriesUrn =
592+
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesUnsupportedTest,PROD)");
593+
594+
// Create a mock AspectsBatch with timeseries aspects
595+
AspectsBatch mockBatch =
596+
AspectsBatchImpl.builder()
597+
.retrieverContext(opContext.getRetrieverContext())
598+
.items(
599+
List.of(
600+
DeleteItemImpl.builder()
601+
.urn(timeseriesUrn)
602+
.aspectName(DATASET_PROFILE_ASPECT_NAME)
603+
.auditStamp(TEST_AUDIT_STAMP)
604+
.build(opContext.getAspectRetriever()),
605+
DeleteItemImpl.builder()
606+
.urn(timeseriesUrn)
607+
.aspectName(DATASET_PROFILE_ASPECT_NAME)
608+
.auditStamp(TEST_AUDIT_STAMP)
609+
.build(opContext.getAspectRetriever())))
610+
.build();
611+
612+
try {
613+
entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true);
614+
fail("Should throw UnsupportedOperationException for non-UPSERT change types");
615+
} catch (UnsupportedOperationException e) {
616+
// Expected
617+
}
618+
}
508619
}

0 commit comments

Comments
 (0)