|
1 | 1 | package com.linkedin.metadata.entity;
|
2 | 2 |
|
| 3 | +import static com.linkedin.metadata.Constants.DATASET_PROFILE_ASPECT_NAME; |
3 | 4 | import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
|
4 | 5 | import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
|
| 6 | +import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP; |
5 | 7 | import static org.mockito.ArgumentMatchers.any;
|
| 8 | +import static org.mockito.Mockito.doReturn; |
6 | 9 | import static org.mockito.Mockito.mock;
|
7 | 10 | import static org.mockito.Mockito.never;
|
8 | 11 | import static org.mockito.Mockito.times;
|
|
13 | 16 | import static org.testng.Assert.assertNotNull;
|
14 | 17 | import static org.testng.Assert.assertNull;
|
15 | 18 | import static org.testng.Assert.assertTrue;
|
| 19 | +import static org.testng.Assert.fail; |
16 | 20 |
|
17 | 21 | import com.datahub.util.RecordUtils;
|
18 | 22 | import com.linkedin.common.AuditStamp;
|
|
21 | 25 | import com.linkedin.common.urn.UrnUtils;
|
22 | 26 | import com.linkedin.data.template.DataTemplateUtil;
|
23 | 27 | import com.linkedin.data.template.RecordTemplate;
|
| 28 | +import com.linkedin.dataset.DatasetProfile; |
24 | 29 | import com.linkedin.dataset.UpstreamLineage;
|
| 30 | +import com.linkedin.events.metadata.ChangeType; |
25 | 31 | import com.linkedin.identity.CorpUserInfo;
|
26 | 32 | import com.linkedin.metadata.AspectGenerationUtils;
|
27 | 33 | import com.linkedin.metadata.aspect.SystemAspect;
|
| 34 | +import com.linkedin.metadata.aspect.batch.AspectsBatch; |
28 | 35 | import com.linkedin.metadata.aspect.batch.ChangeMCP;
|
29 | 36 | import com.linkedin.metadata.config.PreProcessHooks;
|
30 | 37 | import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
|
31 | 38 | import com.linkedin.metadata.entity.ebean.EbeanSystemAspect;
|
| 39 | +import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl; |
32 | 40 | import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
|
| 41 | +import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl; |
33 | 42 | import com.linkedin.metadata.event.EventProducer;
|
34 | 43 | import com.linkedin.metadata.models.registry.EntityRegistry;
|
35 | 44 | import com.linkedin.metadata.utils.GenericRecordUtils;
|
|
40 | 49 | import io.datahubproject.metadata.context.OperationContext;
|
41 | 50 | import io.datahubproject.test.metadata.context.TestOperationContexts;
|
42 | 51 | import java.sql.Timestamp;
|
| 52 | +import java.util.List; |
43 | 53 | import java.util.Optional;
|
44 | 54 | import java.util.concurrent.CompletableFuture;
|
45 | 55 | import java.util.concurrent.Future;
|
| 56 | +import java.util.stream.Stream; |
46 | 57 | import org.testng.annotations.BeforeMethod;
|
47 | 58 | import org.testng.annotations.Test;
|
48 | 59 |
|
@@ -505,4 +516,104 @@ public void testAspectWithLineageRelationship() {
|
505 | 516 | verify(mockEventProducer, times(1))
|
506 | 517 | .produceMetadataChangeLog(any(OperationContext.class), any(), any(), any());
|
507 | 518 | }
|
| 519 | + |
| 520 | + @Test |
| 521 | + public void testIngestTimeseriesProposal() { |
| 522 | + // Create a spy of the EntityServiceImpl to track method calls |
| 523 | + EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService); |
| 524 | + |
| 525 | + Urn timeseriesUrn = |
| 526 | + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesTest,PROD)"); |
| 527 | + DatasetProfile datasetProfile = new DatasetProfile(); |
| 528 | + datasetProfile.setRowCount(1000); |
| 529 | + datasetProfile.setColumnCount(15); |
| 530 | + datasetProfile.setTimestampMillis(0L); |
| 531 | + |
| 532 | + // Create a mock AspectsBatch with timeseries aspects |
| 533 | + AspectsBatch mockBatch = |
| 534 | + AspectsBatchImpl.builder() |
| 535 | + .retrieverContext(opContext.getRetrieverContext()) |
| 536 | + .items( |
| 537 | + List.of( |
| 538 | + ChangeItemImpl.builder() |
| 539 | + .urn(timeseriesUrn) |
| 540 | + .aspectName(DATASET_PROFILE_ASPECT_NAME) |
| 541 | + .recordTemplate(datasetProfile) |
| 542 | + .changeType(ChangeType.UPSERT) |
| 543 | + .auditStamp(TEST_AUDIT_STAMP) |
| 544 | + .build(opContext.getAspectRetriever()), |
| 545 | + ChangeItemImpl.builder() |
| 546 | + .urn(timeseriesUrn) |
| 547 | + .aspectName(DATASET_PROFILE_ASPECT_NAME) |
| 548 | + .recordTemplate(datasetProfile) |
| 549 | + .changeType(ChangeType.UPSERT) |
| 550 | + .auditStamp(TEST_AUDIT_STAMP) |
| 551 | + .build(opContext.getAspectRetriever()))) |
| 552 | + .build(); |
| 553 | + |
| 554 | + // Test case 1: async = true path |
| 555 | + // Arrange |
| 556 | + doReturn(Stream.empty()) |
| 557 | + .when(entityServiceSpy) |
| 558 | + .ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class)); |
| 559 | + |
| 560 | + // Act |
| 561 | + entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true); |
| 562 | + |
| 563 | + // Verify |
| 564 | + verify(entityServiceSpy, times(1)) |
| 565 | + .ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class)); |
| 566 | + verify(entityServiceSpy, never()) |
| 567 | + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); |
| 568 | + |
| 569 | + // Test case 2: async = false path |
| 570 | + // Arrange |
| 571 | + org.mockito.Mockito.reset(entityServiceSpy); |
| 572 | + doReturn(Stream.empty()) |
| 573 | + .when(entityServiceSpy) |
| 574 | + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); |
| 575 | + |
| 576 | + // Act |
| 577 | + entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, false); |
| 578 | + |
| 579 | + // Verify |
| 580 | + verify(entityServiceSpy, never()) |
| 581 | + .ingestProposalAsync(any(OperationContext.class), any(AspectsBatch.class)); |
| 582 | + verify(entityServiceSpy, times(1)) |
| 583 | + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); |
| 584 | + } |
| 585 | + |
| 586 | + @Test |
| 587 | + public void testIngestTimeseriesProposalUnsupported() { |
| 588 | + // Create a spy of the EntityServiceImpl to track method calls |
| 589 | + EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService); |
| 590 | + |
| 591 | + Urn timeseriesUrn = |
| 592 | + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesUnsupportedTest,PROD)"); |
| 593 | + |
| 594 | + // Create a mock AspectsBatch with timeseries aspects |
| 595 | + AspectsBatch mockBatch = |
| 596 | + AspectsBatchImpl.builder() |
| 597 | + .retrieverContext(opContext.getRetrieverContext()) |
| 598 | + .items( |
| 599 | + List.of( |
| 600 | + DeleteItemImpl.builder() |
| 601 | + .urn(timeseriesUrn) |
| 602 | + .aspectName(DATASET_PROFILE_ASPECT_NAME) |
| 603 | + .auditStamp(TEST_AUDIT_STAMP) |
| 604 | + .build(opContext.getAspectRetriever()), |
| 605 | + DeleteItemImpl.builder() |
| 606 | + .urn(timeseriesUrn) |
| 607 | + .aspectName(DATASET_PROFILE_ASPECT_NAME) |
| 608 | + .auditStamp(TEST_AUDIT_STAMP) |
| 609 | + .build(opContext.getAspectRetriever()))) |
| 610 | + .build(); |
| 611 | + |
| 612 | + try { |
| 613 | + entityServiceSpy.ingestTimeseriesProposal(opContext, mockBatch, true); |
| 614 | + fail("Should throw UnsupportedOperationException for non-UPSERT change types"); |
| 615 | + } catch (UnsupportedOperationException e) { |
| 616 | + // Expected |
| 617 | + } |
| 618 | + } |
508 | 619 | }
|
0 commit comments