Skip to content

Commit 6c74e90

Browse files
committed
fix tranformer
1 parent 53c7790 commit 6c74e90

File tree

3 files changed

+45
-15
lines changed

3 files changed

+45
-15
lines changed

metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
def _update_work_unit_id(
2121
envelope: RecordEnvelope, urn: str, aspect_name: str
2222
) -> Dict[Any, Any]:
23-
structured_urn = Urn.create_from_string(urn)
24-
simple_name = "-".join(structured_urn.get_entity_id())
23+
structured_urn = Urn.from_string(urn)
24+
simple_name = "-".join(structured_urn.entity_ids)
2525
record_metadata = envelope.metadata.copy()
2626
record_metadata.update({"workunit_id": f"txform-{simple_name}-{aspect_name}"})
2727
return record_metadata

metadata-ingestion/src/datahub/ingestion/transformer/extract_ownership_from_tags.py

+35-11
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
1+
import logging
12
import re
23
from functools import lru_cache
3-
from typing import List, Optional, cast
4+
from typing import List, Optional, Sequence, Union, cast
45

56
from datahub.configuration.common import TransformerSemanticsConfigModel
67
from datahub.emitter.mce_builder import Aspect
8+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
79
from datahub.ingestion.api.common import PipelineContext
810
from datahub.ingestion.transformer.dataset_transformer import DatasetTagsTransformer
11+
from datahub.metadata._schema_classes import MetadataChangeProposalClass
912
from datahub.metadata.schema_classes import (
1013
GlobalTagsClass,
1114
OwnerClass,
@@ -16,6 +19,8 @@
1619
from datahub.utilities.urns.corpuser_urn import CorpuserUrn
1720
from datahub.utilities.urns.tag_urn import TagUrn
1821

22+
logger = logging.getLogger(__name__)
23+
1924

2025
class ExtractOwnersFromTagsConfig(TransformerSemanticsConfigModel):
2126
tag_prefix: str
@@ -38,11 +43,13 @@ class ExtractOwnersFromTagsTransformer(DatasetTagsTransformer):
3843

3944
ctx: PipelineContext
4045
config: ExtractOwnersFromTagsConfig
46+
owner_mcps: List[MetadataChangeProposalWrapper]
4147

4248
def __init__(self, config: ExtractOwnersFromTagsConfig, ctx: PipelineContext):
4349
super().__init__()
4450
self.ctx = ctx
4551
self.config = config
52+
self.owner_mcps = []
4653

4754
@classmethod
4855
def create(
@@ -56,6 +63,12 @@ def get_owner_urn(self, owner_str: str) -> str:
5663
return owner_str + "@" + self.config.email_domain
5764
return owner_str
5865

66+
def handle_end_of_stream(
67+
self,
68+
) -> Sequence[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:
69+
70+
return self.owner_mcps
71+
5972
def transform_aspect(
6073
self, entity_urn: str, aspect_name: str, aspect: Optional[Aspect]
6174
) -> Optional[Aspect]:
@@ -64,28 +77,39 @@ def transform_aspect(
6477
return None
6578
tags = in_tags_aspect.tags
6679
owners: List[OwnerClass] = []
80+
6781
for tag_class in tags:
6882
tag_urn = TagUrn.from_string(tag_class.tag)
69-
tag_str = tag_urn.get_entity_id()[0]
83+
tag_str = tag_urn.entity_ids[0]
7084
re_match = re.search(self.config.tag_prefix, tag_str)
7185
if re_match:
7286
owner_str = tag_str[re_match.end() :].strip()
7387
owner_urn_str = self.get_owner_urn(owner_str)
7488
if self.config.is_user:
75-
owner_urn = str(CorpuserUrn.create_from_id(owner_urn_str))
89+
owner_urn = str(CorpuserUrn(owner_urn_str))
7690
else:
77-
owner_urn = str(CorpGroupUrn.create_from_id(owner_urn_str))
91+
owner_urn = str(CorpGroupUrn(owner_urn_str))
7892
owner_type = get_owner_type(self.config.owner_type)
7993
if owner_type == OwnershipTypeClass.CUSTOM:
8094
assert (
8195
self.config.owner_type_urn is not None
8296
), "owner_type_urn must be set if owner_type is CUSTOM"
83-
owner = OwnerClass(
84-
owner=owner_urn,
85-
type=owner_type,
86-
typeUrn=self.config.owner_type_urn,
97+
98+
owners.append(
99+
OwnerClass(
100+
owner=owner_urn,
101+
type=owner_type,
102+
typeUrn=self.config.owner_type_urn,
103+
)
87104
)
88-
owners.append(owner)
89105

90-
owner_aspect = OwnershipClass(owners=owners)
91-
return cast(Aspect, owner_aspect)
106+
self.owner_mcps.append(
107+
MetadataChangeProposalWrapper(
108+
entityUrn=entity_urn,
109+
aspect=OwnershipClass(
110+
owners=owners,
111+
),
112+
)
113+
)
114+
115+
return None

metadata-ingestion/tests/unit/test_transform_dataset.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -648,18 +648,24 @@ def _test_owner(
648648
)
649649
]
650650
)
651+
651652
transformer = ExtractOwnersFromTagsTransformer.create(
652653
config,
653654
PipelineContext(run_id="test"),
654655
)
655-
transformed = list(
656+
657+
list(
656658
transformer.transform(
657659
[
658660
RecordEnvelope(dataset, metadata={}),
659661
]
660662
)
661663
)
662-
owners_aspect = transformed[0].record.proposedSnapshot.aspects[0]
664+
665+
mcp: MetadataChangeProposalWrapper = cast(
666+
MetadataChangeProposalWrapper, transformer.handle_end_of_stream()[0]
667+
)
668+
owners_aspect = cast(OwnershipClass, mcp.aspect)
663669
owners = owners_aspect.owners
664670
owner = owners[0]
665671
if expected_owner_type is not None:

0 commit comments

Comments
 (0)