Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 0152e91

Browse files
anshbansaleagle-25
authored andcommittedFeb 5, 2025
feat(ingest): add datahub apply source (datahub-project#12482)
1 parent acfdb9e commit 0152e91

File tree

4 files changed

+225
-64
lines changed

4 files changed

+225
-64
lines changed
 

‎metadata-ingestion/setup.py

+1
Original file line numberDiff line numberDiff line change
@@ -743,6 +743,7 @@
743743
"looker = datahub.ingestion.source.looker.looker_source:LookerDashboardSource",
744744
"lookml = datahub.ingestion.source.looker.lookml_source:LookMLSource",
745745
"datahub-gc = datahub.ingestion.source.gc.datahub_gc:DataHubGcSource",
746+
"datahub-apply = datahub.ingestion.source.apply.datahub_apply:DataHubApplySource",
746747
"datahub-lineage-file = datahub.ingestion.source.metadata.lineage:LineageFileSource",
747748
"datahub-business-glossary = datahub.ingestion.source.metadata.business_glossary:BusinessGlossaryFileSource",
748749
"mlflow = datahub.ingestion.source.mlflow:MLflowSource",

‎metadata-ingestion/src/datahub/cli/container_cli.py

+1-64
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,8 @@
11
import logging
2-
from typing import Any, List
32

43
import click
5-
import progressbar
64

7-
from datahub.emitter.mcp import MetadataChangeProposalWrapper
8-
from datahub.ingestion.graph.client import get_default_graph
9-
from datahub.metadata.schema_classes import (
10-
DomainsClass,
11-
GlossaryTermAssociationClass,
12-
OwnerClass,
13-
OwnershipTypeClass,
14-
TagAssociationClass,
15-
)
16-
from datahub.specific.dataset import DatasetPatchBuilder
5+
from datahub.ingestion.source.apply.datahub_apply import apply_association_to_container
176

187
logger = logging.getLogger(__name__)
198

@@ -24,58 +13,6 @@ def container() -> None:
2413
pass
2514

2615

27-
def apply_association_to_container(
28-
container_urn: str,
29-
association_urn: str,
30-
association_type: str,
31-
) -> None:
32-
"""
33-
Common function to add either tags, terms, domains, or owners to child datasets (for now).
34-
35-
Args:
36-
container_urn: The URN of the container
37-
association_urn: The URN of the tag, term, or user to apply
38-
association_type: One of 'tag', 'term', 'domain' or 'owner'
39-
"""
40-
urns: List[str] = []
41-
graph = get_default_graph()
42-
logger.info(f"Using {graph}")
43-
urns.extend(
44-
graph.get_urns_by_filter(
45-
container=container_urn, batch_size=1000, entity_types=["dataset"]
46-
)
47-
)
48-
49-
all_patches: List[Any] = []
50-
for urn in urns:
51-
builder = DatasetPatchBuilder(urn)
52-
patches: List[Any] = []
53-
if association_type == "tag":
54-
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
55-
elif association_type == "term":
56-
patches = builder.add_term(
57-
GlossaryTermAssociationClass(association_urn)
58-
).build()
59-
elif association_type == "owner":
60-
patches = builder.add_owner(
61-
OwnerClass(
62-
owner=association_urn,
63-
type=OwnershipTypeClass.TECHNICAL_OWNER,
64-
)
65-
).build()
66-
elif association_type == "domain":
67-
patches = [
68-
MetadataChangeProposalWrapper(
69-
entityUrn=urn,
70-
aspect=DomainsClass(domains=[association_urn]),
71-
)
72-
]
73-
all_patches.extend(patches)
74-
mcps_iter = progressbar.progressbar(all_patches, redirect_stdout=True)
75-
for mcp in mcps_iter:
76-
graph.emit(mcp)
77-
78-
7916
@container.command()
8017
@click.option("--container-urn", required=True, type=str)
8118
@click.option("--tag-urn", required=True, type=str)

‎metadata-ingestion/src/datahub/ingestion/source/apply/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
import logging
2+
from functools import partial
3+
from typing import Any, Iterable, List, Optional, Union
4+
5+
import progressbar
6+
from pydantic import Field
7+
8+
from datahub.configuration.common import ConfigModel
9+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
10+
from datahub.ingestion.api.common import PipelineContext
11+
from datahub.ingestion.api.decorators import (
12+
SupportStatus,
13+
config_class,
14+
platform_name,
15+
support_status,
16+
)
17+
from datahub.ingestion.api.source import MetadataWorkUnitProcessor, Source, SourceReport
18+
from datahub.ingestion.api.source_helpers import auto_workunit_reporter
19+
from datahub.ingestion.api.workunit import MetadataWorkUnit
20+
from datahub.ingestion.graph.client import DataHubGraph, get_default_graph
21+
from datahub.metadata.schema_classes import (
22+
DomainsClass,
23+
GlossaryTermAssociationClass,
24+
MetadataChangeProposalClass,
25+
OwnerClass,
26+
OwnershipTypeClass,
27+
TagAssociationClass,
28+
)
29+
from datahub.specific.dataset import DatasetPatchBuilder
30+
31+
logger = logging.getLogger(__name__)
32+
33+
34+
def apply_association_to_container(
35+
container_urn: str,
36+
association_urn: str,
37+
association_type: str,
38+
emit: bool = True,
39+
graph: Optional[DataHubGraph] = None,
40+
) -> Optional[List[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]]:
41+
"""
42+
Common function to add either tags, terms, domains, or owners to child datasets (for now).
43+
44+
Args:
45+
container_urn: The URN of the container
46+
association_urn: The URN of the tag, term, or user to apply
47+
association_type: One of 'tag', 'term', 'domain' or 'owner'
48+
"""
49+
urns: List[str] = [container_urn]
50+
if not graph:
51+
graph = get_default_graph()
52+
logger.info(f"Using {graph}")
53+
urns.extend(
54+
graph.get_urns_by_filter(
55+
container=container_urn,
56+
batch_size=1000,
57+
entity_types=["dataset", "container"],
58+
)
59+
)
60+
61+
all_patches: List[Any] = []
62+
for urn in urns:
63+
builder = DatasetPatchBuilder(urn)
64+
patches: List[Any] = []
65+
if association_type == "tag":
66+
patches = builder.add_tag(TagAssociationClass(association_urn)).build()
67+
elif association_type == "term":
68+
patches = builder.add_term(
69+
GlossaryTermAssociationClass(association_urn)
70+
).build()
71+
elif association_type == "owner":
72+
patches = builder.add_owner(
73+
OwnerClass(
74+
owner=association_urn,
75+
type=OwnershipTypeClass.TECHNICAL_OWNER,
76+
)
77+
).build()
78+
elif association_type == "domain":
79+
patches = [
80+
MetadataChangeProposalWrapper(
81+
entityUrn=urn,
82+
aspect=DomainsClass(domains=[association_urn]),
83+
)
84+
]
85+
all_patches.extend(patches)
86+
if emit:
87+
mcps_iter = progressbar.progressbar(all_patches, redirect_stdout=True)
88+
for mcp in mcps_iter:
89+
graph.emit(mcp)
90+
return None
91+
else:
92+
return all_patches
93+
94+
95+
class DomainApplyConfig(ConfigModel):
96+
assets: List[str] = Field(
97+
default_factory=list,
98+
description="List of assets to apply domain hierarchichaly. Currently only containers and datasets are supported",
99+
)
100+
domain_urn: str = Field(default="")
101+
102+
103+
class TagApplyConfig(ConfigModel):
104+
assets: List[str] = Field(
105+
default_factory=list,
106+
description="List of assets to apply tag hierarchichaly. Currently only containers and datasets are supported",
107+
)
108+
tag_urn: str = Field(default="")
109+
110+
111+
class TermApplyConfig(ConfigModel):
112+
assets: List[str] = Field(
113+
default_factory=list,
114+
description="List of assets to apply term hierarchichaly. Currently only containers and datasets are supported",
115+
)
116+
term_urn: str = Field(default="")
117+
118+
119+
class OwnerApplyConfig(ConfigModel):
120+
assets: List[str] = Field(
121+
default_factory=list,
122+
description="List of assets to apply owner hierarchichaly. Currently only containers and datasets are supported",
123+
)
124+
owner_urn: str = Field(default="")
125+
126+
127+
class DataHubApplyConfig(ConfigModel):
128+
domain_apply: Optional[List[DomainApplyConfig]] = Field(
129+
default=None,
130+
description="List to apply domains to assets",
131+
)
132+
tag_apply: Optional[List[TagApplyConfig]] = Field(
133+
default=None,
134+
description="List to apply tags to assets",
135+
)
136+
term_apply: Optional[List[TermApplyConfig]] = Field(
137+
default=None,
138+
description="List to apply terms to assets",
139+
)
140+
owner_apply: Optional[List[OwnerApplyConfig]] = Field(
141+
default=None,
142+
description="List to apply owners to assets",
143+
)
144+
145+
146+
@platform_name("DataHubApply")
147+
@config_class(DataHubApplyConfig)
148+
@support_status(SupportStatus.TESTING)
149+
class DataHubApplySource(Source):
150+
"""
151+
This source is a helper over CLI
152+
so people can use the helper to apply various metadata changes to DataHub
153+
via Managed Ingestion
154+
"""
155+
156+
def __init__(self, ctx: PipelineContext, config: DataHubApplyConfig):
157+
self.ctx = ctx
158+
self.config = config
159+
self.report = SourceReport()
160+
self.graph = ctx.require_graph()
161+
162+
def _yield_workunits(
163+
self,
164+
proposals: List[
165+
Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]
166+
],
167+
) -> Iterable[MetadataWorkUnit]:
168+
for proposal in proposals:
169+
if isinstance(proposal, MetadataChangeProposalWrapper):
170+
yield proposal.as_workunit()
171+
else:
172+
yield MetadataWorkUnit(
173+
id=MetadataWorkUnit.generate_workunit_id(proposal),
174+
mcp_raw=proposal,
175+
)
176+
177+
def _handle_assets(
178+
self, assets: List[str], apply_urn: str, apply_type: str
179+
) -> Iterable[MetadataWorkUnit]:
180+
for asset in assets:
181+
change_proposals = apply_association_to_container(
182+
asset, apply_urn, apply_type, emit=False, graph=self.graph
183+
)
184+
assert change_proposals is not None
185+
yield from self._yield_workunits(change_proposals)
186+
187+
def _yield_domain(self) -> Iterable[MetadataWorkUnit]:
188+
if not self.config.domain_apply:
189+
return
190+
for apply in self.config.domain_apply:
191+
yield from self._handle_assets(apply.assets, apply.domain_urn, "domain")
192+
193+
def _yield_tag(self) -> Iterable[MetadataWorkUnit]:
194+
if not self.config.tag_apply:
195+
return
196+
for apply in self.config.tag_apply:
197+
yield from self._handle_assets(apply.assets, apply.tag_urn, "tag")
198+
199+
def _yield_term(self) -> Iterable[MetadataWorkUnit]:
200+
if not self.config.term_apply:
201+
return
202+
for apply in self.config.term_apply:
203+
yield from self._handle_assets(apply.assets, apply.term_urn, "term")
204+
205+
def _yield_owner(self) -> Iterable[MetadataWorkUnit]:
206+
if not self.config.owner_apply:
207+
return
208+
for apply in self.config.owner_apply:
209+
yield from self._handle_assets(apply.assets, apply.owner_urn, "owner")
210+
211+
def get_workunits_internal(
212+
self,
213+
) -> Iterable[MetadataWorkUnit]:
214+
yield from self._yield_domain()
215+
yield from self._yield_tag()
216+
yield from self._yield_term()
217+
yield from self._yield_owner()
218+
219+
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
220+
return [partial(auto_workunit_reporter, self.get_report())]
221+
222+
def get_report(self) -> SourceReport:
223+
return self.report

0 commit comments

Comments
 (0)
Please sign in to comment.