Skip to content

Commit c7737ec

Browse files
committed
Update hive.py
1 parent 840bbeb commit c7737ec

File tree

1 file changed

+71
-84
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/sql

1 file changed

+71
-84
lines changed

metadata-ingestion/src/datahub/ingestion/source/sql/hive.py

+71 -84
Original file line number | Diff line number | Diff line change
@@ -301,13 +301,11 @@ def _get_fine_grained_lineages(
301301
storage_urn: str,
302302
dataset_schema: SchemaMetadataClass,
303303
storage_schema: SchemaMetadataClass,
304-
) -> Optional[List[FineGrainedLineageClass]]:
304+
) -> Iterable[FineGrainedLineageClass]:
305305
"""Generate column-level lineage between dataset and storage"""
306306

307307
if not self.config.include_column_lineage:
308-
return None
309-
310-
fine_grained_lineages: List[FineGrainedLineageClass] = []
308+
return
311309

312310
for dataset_field in dataset_schema.fields:
313311
dataset_path = dataset_field.fieldPath
@@ -320,82 +318,82 @@ def _get_fine_grained_lineages(
320318

321319
if matching_field:
322320
if self.config.hive_storage_lineage_direction == "upstream":
323-
fine_grained_lineages.append(
324-
FineGrainedLineageClass(
325-
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
326-
upstreams=[
327-
make_schema_field_urn(
328-
parent_urn=storage_urn,
329-
field_path=matching_field.fieldPath,
330-
)
331-
],
332-
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
333-
downstreams=[
334-
make_schema_field_urn(
335-
parent_urn=dataset_urn,
336-
field_path=dataset_path,
337-
)
338-
],
339-
)
321+
yield FineGrainedLineageClass(
322+
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
323+
upstreams=[
324+
make_schema_field_urn(
325+
parent_urn=storage_urn,
326+
field_path=matching_field.fieldPath,
327+
)
328+
],
329+
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
330+
downstreams=[
331+
make_schema_field_urn(
332+
parent_urn=dataset_urn,
333+
field_path=dataset_path,
334+
)
335+
],
340336
)
341337
else:
342-
fine_grained_lineages.append(
343-
FineGrainedLineageClass(
344-
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
345-
upstreams=[
346-
make_schema_field_urn(
347-
parent_urn=dataset_urn,
348-
field_path=dataset_path,
349-
)
350-
],
351-
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
352-
downstreams=[
353-
make_schema_field_urn(
354-
parent_urn=storage_urn,
355-
field_path=matching_field.fieldPath,
356-
)
357-
],
358-
)
338+
yield FineGrainedLineageClass(
339+
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
340+
upstreams=[
341+
make_schema_field_urn(
342+
parent_urn=dataset_urn,
343+
field_path=dataset_path,
344+
)
345+
],
346+
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
347+
downstreams=[
348+
make_schema_field_urn(
349+
parent_urn=storage_urn,
350+
field_path=matching_field.fieldPath,
351+
)
352+
],
359353
)
360354

361-
return fine_grained_lineages if fine_grained_lineages else None
362-
363355
def _create_lineage_mcp(
364356
self,
365357
source_urn: str,
366358
target_urn: str,
367-
fine_grained_lineages: Optional[List[FineGrainedLineageClass]] = None,
368-
) -> MetadataChangeProposalWrapper:
359+
fine_grained_lineages: Optional[Iterable[FineGrainedLineageClass]] = None,
360+
) -> Iterable[MetadataWorkUnit]:
369361
"""Create lineage MCP between source and target datasets"""
370362

363+
lineages_list = (
364+
list(fine_grained_lineages) if fine_grained_lineages is not None else None
365+
)
366+
371367
upstream_lineage = UpstreamLineageClass(
372368
upstreams=[
373369
UpstreamClass(dataset=source_urn, type=DatasetLineageTypeClass.COPY)
374370
],
375-
fineGrainedLineages=fine_grained_lineages,
371+
fineGrainedLineages=lineages_list,
376372
)
377373

378-
return MetadataChangeProposalWrapper(
379-
entityUrn=target_urn, aspect=upstream_lineage
374+
yield MetadataWorkUnit(
375+
id=f"{source_urn}-{target_urn}-lineage",
376+
mcp=MetadataChangeProposalWrapper(
377+
entityUrn=target_urn, aspect=upstream_lineage
378+
),
380379
)
381380

382381
def get_storage_dataset_mcp(
383382
self,
384383
storage_location: str,
385384
platform_instance: Optional[str] = None,
386385
schema_metadata: Optional[SchemaMetadataClass] = None,
387-
) -> Optional[List[MetadataChangeProposalWrapper]]:
386+
) -> Iterable[MetadataWorkUnit]:
388387
"""
389388
Generate MCPs for storage dataset if needed.
390389
This creates the storage dataset entity in DataHub.
391390
"""
392391

393-
platform_instance = None
394392
storage_info = StoragePathParser.parse_storage_location(
395393
storage_location,
396394
)
397395
if not storage_info:
398-
return None
396+
return
399397

400398
platform, path = storage_info
401399
platform_name = StoragePathParser.get_platform_name(platform)
@@ -414,52 +412,50 @@ def get_storage_dataset_mcp(
414412
platform_instance=platform_instance,
415413
)
416414

417-
mcps = []
418-
415+
# Dataset properties
419416
props = DatasetPropertiesClass(name=path)
420-
421-
mcps.append(
422-
MetadataChangeProposalWrapper(
417+
yield MetadataWorkUnit(
418+
id=f"storage-{storage_urn}-props",
419+
mcp=MetadataChangeProposalWrapper(
423420
entityUrn=storage_urn,
424421
aspect=props,
425-
)
422+
),
426423
)
427424

428-
# Add platform instance
425+
# Platform instance
429426
platform_instance_aspect = self._make_dataset_platform_instance(
430427
platform=platform_name,
431428
instance=platform_instance,
432429
)
433-
434-
mcps.append(
435-
MetadataChangeProposalWrapper(
430+
yield MetadataWorkUnit(
431+
id=f"storage-{storage_urn}-platform",
432+
mcp=MetadataChangeProposalWrapper(
436433
entityUrn=storage_urn, aspect=platform_instance_aspect
437-
)
434+
),
438435
)
439436

440-
# Add schema if available
437+
# Schema if available
441438
if schema_metadata:
442439
storage_schema = SchemaMetadataClass(
443440
schemaName=f"{platform.value}_schema",
444441
platform=f"urn:li:dataPlatform:{platform.value}",
445442
version=0,
446-
fields=schema_metadata.fields, # Use the same fields as the table
443+
fields=schema_metadata.fields,
447444
hash="",
448445
platformSchema=OtherSchemaClass(rawSchema=""),
449446
)
450-
451-
mcps.append(
452-
MetadataChangeProposalWrapper(
447+
yield MetadataWorkUnit(
448+
id=f"storage-{storage_urn}-schema",
449+
mcp=MetadataChangeProposalWrapper(
453450
entityUrn=storage_urn, aspect=storage_schema
454-
)
451+
),
455452
)
456453

457-
return mcps
458454
except Exception as e:
459455
logger.error(
460456
f"Failed to create storage dataset MCPs for {storage_location}: {e}"
461457
)
462-
return None
458+
return
463459

464460
def get_lineage_mcp(
465461
self,
@@ -502,17 +498,11 @@ def get_lineage_mcp(
502498
platform_instance = self.config.storage_platform_instance.lower()
503499

504500
# Create storage dataset entity
505-
storage_mcps = self.get_storage_dataset_mcp(
501+
yield from self.get_storage_dataset_mcp(
506502
storage_location=storage_location,
507503
platform_instance=platform_instance,
508504
schema_metadata=dataset_schema,
509505
)
510-
if storage_mcps:
511-
for mcp in storage_mcps:
512-
yield MetadataWorkUnit(
513-
id=f"storage-{storage_urn}",
514-
mcp=mcp,
515-
)
516506

517507
# Get storage schema if available (implement based on storage system)
518508
storage_schema = (
@@ -522,31 +512,28 @@ def get_lineage_mcp(
522512
)
523513

524514
# Generate fine-grained lineage if schemas available
525-
fine_grained_lineages = None
526-
if dataset_schema and storage_schema:
527-
fine_grained_lineages = self._get_fine_grained_lineages(
515+
fine_grained_lineages = (
516+
None
517+
if not (dataset_schema and storage_schema)
518+
else self._get_fine_grained_lineages(
528519
dataset_urn, storage_urn, dataset_schema, storage_schema
529520
)
521+
)
530522

531523
# Create lineage MCP
532524
if self.config.hive_storage_lineage_direction == "upstream":
533-
mcp = self._create_lineage_mcp(
525+
yield from self._create_lineage_mcp(
534526
source_urn=storage_urn,
535527
target_urn=dataset_urn,
536528
fine_grained_lineages=fine_grained_lineages,
537529
)
538530
else:
539-
mcp = self._create_lineage_mcp(
531+
yield from self._create_lineage_mcp(
540532
source_urn=dataset_urn,
541533
target_urn=storage_urn,
542534
fine_grained_lineages=fine_grained_lineages,
543535
)
544536

545-
yield MetadataWorkUnit(
546-
id=f"{dataset_urn}-{storage_urn}-lineage",
547-
mcp=mcp,
548-
)
549-
550537
def _get_storage_schema(
551538
self,
552539
storage_location: str,

0 commit comments

Comments
 (0)