|
3 | 3 |
|
4 | 4 | import pytest
|
5 | 5 |
|
| 6 | +import datahub.metadata.schema_classes as models |
6 | 7 | from datahub.emitter.mce_builder import make_dashboard_urn
|
7 | 8 | from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
8 | 9 | from datahub.emitter.serialization_helper import pre_json_transform
|
|
12 | 13 | ChangeAuditStampsClass,
|
13 | 14 | DashboardInfoClass,
|
14 | 15 | )
|
| 16 | +from datahub.metadata.urns import MlModelUrn |
15 | 17 | from tests.consistency_utils import wait_for_writes_to_sync
|
16 | 18 | from tests.restli.restli_test import MetadataChangeProposalInvalidWrapper
|
17 | 19 | from tests.utils import delete_urns
|
18 | 20 |
|
19 | 21 | generated_urns: List[str] = []
|
20 | 22 |
|
21 | 23 |
|
22 |
| -@pytest.fixture(scope="module") |
| 24 | +@pytest.fixture(scope="module", autouse=True) |
23 | 25 | def ingest_cleanup_data(auth_session, graph_client, request):
|
24 | 26 | yield
|
25 | 27 | delete_urns(graph_client, generated_urns)
|
@@ -84,6 +86,29 @@ def _create_invalid_dashboard_mcp() -> MetadataChangeProposalClass:
|
84 | 86 | return mcp_invalid.make_mcp()
|
85 | 87 |
|
86 | 88 |
|
| 89 | +def _create_invalid_dataset_mcps() -> List[MetadataChangeProposalWrapper]: |
| 90 | + dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:kafka,my_dataset,PROD)" |
| 91 | + model_urn = MlModelUrn("mlflow", "my_model", "PROD").urn() |
| 92 | + bad_mcps = [ |
| 93 | + MetadataChangeProposalWrapper( |
| 94 | + entityUrn=dataset_urn, |
| 95 | + aspect=models.StatusClass(removed=False), |
| 96 | + ), |
| 97 | + MetadataChangeProposalWrapper( |
| 98 | + entityUrn=dataset_urn, |
| 99 | + aspect=models.UpstreamLineageClass( |
| 100 | + upstreams=[ |
| 101 | + models.UpstreamClass( |
| 102 | + dataset=model_urn, |
| 103 | + type=models.DatasetLineageTypeClass.TRANSFORMED, |
| 104 | + ) |
| 105 | + ] |
| 106 | + ), |
| 107 | + ), |
| 108 | + ] |
| 109 | + return bad_mcps |
| 110 | + |
| 111 | + |
87 | 112 | def test_restli_batch_ingestion_sync(graph_client):
|
88 | 113 | # Positive Test (all valid MetadataChangeProposal)
|
89 | 114 | mcps = _create_valid_dashboard_mcps()
|
@@ -133,3 +158,30 @@ def test_restli_batch_ingestion_async(graph_client):
|
133 | 158 | assert aspect.title == "Dummy Title For Testing"
|
134 | 159 | assert aspect.description == "Dummy Description For Testing"
|
135 | 160 | assert aspect.lastModified is not None
|
| 161 | + |
| 162 | + |
| 163 | +def test_restli_batch_ingestion_exception_sync(graph_client): |
| 164 | + """ |
| 165 | + Test Batch ingestion when an exception occurs in sync mode |
| 166 | + """ |
| 167 | + bad_mcps = _create_invalid_dataset_mcps() |
| 168 | + generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn]) |
| 169 | + |
| 170 | + try: |
| 171 | + graph_client.emit_mcps(bad_mcps, async_flag=False) |
| 172 | + raise AssertionError("should have thrown an exception") |
| 173 | + except Exception as e: |
| 174 | + if isinstance(e, AssertionError): |
| 175 | + raise e |
| 176 | + print(f"Error emitting MCPs due to {e}") |
| 177 | + |
| 178 | + |
| 179 | +def test_restli_batch_ingestion_exception_async(graph_client): |
| 180 | + """ |
| 181 | + Test Batch ingestion when an exception occurs in async mode |
| 182 | + """ |
| 183 | + bad_mcps = _create_invalid_dataset_mcps() |
| 184 | + generated_urns.extend([mcp.entityUrn for mcp in bad_mcps if mcp.entityUrn]) |
| 185 | + # TODO expectation is that it throws exception, but it doesn't currently.this test case need to change after fix. |
| 186 | + ret = graph_client.emit_mcps(bad_mcps, async_flag=True) |
| 187 | + assert ret >= 0 |
0 commit comments