Skip to content

Commit a18621e

Browse files
committed
Refactoring to use MCP so that SubTypes are supported
1 parent b76a237 commit a18621e

File tree

2 files changed

+290
-165
lines changed

2 files changed

+290
-165
lines changed

metadata-ingestion/src/datahub/ingestion/source/openapi.py

+65-36
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
import time
33
import warnings
44
from abc import ABC
5-
from typing import Dict, Iterable, Optional, Tuple
5+
from typing import Dict, Iterable, List, Optional, Tuple
66

77
from pydantic import validator
88
from pydantic.fields import Field
99

1010
from datahub.configuration.common import ConfigModel
1111
from datahub.emitter.mce_builder import make_tag_urn
12+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
1213
from datahub.ingestion.api.common import PipelineContext
1314
from datahub.ingestion.api.decorators import (
1415
SourceCapability,
@@ -33,8 +34,6 @@
3334
set_metadata,
3435
try_guessing,
3536
)
36-
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
37-
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
3837
from datahub.metadata.schema_classes import (
3938
AuditStampClass,
4039
DatasetPropertiesClass,
@@ -224,8 +223,9 @@ def report_bad_responses(self, status_code: int, type: str) -> None:
224223

225224
def init_dataset(
226225
self, endpoint_k: str, endpoint_dets: dict
227-
) -> Tuple[DatasetSnapshot, str]:
226+
) -> Tuple[str, str, List[MetadataWorkUnit]]:
228227
config = self.config
228+
workunits = []
229229

230230
dataset_name = endpoint_k[1:].replace("/", ".")
231231

@@ -235,22 +235,27 @@ def init_dataset(
235235
else:
236236
dataset_name = "root"
237237

238-
dataset_snapshot = DatasetSnapshot(
239-
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{config.name}.{dataset_name},PROD)",
240-
aspects=[],
241-
)
238+
dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{config.name}.{dataset_name},PROD)"
242239

243-
# adding description
244-
dataset_properties = DatasetPropertiesClass(
240+
# Create dataset properties aspect
241+
properties = DatasetPropertiesClass(
245242
description=endpoint_dets["description"], customProperties={}
246243
)
247-
dataset_snapshot.aspects.append(dataset_properties)
244+
wu = MetadataWorkUnit(
245+
id=dataset_name,
246+
mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties),
247+
)
248+
workunits.append(wu)
248249

249-
# adding tags
250+
# Create tags aspect
250251
tags_str = [make_tag_urn(t) for t in endpoint_dets["tags"]]
251252
tags_tac = [TagAssociationClass(t) for t in tags_str]
252253
gtc = GlobalTagsClass(tags_tac)
253-
dataset_snapshot.aspects.append(gtc)
254+
wu = MetadataWorkUnit(
255+
id=f"{dataset_name}-tags",
256+
mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=gtc),
257+
)
258+
workunits.append(wu)
254259

255260
# the link will appear in the "documentation"
256261
link_url = clean_url(config.url + self.url_basepath + endpoint_k)
@@ -262,19 +267,23 @@ def init_dataset(
262267
url=link_url, description=link_description, createStamp=creation
263268
)
264269
inst_memory = InstitutionalMemoryClass([link_metadata])
265-
dataset_snapshot.aspects.append(inst_memory)
270+
wu = MetadataWorkUnit(
271+
id=f"{dataset_name}-docs",
272+
mcp=MetadataChangeProposalWrapper(
273+
entityUrn=dataset_urn, aspect=inst_memory
274+
),
275+
)
276+
workunits.append(wu)
266277

267-
# Add API endpoint subtype
278+
# Create subtype aspect
268279
sub_types = SubTypesClass(typeNames=[DatasetSubTypes.API_ENDPOINT])
269-
dataset_snapshot.aspects.append(sub_types)
270-
271-
return dataset_snapshot, dataset_name
280+
wu = MetadataWorkUnit(
281+
id=f"{dataset_name}-subtype",
282+
mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=sub_types),
283+
)
284+
workunits.append(wu)
272285

273-
def build_wu(
274-
self, dataset_snapshot: DatasetSnapshot, dataset_name: str
275-
) -> ApiWorkUnit:
276-
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
277-
return ApiWorkUnit(id=dataset_name, mce=mce)
286+
return dataset_name, dataset_urn, workunits
278287

279288
def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:
280289
config = self.config
@@ -300,16 +309,24 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:
300309
if endpoint_k in config.ignore_endpoints:
301310
continue
302311

303-
dataset_snapshot, dataset_name = self.init_dataset(
312+
# Initialize dataset and get common aspects
313+
dataset_name, dataset_urn, workunits = self.init_dataset(
304314
endpoint_k, endpoint_dets
305315
)
316+
for wu in workunits:
317+
yield wu
306318

307-
# adding dataset fields
319+
# Handle schema metadata if available
308320
if "data" in endpoint_dets.keys():
309321
# we are lucky! data is defined in the swagger for this endpoint
310322
schema_metadata = set_metadata(dataset_name, endpoint_dets["data"])
311-
dataset_snapshot.aspects.append(schema_metadata)
312-
yield self.build_wu(dataset_snapshot, dataset_name)
323+
wu = MetadataWorkUnit(
324+
id=f"{dataset_name}-schema",
325+
mcp=MetadataChangeProposalWrapper(
326+
entityUrn=dataset_urn, aspect=schema_metadata
327+
),
328+
)
329+
yield wu
313330
elif endpoint_dets["method"] != "get":
314331
self.report.report_warning(
315332
title="Failed to Extract Endpoint Metadata",
@@ -344,9 +361,13 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:
344361
context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}",
345362
)
346363
schema_metadata = set_metadata(dataset_name, fields2add)
347-
dataset_snapshot.aspects.append(schema_metadata)
348-
349-
yield self.build_wu(dataset_snapshot, dataset_name)
364+
wu = MetadataWorkUnit(
365+
id=f"{dataset_name}-schema",
366+
mcp=MetadataChangeProposalWrapper(
367+
entityUrn=dataset_urn, aspect=schema_metadata
368+
),
369+
)
370+
yield wu
350371
else:
351372
self.report_bad_responses(response.status_code, type=endpoint_k)
352373
else:
@@ -375,9 +396,13 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:
375396
context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}",
376397
)
377398
schema_metadata = set_metadata(dataset_name, fields2add)
378-
dataset_snapshot.aspects.append(schema_metadata)
379-
380-
yield self.build_wu(dataset_snapshot, dataset_name)
399+
wu = MetadataWorkUnit(
400+
id=f"{dataset_name}-schema",
401+
mcp=MetadataChangeProposalWrapper(
402+
entityUrn=dataset_urn, aspect=schema_metadata
403+
),
404+
)
405+
yield wu
381406
else:
382407
self.report_bad_responses(response.status_code, type=endpoint_k)
383408
else:
@@ -406,9 +431,13 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:
406431
context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}",
407432
)
408433
schema_metadata = set_metadata(dataset_name, fields2add)
409-
dataset_snapshot.aspects.append(schema_metadata)
410-
411-
yield self.build_wu(dataset_snapshot, dataset_name)
434+
wu = MetadataWorkUnit(
435+
id=f"{dataset_name}-schema",
436+
mcp=MetadataChangeProposalWrapper(
437+
entityUrn=dataset_urn, aspect=schema_metadata
438+
),
439+
)
440+
yield wu
412441
else:
413442
self.report_bad_responses(response.status_code, type=endpoint_k)
414443

0 commit comments

Comments
 (0)