1
+ import logging
1
2
import re
2
3
from functools import lru_cache
3
- from typing import List , Optional , cast
4
+ from typing import List , Optional , Sequence , Union , cast
4
5
5
6
from datahub .configuration .common import TransformerSemanticsConfigModel
6
7
from datahub .emitter .mce_builder import Aspect
8
+ from datahub .emitter .mcp import MetadataChangeProposalWrapper
7
9
from datahub .ingestion .api .common import PipelineContext
8
10
from datahub .ingestion .transformer .dataset_transformer import DatasetTagsTransformer
11
+ from datahub .metadata ._schema_classes import MetadataChangeProposalClass
9
12
from datahub .metadata .schema_classes import (
10
13
GlobalTagsClass ,
11
14
OwnerClass ,
16
19
from datahub .utilities .urns .corpuser_urn import CorpuserUrn
17
20
from datahub .utilities .urns .tag_urn import TagUrn
18
21
22
+ logger = logging .getLogger (__name__ )
23
+
19
24
20
25
class ExtractOwnersFromTagsConfig (TransformerSemanticsConfigModel ):
21
26
tag_prefix : str
@@ -38,11 +43,13 @@ class ExtractOwnersFromTagsTransformer(DatasetTagsTransformer):
38
43
39
44
ctx : PipelineContext
40
45
config : ExtractOwnersFromTagsConfig
46
+ owner_mcps : List [MetadataChangeProposalWrapper ]
41
47
42
48
def __init__ (self , config : ExtractOwnersFromTagsConfig , ctx : PipelineContext ):
43
49
super ().__init__ ()
44
50
self .ctx = ctx
45
51
self .config = config
52
+ self .owner_mcps = []
46
53
47
54
@classmethod
48
55
def create (
@@ -56,6 +63,12 @@ def get_owner_urn(self, owner_str: str) -> str:
56
63
return owner_str + "@" + self .config .email_domain
57
64
return owner_str
58
65
66
+ def handle_end_of_stream (
67
+ self ,
68
+ ) -> Sequence [Union [MetadataChangeProposalWrapper , MetadataChangeProposalClass ]]:
69
+
70
+ return self .owner_mcps
71
+
59
72
def transform_aspect (
60
73
self , entity_urn : str , aspect_name : str , aspect : Optional [Aspect ]
61
74
) -> Optional [Aspect ]:
@@ -64,28 +77,39 @@ def transform_aspect(
64
77
return None
65
78
tags = in_tags_aspect .tags
66
79
owners : List [OwnerClass ] = []
80
+
67
81
for tag_class in tags :
68
82
tag_urn = TagUrn .from_string (tag_class .tag )
69
- tag_str = tag_urn .get_entity_id () [0 ]
83
+ tag_str = tag_urn .entity_ids [0 ]
70
84
re_match = re .search (self .config .tag_prefix , tag_str )
71
85
if re_match :
72
86
owner_str = tag_str [re_match .end () :].strip ()
73
87
owner_urn_str = self .get_owner_urn (owner_str )
74
88
if self .config .is_user :
75
- owner_urn = str (CorpuserUrn . create_from_id (owner_urn_str ))
89
+ owner_urn = str (CorpuserUrn (owner_urn_str ))
76
90
else :
77
- owner_urn = str (CorpGroupUrn . create_from_id (owner_urn_str ))
91
+ owner_urn = str (CorpGroupUrn (owner_urn_str ))
78
92
owner_type = get_owner_type (self .config .owner_type )
79
93
if owner_type == OwnershipTypeClass .CUSTOM :
80
94
assert (
81
95
self .config .owner_type_urn is not None
82
96
), "owner_type_urn must be set if owner_type is CUSTOM"
83
- owner = OwnerClass (
84
- owner = owner_urn ,
85
- type = owner_type ,
86
- typeUrn = self .config .owner_type_urn ,
97
+
98
+ owners .append (
99
+ OwnerClass (
100
+ owner = owner_urn ,
101
+ type = owner_type ,
102
+ typeUrn = self .config .owner_type_urn ,
103
+ )
87
104
)
88
- owners .append (owner )
89
105
90
- owner_aspect = OwnershipClass (owners = owners )
91
- return cast (Aspect , owner_aspect )
106
+ self .owner_mcps .append (
107
+ MetadataChangeProposalWrapper (
108
+ entityUrn = entity_urn ,
109
+ aspect = OwnershipClass (
110
+ owners = owners ,
111
+ ),
112
+ )
113
+ )
114
+
115
+ return None
0 commit comments