Skip to content

Commit d259e91

Browse files
committed
Adding a new workunit processor to check correctness of an aspect
1 parent 3c388a5 commit d259e91

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

metadata-ingestion/src/datahub/ingestion/api/source_helpers.py

+19
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,25 @@ def create_dataset_props_patch_builder(
7878
return patch_builder
7979

8080

81+
def check_mcp_correctness(mcp: MetadataChangeProposalClass):
82+
logger.debug(f"Processing as MCP with urn: {mcp.entityUrn} and aspect: {mcp.aspectName}, change type: {mcp.changeType}")
83+
logger.debug(f"{mcp.aspect}")
84+
85+
86+
def check_mcpw_correctness(mcp: MetadataChangeProposalWrapper):
87+
logger.debug(f"Processing as MCP with urn: {mcp.entityUrn} and aspect: {mcp.aspectName}, change type: {mcp.changeType}")
88+
logger.debug(f"{mcp.aspect}")
89+
90+
91+
def check_workunit_correctness(stream: Iterable[MetadataWorkUnit]) -> Iterable[MetadataWorkUnit]:
92+
for wu in stream:
93+
logger.debug(f"Checking correctnes for workunit: {wu.id}")
94+
if isinstance(wu.metadata, MetadataChangeProposalClass):
95+
check_mcp_correctness(wu.metadata)
96+
elif isinstance(wu.metadata, MetadataChangeProposalWrapper):
97+
check_mcpw_correctness(wu.metadata)
98+
99+
81100
def create_dataset_owners_patch_builder(
82101
dataset_urn: str,
83102
ownership: Ownership,

metadata-ingestion/src/datahub/ingestion/source/unity/source.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
)
4343
from datahub.ingestion.api.source_helpers import (
4444
create_dataset_owners_patch_builder,
45-
create_dataset_props_patch_builder,
45+
create_dataset_props_patch_builder, check_workunit_correctness,
4646
)
4747
from datahub.ingestion.api.workunit import MetadataWorkUnit
4848
from datahub.ingestion.source.aws import s3_util
@@ -260,6 +260,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
260260
StaleEntityRemovalHandler.create(
261261
self, self.config, self.ctx
262262
).workunit_processor,
263+
check_workunit_correctness
263264
]
264265

265266
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

0 commit comments

Comments
 (0)