|
5 | 5 | import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
|
6 | 6 | import static com.linkedin.metadata.entity.EntityServiceTest.TEST_AUDIT_STAMP;
|
7 | 7 | import static org.mockito.ArgumentMatchers.any;
|
| 8 | +import static org.mockito.ArgumentMatchers.anyInt; |
| 9 | +import static org.mockito.ArgumentMatchers.argThat; |
8 | 10 | import static org.mockito.Mockito.doReturn;
|
9 | 11 | import static org.mockito.Mockito.mock;
|
10 | 12 | import static org.mockito.Mockito.never;
|
| 13 | +import static org.mockito.Mockito.spy; |
11 | 14 | import static org.mockito.Mockito.times;
|
12 | 15 | import static org.mockito.Mockito.verify;
|
13 | 16 | import static org.mockito.Mockito.when;
|
|
36 | 39 | import com.linkedin.metadata.config.PreProcessHooks;
|
37 | 40 | import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
|
38 | 41 | import com.linkedin.metadata.entity.ebean.EbeanSystemAspect;
|
| 42 | +import com.linkedin.metadata.entity.ebean.PartitionedStream; |
39 | 43 | import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
|
40 | 44 | import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
|
41 | 45 | import com.linkedin.metadata.entity.ebean.batch.DeleteItemImpl;
|
| 46 | +import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs; |
| 47 | +import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult; |
42 | 48 | import com.linkedin.metadata.event.EventProducer;
|
43 | 49 | import com.linkedin.metadata.models.registry.EntityRegistry;
|
44 | 50 | import com.linkedin.metadata.utils.GenericRecordUtils;
|
|
49 | 55 | import io.datahubproject.metadata.context.OperationContext;
|
50 | 56 | import io.datahubproject.test.metadata.context.TestOperationContexts;
|
51 | 57 | import java.sql.Timestamp;
|
| 58 | +import java.util.ArrayList; |
52 | 59 | import java.util.List;
|
53 | 60 | import java.util.Optional;
|
54 | 61 | import java.util.concurrent.CompletableFuture;
|
55 | 62 | import java.util.concurrent.Future;
|
56 | 63 | import java.util.stream.Stream;
|
| 64 | +import org.mockito.ArgumentCaptor; |
57 | 65 | import org.testng.annotations.BeforeMethod;
|
58 | 66 | import org.testng.annotations.Test;
|
59 | 67 |
|
@@ -520,7 +528,7 @@ public void testAspectWithLineageRelationship() {
|
520 | 528 | @Test
|
521 | 529 | public void testIngestTimeseriesProposal() {
|
522 | 530 | // Create a spy of the EntityServiceImpl to track method calls
|
523 |
| - EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService); |
| 531 | + EntityServiceImpl entityServiceSpy = spy(entityService); |
524 | 532 |
|
525 | 533 | Urn timeseriesUrn =
|
526 | 534 | UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesTest,PROD)");
|
@@ -586,7 +594,7 @@ public void testIngestTimeseriesProposal() {
|
586 | 594 | @Test
|
587 | 595 | public void testIngestTimeseriesProposalUnsupported() {
|
588 | 596 | // Create a spy of the EntityServiceImpl to track method calls
|
589 |
| - EntityServiceImpl entityServiceSpy = org.mockito.Mockito.spy(entityService); |
| 597 | + EntityServiceImpl entityServiceSpy = spy(entityService); |
590 | 598 |
|
591 | 599 | Urn timeseriesUrn =
|
592 | 600 | UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,timeseriesUnsupportedTest,PROD)");
|
@@ -616,4 +624,274 @@ public void testIngestTimeseriesProposalUnsupported() {
|
616 | 624 | // Expected
|
617 | 625 | }
|
618 | 626 | }
|
| 627 | + |
| 628 | + /** |
| 629 | + * Tests an end-to-end scenario with createDefaultAspects=true, ensuring that both the aspects are |
| 630 | + * restored to the index and default aspects are created. |
| 631 | + */ |
| 632 | + @Test |
| 633 | + public void testRestoreIndicesEndToEndWithDefaultAspects() throws Exception { |
| 634 | + // Setup mock AspectDao |
| 635 | + AspectDao mockAspectDao = mock(AspectDao.class); |
| 636 | + PartitionedStream<EbeanAspectV2> mockStream = mock(PartitionedStream.class); |
| 637 | + |
| 638 | + // Create test aspects |
| 639 | + List<EbeanAspectV2> batch = new ArrayList<>(); |
| 640 | + |
| 641 | + // Dataset aspect |
| 642 | + EbeanAspectV2 datasetAspect = |
| 643 | + new EbeanAspectV2( |
| 644 | + "urn:li:dataset:(urn:li:dataPlatform:test,defaultAspectsTest,PROD)", |
| 645 | + STATUS_ASPECT_NAME, |
| 646 | + 0L, |
| 647 | + RecordUtils.toJsonString(new Status().setRemoved(false)), |
| 648 | + new Timestamp(System.currentTimeMillis()), |
| 649 | + TEST_AUDIT_STAMP.getActor().toString(), |
| 650 | + null, |
| 651 | + RecordUtils.toJsonString(SystemMetadataUtils.createDefaultSystemMetadata())); |
| 652 | + batch.add(datasetAspect); |
| 653 | + |
| 654 | + // Setup mock stream |
| 655 | + when(mockStream.partition(anyInt())).thenReturn(Stream.of(batch.stream())); |
| 656 | + when(mockAspectDao.streamAspectBatches(any())).thenReturn(mockStream); |
| 657 | + |
| 658 | + // Setup mock EventProducer |
| 659 | + EventProducer mockEventProducer = mock(EventProducer.class); |
| 660 | + when(mockEventProducer.produceMetadataChangeLog( |
| 661 | + any(OperationContext.class), any(), any(), any())) |
| 662 | + .thenReturn(CompletableFuture.completedFuture(null)); |
| 663 | + |
| 664 | + // Create EntityServiceImpl with mocks |
| 665 | + EntityServiceImpl entityServiceSpy = |
| 666 | + spy( |
| 667 | + new EntityServiceImpl( |
| 668 | + mockAspectDao, mockEventProducer, false, mock(PreProcessHooks.class), 0, true)); |
| 669 | + |
| 670 | + // Mock ingestProposalSync to capture default aspects |
| 671 | + ArgumentCaptor<AspectsBatch> batchCaptor = ArgumentCaptor.forClass(AspectsBatch.class); |
| 672 | + doReturn(Stream.empty()) |
| 673 | + .when(entityServiceSpy) |
| 674 | + .ingestProposalSync(any(OperationContext.class), batchCaptor.capture()); |
| 675 | + |
| 676 | + // Create RestoreIndicesArgs with createDefaultAspects set to true |
| 677 | + RestoreIndicesArgs args = |
| 678 | + new RestoreIndicesArgs() |
| 679 | + .start(0) |
| 680 | + .limit(100) |
| 681 | + .batchSize(50) |
| 682 | + .batchDelayMs(0L) |
| 683 | + .createDefaultAspects(true); // Explicitly set to true |
| 684 | + |
| 685 | + // Execute the method under test |
| 686 | + List<RestoreIndicesResult> results = |
| 687 | + entityServiceSpy.restoreIndices(opContext, args, message -> {}); |
| 688 | + |
| 689 | + // Verify results |
| 690 | + assertNotNull(results); |
| 691 | + assertEquals(1, results.size()); |
| 692 | + |
| 693 | + // Verify MCL production |
| 694 | + verify(mockEventProducer) |
| 695 | + .produceMetadataChangeLog(any(OperationContext.class), any(), any(), any()); |
| 696 | + |
| 697 | + // Verify default aspect creation was attempted |
| 698 | + verify(entityServiceSpy).ingestProposalSync(any(OperationContext.class), any()); |
| 699 | + |
| 700 | + // Verify the captured batch contains aspects |
| 701 | + AspectsBatch capturedBatch = batchCaptor.getValue(); |
| 702 | + assertNotNull(capturedBatch); |
| 703 | + assertFalse(capturedBatch.getItems().isEmpty()); |
| 704 | + |
| 705 | + // Verify the defaultAspectsCreated count in results |
| 706 | + assertTrue(results.get(0).defaultAspectsCreated >= 0); |
| 707 | + } |
| 708 | + |
| 709 | + /** |
| 710 | + * Tests an end-to-end scenario with createDefaultAspects=false, ensuring that only aspects are |
| 711 | + * restored to the index without creating default aspects. |
| 712 | + */ |
| 713 | + @Test |
| 714 | + public void testRestoreIndicesEndToEndWithoutDefaultAspects() throws Exception { |
| 715 | + // Setup mock AspectDao |
| 716 | + AspectDao mockAspectDao = mock(AspectDao.class); |
| 717 | + PartitionedStream<EbeanAspectV2> mockStream = mock(PartitionedStream.class); |
| 718 | + |
| 719 | + // Create test aspects |
| 720 | + List<EbeanAspectV2> batch = new ArrayList<>(); |
| 721 | + |
| 722 | + // Dataset aspect |
| 723 | + EbeanAspectV2 datasetAspect = |
| 724 | + new EbeanAspectV2( |
| 725 | + "urn:li:dataset:(urn:li:dataPlatform:test,defaultAspectsTest,PROD)", |
| 726 | + STATUS_ASPECT_NAME, |
| 727 | + 0L, |
| 728 | + RecordUtils.toJsonString(new Status().setRemoved(false)), |
| 729 | + new Timestamp(System.currentTimeMillis()), |
| 730 | + TEST_AUDIT_STAMP.getActor().toString(), |
| 731 | + null, |
| 732 | + RecordUtils.toJsonString(SystemMetadataUtils.createDefaultSystemMetadata())); |
| 733 | + batch.add(datasetAspect); |
| 734 | + |
| 735 | + // Setup mock stream |
| 736 | + when(mockStream.partition(anyInt())).thenReturn(Stream.of(batch.stream())); |
| 737 | + when(mockAspectDao.streamAspectBatches(any())).thenReturn(mockStream); |
| 738 | + |
| 739 | + // Setup mock EventProducer |
| 740 | + EventProducer mockEventProducer = mock(EventProducer.class); |
| 741 | + when(mockEventProducer.produceMetadataChangeLog( |
| 742 | + any(OperationContext.class), any(), any(), any())) |
| 743 | + .thenReturn(CompletableFuture.completedFuture(null)); |
| 744 | + |
| 745 | + // Create EntityServiceImpl with mocks |
| 746 | + EntityServiceImpl entityServiceSpy = |
| 747 | + spy( |
| 748 | + new EntityServiceImpl( |
| 749 | + mockAspectDao, mockEventProducer, false, mock(PreProcessHooks.class), 0, true)); |
| 750 | + |
| 751 | + // Simply stub the method without capturing |
| 752 | + doReturn(Stream.empty()) |
| 753 | + .when(entityServiceSpy) |
| 754 | + .ingestProposalSync(any(OperationContext.class), any(AspectsBatch.class)); |
| 755 | + |
| 756 | + // Create RestoreIndicesArgs with createDefaultAspects set to false |
| 757 | + RestoreIndicesArgs args = |
| 758 | + new RestoreIndicesArgs() |
| 759 | + .start(0) |
| 760 | + .limit(100) |
| 761 | + .batchSize(50) |
| 762 | + .batchDelayMs(0L) |
| 763 | + .createDefaultAspects(false); // Explicitly set to false |
| 764 | + |
| 765 | + // Execute the method under test |
| 766 | + List<RestoreIndicesResult> results = |
| 767 | + entityServiceSpy.restoreIndices(opContext, args, message -> {}); |
| 768 | + |
| 769 | + // Verify results |
| 770 | + assertNotNull(results); |
| 771 | + assertEquals(1, results.size()); |
| 772 | + |
| 773 | + // Verify MCL production |
| 774 | + verify(mockEventProducer) |
| 775 | + .produceMetadataChangeLog(any(OperationContext.class), any(), any(), any()); |
| 776 | + |
| 777 | + // Verify default aspect creation was never attempted |
| 778 | + verify(entityServiceSpy, never()).ingestProposalSync(any(OperationContext.class), any()); |
| 779 | + |
| 780 | + // Don't try to use the capturedBatch since the method should never be called |
| 781 | + // Instead, verify the defaultAspectsCreated count in results is 0 |
| 782 | + assertEquals(0, results.get(0).defaultAspectsCreated); |
| 783 | + } |
| 784 | + |
| 785 | + /** |
| 786 | + * Tests the continuation behavior of restoreIndices when an exception occurs. It should continue |
| 787 | + * processing other aspects even if one fails. |
| 788 | + */ |
| 789 | + @Test |
| 790 | + public void testRestoreIndicesContinuationOnException() throws Exception { |
| 791 | + // Setup mock AspectDao |
| 792 | + AspectDao mockAspectDao = mock(AspectDao.class); |
| 793 | + PartitionedStream<EbeanAspectV2> mockStream = mock(PartitionedStream.class); |
| 794 | + |
| 795 | + // Create test aspects |
| 796 | + List<EbeanAspectV2> batch = new ArrayList<>(); |
| 797 | + |
| 798 | + // First aspect (will succeed) |
| 799 | + EbeanAspectV2 successAspect = |
| 800 | + new EbeanAspectV2( |
| 801 | + "urn:li:dataset:(urn:li:dataPlatform:test,success,PROD)", |
| 802 | + STATUS_ASPECT_NAME, |
| 803 | + 0L, |
| 804 | + RecordUtils.toJsonString(new Status().setRemoved(false)), |
| 805 | + new Timestamp(System.currentTimeMillis()), |
| 806 | + TEST_AUDIT_STAMP.getActor().toString(), |
| 807 | + null, |
| 808 | + RecordUtils.toJsonString(SystemMetadataUtils.createDefaultSystemMetadata())); |
| 809 | + batch.add(successAspect); |
| 810 | + |
| 811 | + // Second aspect (will fail) |
| 812 | + EbeanAspectV2 failAspect = |
| 813 | + new EbeanAspectV2( |
| 814 | + "urn:li:dataset:(urn:li:dataPlatform:test,fail,PROD)", |
| 815 | + STATUS_ASPECT_NAME, |
| 816 | + 0L, |
| 817 | + "INVALID_JSON", // This will cause deserialization to fail |
| 818 | + new Timestamp(System.currentTimeMillis()), |
| 819 | + TEST_AUDIT_STAMP.getActor().toString(), |
| 820 | + null, |
| 821 | + RecordUtils.toJsonString(SystemMetadataUtils.createDefaultSystemMetadata())); |
| 822 | + batch.add(failAspect); |
| 823 | + |
| 824 | + // Third aspect (will succeed) |
| 825 | + EbeanAspectV2 anotherSuccessAspect = |
| 826 | + new EbeanAspectV2( |
| 827 | + "urn:li:dataset:(urn:li:dataPlatform:test,anotherSuccess,PROD)", |
| 828 | + STATUS_ASPECT_NAME, |
| 829 | + 0L, |
| 830 | + RecordUtils.toJsonString(new Status().setRemoved(false)), |
| 831 | + new Timestamp(System.currentTimeMillis()), |
| 832 | + TEST_AUDIT_STAMP.getActor().toString(), |
| 833 | + null, |
| 834 | + RecordUtils.toJsonString(SystemMetadataUtils.createDefaultSystemMetadata())); |
| 835 | + batch.add(anotherSuccessAspect); |
| 836 | + |
| 837 | + // Setup mock stream to create two separate batches |
| 838 | + // This is the key change - we need to separate the failing aspect into its own batch |
| 839 | + when(mockStream.partition(anyInt())) |
| 840 | + .thenReturn( |
| 841 | + Stream.of( |
| 842 | + // First batch with success aspect |
| 843 | + Stream.of(successAspect), |
| 844 | + // Second batch with failing aspect |
| 845 | + Stream.of(failAspect), |
| 846 | + // Third batch with another success aspect |
| 847 | + Stream.of(anotherSuccessAspect))); |
| 848 | + |
| 849 | + when(mockAspectDao.streamAspectBatches(any())).thenReturn(mockStream); |
| 850 | + |
| 851 | + // Setup mock EventProducer |
| 852 | + EventProducer mockEventProducer = mock(EventProducer.class); |
| 853 | + when(mockEventProducer.produceMetadataChangeLog( |
| 854 | + any(OperationContext.class), |
| 855 | + argThat( |
| 856 | + urn -> |
| 857 | + urn.toString() |
| 858 | + .contains("success")), // Only succeed for URNs containing "success" |
| 859 | + any(), |
| 860 | + any())) |
| 861 | + .thenReturn(CompletableFuture.completedFuture(null)); |
| 862 | + |
| 863 | + // Create EntityServiceImpl with mocks |
| 864 | + EntityServiceImpl entityService = |
| 865 | + new EntityServiceImpl( |
| 866 | + mockAspectDao, mockEventProducer, false, mock(PreProcessHooks.class), 0, true); |
| 867 | + |
| 868 | + // Create RestoreIndicesArgs |
| 869 | + RestoreIndicesArgs args = |
| 870 | + new RestoreIndicesArgs() |
| 871 | + .start(0) |
| 872 | + .limit(100) |
| 873 | + .batchSize(1) // Process one aspect at a time |
| 874 | + .batchDelayMs(0L) |
| 875 | + .createDefaultAspects(false); // Avoid additional complexity |
| 876 | + |
| 877 | + // Execute the method under test |
| 878 | + List<RestoreIndicesResult> results = |
| 879 | + entityService.restoreIndices(opContext, args, message -> {}); |
| 880 | + |
| 881 | + // Verify results |
| 882 | + assertNotNull(results); |
| 883 | + // We should get exactly 2 results - one for each successful batch |
| 884 | + assertEquals(2, results.size()); |
| 885 | + |
| 886 | + // We expect to see rowsMigrated = 1 in each successful result |
| 887 | + assertEquals(1, results.get(0).rowsMigrated); |
| 888 | + assertEquals(1, results.get(1).rowsMigrated); |
| 889 | + |
| 890 | + // The failing aspect should not generate a result at all, as the implementation |
| 891 | + // filters out null results (batches that throw exceptions) |
| 892 | + |
| 893 | + // Verify total calls to produceMetadataChangeLog (one for each successful aspect) |
| 894 | + verify(mockEventProducer, times(2)) |
| 895 | + .produceMetadataChangeLog(any(OperationContext.class), any(), any(), any()); |
| 896 | + } |
619 | 897 | }
|
0 commit comments