Skip to content

Commit 9fb2df1

Browse files
authored
fix(ingest): sort by last modified not working in the UI (#11343)
1 parent 090c514 commit 9fb2df1

File tree

43 files changed

+2754
-8130
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+2754
-8130
lines changed

metadata-ingestion/src/datahub/ingestion/api/auto_work_units/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
import dataclasses
2+
import json
3+
from typing import Dict, Iterable, Optional
4+
5+
from datahub.ingestion.api.workunit import MetadataWorkUnit
6+
from datahub.metadata.schema_classes import (
7+
ChangeTypeClass,
8+
DatasetPropertiesClass,
9+
GenericAspectClass,
10+
MetadataChangeProposalClass,
11+
OperationClass,
12+
TimeStampClass,
13+
)
14+
from datahub.specific.dataset import DatasetPatchBuilder
15+
from datahub.utilities.urns.urn import guess_entity_type
16+
17+
18+
@dataclasses.dataclass
19+
class TimestampPair:
20+
last_modified_dataset_props: Optional[
21+
TimeStampClass
22+
] # last_modified of datasetProperties aspect
23+
last_updated_timestamp_dataset_props: Optional[
24+
int
25+
] # lastUpdatedTimestamp of the operation aspect
26+
27+
28+
def try_aspect_from_metadata_change_proposal_class(
29+
wu: MetadataWorkUnit,
30+
) -> Optional[DatasetPropertiesClass]:
31+
if (
32+
isinstance(wu.metadata, MetadataChangeProposalClass)
33+
and wu.metadata.aspectName == "datasetProperties"
34+
and wu.metadata.changeType == ChangeTypeClass.PATCH
35+
and isinstance(wu.metadata.aspect, GenericAspectClass)
36+
):
37+
patch_dataset_properties = json.loads(wu.metadata.aspect.value)
38+
for operation in patch_dataset_properties:
39+
if operation.get("path") == "/lastModified":
40+
# Deserializing `lastModified` as the `auto_patch_last_modified` function relies on this property
41+
# to decide if a patch aspect for the datasetProperties aspect should be generated
42+
return DatasetPropertiesClass(
43+
lastModified=TimeStampClass(time=operation["value"]["time"])
44+
)
45+
46+
return None
47+
48+
49+
def auto_patch_last_modified(
50+
stream: Iterable[MetadataWorkUnit],
51+
) -> Iterable[MetadataWorkUnit]:
52+
"""
53+
Generate a patch request for datasetProperties aspect in-case
54+
1. `lastModified` of datasetProperties is not set
55+
2. And there are operation aspects
56+
in this case set the `lastModified` of datasetProperties to max value of operation aspects `lastUpdatedTimestamp`.
57+
58+
We need this functionality to support sort by `last modified` on UI.
59+
"""
60+
candidate_dataset_for_patch: Dict[str, TimestampPair] = {}
61+
62+
for wu in stream:
63+
if (
64+
guess_entity_type(wu.get_urn()) != "dataset"
65+
): # we are only processing datasets
66+
yield wu
67+
continue
68+
69+
dataset_properties_aspect = wu.get_aspect_of_type(
70+
DatasetPropertiesClass
71+
) or try_aspect_from_metadata_change_proposal_class(wu)
72+
dataset_operation_aspect = wu.get_aspect_of_type(OperationClass)
73+
74+
timestamp_pair = candidate_dataset_for_patch.get(wu.get_urn())
75+
76+
if timestamp_pair:
77+
# Update the timestamp_pair
78+
if dataset_properties_aspect and dataset_properties_aspect.lastModified:
79+
timestamp_pair.last_modified_dataset_props = (
80+
dataset_properties_aspect.lastModified
81+
)
82+
83+
if (
84+
dataset_operation_aspect
85+
and dataset_operation_aspect.lastUpdatedTimestamp
86+
):
87+
timestamp_pair.last_updated_timestamp_dataset_props = max(
88+
timestamp_pair.last_updated_timestamp_dataset_props or 0,
89+
dataset_operation_aspect.lastUpdatedTimestamp,
90+
)
91+
92+
else:
93+
# Create new TimestampPair
94+
last_modified_dataset_props: Optional[TimeStampClass] = None
95+
last_updated_timestamp_dataset_props: Optional[int] = None
96+
97+
if dataset_properties_aspect:
98+
last_modified_dataset_props = dataset_properties_aspect.lastModified
99+
100+
if dataset_operation_aspect:
101+
last_updated_timestamp_dataset_props = (
102+
dataset_operation_aspect.lastUpdatedTimestamp
103+
)
104+
105+
candidate_dataset_for_patch[wu.get_urn()] = TimestampPair(
106+
last_modified_dataset_props=last_modified_dataset_props,
107+
last_updated_timestamp_dataset_props=last_updated_timestamp_dataset_props,
108+
)
109+
110+
yield wu
111+
112+
# Emit a patch datasetProperties aspect for dataset where last_modified is None
113+
for entity_urn, timestamp_pair in candidate_dataset_for_patch.items():
114+
# Emit patch if last_modified is not set and last_updated_timestamp is set
115+
if (
116+
timestamp_pair.last_modified_dataset_props is None
117+
and timestamp_pair.last_updated_timestamp_dataset_props
118+
):
119+
dataset_patch_builder = DatasetPatchBuilder(urn=entity_urn)
120+
121+
dataset_patch_builder.set_last_modified(
122+
timestamp=TimeStampClass(
123+
time=timestamp_pair.last_updated_timestamp_dataset_props
124+
)
125+
)
126+
127+
yield from [
128+
MetadataWorkUnit(
129+
id=MetadataWorkUnit.generate_workunit_id(mcp),
130+
mcp_raw=mcp,
131+
)
132+
for mcp in dataset_patch_builder.build()
133+
]

metadata-ingestion/src/datahub/ingestion/api/source.py

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@
2828
from datahub.configuration.common import ConfigModel
2929
from datahub.configuration.source_common import PlatformInstanceConfigMixin
3030
from datahub.emitter.mcp_builder import mcps_from_mce
31+
from datahub.ingestion.api.auto_work_units.auto_dataset_properties_aspect import (
32+
auto_patch_last_modified,
33+
)
3134
from datahub.ingestion.api.closeable import Closeable
3235
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
3336
from datahub.ingestion.api.report import Report
@@ -443,6 +446,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
443446
),
444447
browse_path_processor,
445448
partial(auto_workunit_reporter, self.get_report()),
449+
auto_patch_last_modified,
446450
]
447451

448452
@staticmethod

metadata-ingestion/tests/integration/bigquery_v2/bigquery_lineage_usage_golden.json

+22
Original file line numberDiff line numberDiff line change
@@ -498,5 +498,27 @@
498498
"runId": "bigquery-2022_02_03-07_00_00",
499499
"lastRunId": "no-run-id-provided"
500500
}
501+
},
502+
{
503+
"entityType": "dataset",
504+
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,project-id-1.bigquery-dataset-1.view-1,PROD)",
505+
"changeType": "PATCH",
506+
"aspectName": "datasetProperties",
507+
"aspect": {
508+
"json": [
509+
{
510+
"op": "add",
511+
"path": "/lastModified",
512+
"value": {
513+
"time": 1643871600000
514+
}
515+
}
516+
]
517+
},
518+
"systemMetadata": {
519+
"lastObserved": 1643871600000,
520+
"runId": "bigquery-2022_02_03-07_00_00",
521+
"lastRunId": "no-run-id-provided"
522+
}
501523
}
502524
]

0 commit comments

Comments
 (0)