Skip to content

Commit 0903423

Browse files
committed
fix(py-sdk): DataJobPatchBuilder handling timestamps, output edges
1 parent 8a1c180 commit 0903423

File tree

2 files changed

+98
-10
lines changed

2 files changed

+98
-10
lines changed

metadata-ingestion/src/datahub/specific/datajob.py

+4-10
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
102102
103103
Notes:
104104
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
105-
it is converted to an Edge object and added with default audit stamps.
105+
it is converted to an Edge object and added without any audit stamps.
106106
"""
107107
if isinstance(input, Edge):
108108
input_urn: str = input.destinationUrn
@@ -114,8 +114,6 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
114114

115115
input_edge = Edge(
116116
destinationUrn=input_urn,
117-
created=self._mint_auditstamp(),
118-
lastModified=self._mint_auditstamp(),
119117
)
120118

121119
self._ensure_urn_type("dataJob", [input_edge], "add_input_datajob")
@@ -185,7 +183,7 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
185183
186184
Notes:
187185
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
188-
it is converted to an Edge object and added with default audit stamps.
186+
it is converted to an Edge object and added without any audit stamps.
189187
"""
190188
if isinstance(input, Edge):
191189
input_urn: str = input.destinationUrn
@@ -197,8 +195,6 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
197195

198196
input_edge = Edge(
199197
destinationUrn=input_urn,
200-
created=self._mint_auditstamp(),
201-
lastModified=self._mint_auditstamp(),
202198
)
203199

204200
self._ensure_urn_type("dataset", [input_edge], "add_input_dataset")
@@ -270,7 +266,7 @@ def add_output_dataset(
270266
271267
Notes:
272268
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
273-
it is converted to an Edge object and added with default audit stamps.
269+
it is converted to an Edge object and added without any audit stamps.
274270
"""
275271
if isinstance(output, Edge):
276272
output_urn: str = output.destinationUrn
@@ -282,15 +278,13 @@ def add_output_dataset(
282278

283279
output_edge = Edge(
284280
destinationUrn=output_urn,
285-
created=self._mint_auditstamp(),
286-
lastModified=self._mint_auditstamp(),
287281
)
288282

289283
self._ensure_urn_type("dataset", [output_edge], "add_output_dataset")
290284
self._add_patch(
291285
DataJobInputOutput.ASPECT_NAME,
292286
"add",
293-
path=f"/outputDatasetEdges/{self.quote(str(output))}",
287+
path=f"/outputDatasetEdges/{self.quote(output_urn)}",
294288
value=output_edge,
295289
)
296290
return self

smoke-test/tests/patch/test_datajob_patches.py

+94
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import time
12
import uuid
23

4+
import datahub.metadata.schema_classes as models
35
from datahub.emitter.mce_builder import make_data_job_urn, make_dataset_urn
46
from datahub.emitter.mcp import MetadataChangeProposalWrapper
57
from datahub.metadata.schema_classes import (
@@ -136,3 +138,95 @@ def test_datajob_inputoutput_dataset_patch(graph_client):
136138
inputoutput_lineage_read.inputDatasetEdges[0].destinationUrn
137139
== other_dataset_urn
138140
)
141+
142+
143+
def test_datajob_multiple_inputoutput_dataset_patch(graph_client):
    """Patch several dataset edges onto one data job, then re-apply them.

    The job's input/output aspect is first reset to an explicitly empty value.
    Two input and two output dataset edges are then added through
    ``DataJobPatchBuilder`` and read back. Finally the identical patches are
    emitted a second time and the stored aspect is asserted to be unchanged.
    """
    datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,training,default),training)"

    input_datasets = ["input_data_1", "input_data_2"]
    output_datasets = ["output_data_1", "output_data_2"]

    def dataset_urns_for(names):
        # Every dataset in this test lives on s3/PROD under a common name prefix.
        return [
            make_dataset_urn(platform="s3", name=f"test_patch_{name}", env="PROD")
            for name in names
        ]

    input_dataset_urns = dataset_urns_for(input_datasets)
    output_dataset_urns = dataset_urns_for(output_datasets)

    def make_edge(urn, generate_auditstamp=False):
        # The stamp is always built; it is attached only when requested.
        stamp = models.AuditStampClass(
            time=int(time.time() * 1000.0),
            actor="urn:li:corpuser:datahub",
        )
        last_modified = None
        if generate_auditstamp:
            last_modified = stamp
        return EdgeClass(destinationUrn=str(urn), lastModified=last_modified)

    def emit_dataset_patches():
        # Build one patch set covering all inputs and outputs, then emit it.
        builder = DataJobPatchBuilder(datajob_urn)
        for urn in input_dataset_urns:
            builder.add_input_dataset(make_edge(urn))
        for urn in output_dataset_urns:
            builder.add_output_dataset(make_edge(urn))
        for mcp in builder.build():
            graph_client.emit_mcp(mcp)

    def read_lineage():
        return graph_client.get_aspect(
            entity_urn=datajob_urn,
            aspect_type=DataJobInputOutputClass,
        )

    # Start from an empty lineage aspect so the patches have a known baseline.
    empty_lineage = DataJobInputOutputClass(
        inputDatasets=[], outputDatasets=[], inputDatasetEdges=[], outputDatasetEdges=[]
    )
    graph_client.emit_mcp(
        MetadataChangeProposalWrapper(entityUrn=datajob_urn, aspect=empty_lineage)
    )

    # First pass: apply the patches and check what was stored.
    emit_dataset_patches()
    lineage_aspect = read_lineage()

    assert lineage_aspect is not None
    assert lineage_aspect.inputDatasetEdges is not None
    assert lineage_aspect.outputDatasetEdges is not None

    assert len(lineage_aspect.inputDatasetEdges) == len(input_datasets)
    stored_inputs = {edge.destinationUrn for edge in lineage_aspect.inputDatasetEdges}
    assert stored_inputs == set(map(str, input_dataset_urns))

    assert len(lineage_aspect.outputDatasetEdges) == len(output_datasets)
    stored_outputs = {edge.destinationUrn for edge in lineage_aspect.outputDatasetEdges}
    assert stored_outputs == set(map(str, output_dataset_urns))

    # Second pass: the same patches again must leave the aspect untouched.
    emit_dataset_patches()
    updated_lineage_aspect = read_lineage()

    assert updated_lineage_aspect is not None
    assert updated_lineage_aspect.to_obj() == lineage_aspect.to_obj()

0 commit comments

Comments
 (0)