
Commit c571d73

[wip] update scripts
1 parent fdc6391 commit c571d73

15 files changed: +1066 −508 lines

metadata-ingestion/examples/ml/add_dataset_to_run.py
This file was deleted (−51 lines).

@@ -1,52 +1,109 @@
-import datahub.metadata.schema_classes as models
+import argparse
+
+from datahub.api.entities.dataset.dataset import Dataset
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
-import argparse
-import time
+from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
+    DataProcessInstanceInput,
+)
+from datahub.metadata.schema_classes import ChangeTypeClass
+from typing import Optional
+
+def create_dataset(
+    platform: str,
+    name: str,
+    token: str,
+    description: Optional[str] = "",
+    server_url: str = "http://localhost:8080",
+) -> Optional[str]:
+    """
+    Create a dataset in DataHub and return its URN
+    """
+    dataset = Dataset(id=name, platform=platform, name=name, description=description)
+
+    graph = DataHubGraph(
+        DatahubClientConfig(
+            server=server_url,
+            token=token,
+            extra_headers={"Authorization": f"Bearer {token}"},
+        )
+    )
+
+    with graph:
+        for mcp in dataset.generate_mcp():
+            graph.emit(mcp)
 
+    return dataset.urn
 
-def add_input_dataset_to_run(
-    run_urn: str,
-    dataset_urn: str,
-    token: str,
-    server_url: str = "http://localhost:8080"
+
+def add_input_datasets_to_run(
+    run_urn: str,
+    dataset_urns: list,
+    token: str,
+    server_url: str = "http://localhost:8080",
 ) -> None:
+    """
+    Add input datasets to a data process instance run
+    """
+    # Create the input aspect
+    inputs_aspect = DataProcessInstanceInput(inputs=dataset_urns)
 
-    # Create model properties
-    model_properties = models.MLModelPropertiesClass(
-        groups=[model_group_urn]
+    # Generate metadata change proposal
+    mcp = MetadataChangeProposalWrapper(
+        entityUrn=run_urn,
+        entityType="dataProcessInstance",
+        aspectName="dataProcessInstanceInput",
+        aspect=inputs_aspect,
+        changeType=ChangeTypeClass.UPSERT,
     )
 
-    # Generate metadata change proposals
-    mcps = [
-        MetadataChangeProposalWrapper(
-            entityUrn=model_urn,
-            entityType="mlModel",
-            aspectName="mlModelProperties",
-            aspect=model_properties,
-            changeType=models.ChangeTypeClass.UPSERT,
-        ),
-    ]
-
-    # Connect to DataHub and emit the changes
-    graph = DataHubGraph(DatahubClientConfig(
-        server=server_url,
-        token=token,
-        extra_headers={"Authorization": f"Bearer {token}"},
-    ))
+    # Connect to DataHub and emit the change
+    graph = DataHubGraph(
+        DatahubClientConfig(
+            server=server_url,
+            token=token,
+            extra_headers={"Authorization": f"Bearer {token}"},
+        )
+    )
 
     with graph:
-        for mcp in mcps:
-            graph.emit(mcp)
+        graph.emit(mcp)
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("--token", required=True, help="DataHub access token")
     args = parser.parse_args()
 
-    add_input_dataset_to_run(
-        run_urn="urn:li",
-        dataset_urn="unr:li"
-        token=args.token
-    )
+    # Example: Create two datasets
+    datasets = [
+        {
+            "platform": "hdfs",
+            "name": "training_data",
+            "description": "Training dataset for model",
+        },
+        {
+            "platform": "s3",
+            "name": "validation_data",
+            "description": "Validation dataset for model",
+        },
+    ]
+
+    # Create datasets and collect their URNs
+    # If a dataset already exists, comment out the create_dataset call
+    dataset_urns = []
+    for dataset_info in datasets:
+        dataset_urn = create_dataset(
+            platform=dataset_info["platform"],
+            name=dataset_info["name"],
+            description=dataset_info["description"],
+            token=args.token,
+        )
+        dataset_urns.append(dataset_urn)
+
+    # Link datasets to the run
+    add_input_datasets_to_run(
+        run_urn="urn:li:dataProcessInstance:c29762bd7cc66e35414d95350454e542",
+        dataset_urns=dataset_urns,
+        token=args.token,
+    )
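
Note on the input-lineage example above: when the datasets already exist in DataHub (the case the script's own comment calls out), their URNs can be built locally instead of re-emitting the entities. A minimal sketch, assuming the SDK's make_dataset_urn helper from datahub.emitter.mce_builder and the platform/name values used in the example:

# Sketch: build URNs for pre-existing datasets instead of calling create_dataset.
from datahub.emitter.mce_builder import make_dataset_urn

# Platform/name values taken from the example above; env defaults to "PROD",
# which matches DataHub's usual dataset URNs.
dataset_urns = [
    make_dataset_urn(platform="hdfs", name="training_data", env="PROD"),
    make_dataset_urn(platform="s3", name="validation_data", env="PROD"),
]
# dataset_urns can then be passed straight to add_input_datasets_to_run(...).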
@@ -1,21 +1,15 @@
+import argparse
+
 import datahub.metadata.schema_classes as models
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
-import argparse
-import time
-
+from typing import Optional
 
 def add_model_version_to_model(
-    model_urn,
-    model_group_urn,
-    token: str,
-    server_url: str = "http://localhost:8080"
+    model_urn: str, model_group_urn: str, token: Optional[str], server_url: str = "http://localhost:8080"
 ) -> None:
-
     # Create model properties
-    model_properties = models.MLModelPropertiesClass(
-        groups=[model_group_urn]
-    )
+    model_properties = models.MLModelPropertiesClass(groups=[model_group_urn])
 
     # Generate metadata change proposals
     mcps = [
@@ -24,16 +18,18 @@ def add_model_version_to_model(
             entityType="mlModel",
             aspectName="mlModelProperties",
             aspect=model_properties,
-            changeType=models.ChangeTypeClass.UPSERT,
+            changeType=models.ChangeTypeClass.UPSERT,  # TODO: this overwrites the existing model properties.
         ),
     ]
 
     # Connect to DataHub and emit the changes
-    graph = DataHubGraph(DatahubClientConfig(
-        server=server_url,
-        token=token,
-        extra_headers={"Authorization": f"Bearer {token}"},
-    ))
+    graph = DataHubGraph(
+        DatahubClientConfig(
+            server=server_url,
+            token=token,
+            extra_headers={"Authorization": f"Bearer {token}"},
+        )
+    )
 
     with graph:
         for mcp in mcps:
@@ -46,7 +42,7 @@ def add_model_version_to_model(
     args = parser.parse_args()
 
     add_model_version_to_model(
-        model_urn="urn:li:mlModel:(urn:li:dataPlatform:local,my_model)",
-        model_group_urn="urn:li:mlModelGroup:(urn:li:dataPlatform:local,my_model_group)",
-        token=args.token
-    )
+        model_urn="urn:li:mlModel:(urn:li:dataPlatform:mlflow,arima_model_2,PROD)",
+        model_group_urn="urn:li:mlModelGroup:(urn:li:dataPlatform:mlflow,airline_forecast_models,PROD)",
+        token=args.token,
+    )
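
The TODO added above flags a real pitfall: upserting mlModelProperties replaces the whole aspect, so any groups already recorded on the model are dropped. A read-modify-write sketch under that assumption, using the SDK's get_aspect to merge rather than overwrite (append_group_to_model is a hypothetical helper, not part of the scripts in this commit):

# Sketch: append to the model's existing groups instead of overwriting them.
# Assumes an authenticated DataHubGraph `graph` as constructed in the script above.
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper

def append_group_to_model(graph, model_urn: str, model_group_urn: str) -> None:
    # Fetch the current aspect; returns None if the model has no properties yet.
    existing = graph.get_aspect(
        entity_urn=model_urn, aspect_type=models.MLModelPropertiesClass
    )
    props = existing or models.MLModelPropertiesClass()
    groups = list(props.groups or [])
    if model_group_urn not in groups:
        groups.append(model_group_urn)
    props.groups = groups
    # Re-emit the merged aspect; MetadataChangeProposalWrapper infers the
    # entity type and aspect name, and changeType defaults to UPSERT.
    graph.emit(MetadataChangeProposalWrapper(entityUrn=model_urn, aspect=props))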
@@ -0,0 +1,125 @@
+import argparse
+from typing import Optional, List
+
+from datahub.api.entities.dataset.dataset import Dataset
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
+from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
+    DataProcessInstanceOutput,
+)
+from datahub.metadata.schema_classes import ChangeTypeClass
+
+
+def create_dataset(
+    platform: str,
+    name: str,
+    token: str,
+    description: str = "",
+    server_url: str = "http://localhost:8080",
+) -> str:
+    """Create a dataset in DataHub and return its URN.
+
+    Args:
+        platform: Platform identifier
+        name: Dataset name
+        token: DataHub access token
+        description: Dataset description
+        server_url: DataHub server URL
+
+    Returns:
+        str: Dataset URN
+    """
+    dataset = Dataset(id=name, platform=platform, name=name, description=description)
+
+    graph = DataHubGraph(
+        DatahubClientConfig(
+            server=server_url,
+            token=token,
+            extra_headers={"Authorization": f"Bearer {token}"},
+        )
+    )
+
+    with graph:
+        for mcp in dataset.generate_mcp():
+            graph.emit(mcp)
+
+    if dataset.urn is None:
+        raise ValueError(f"Failed to create dataset URN for {name}")
+
+    return dataset.urn
+
+
+def add_output_datasets_to_run(
+    run_urn: str,
+    dataset_urns: List[str],
+    token: str,
+    server_url: str = "http://localhost:8080",
+) -> None:
+    """Add output datasets to a data process instance run.
+
+    Args:
+        run_urn: Run URN
+        dataset_urns: List of dataset URNs
+        token: DataHub access token
+        server_url: DataHub server URL
+    """
+    # Create the output aspect
+    outputs_aspect = DataProcessInstanceOutput(outputs=dataset_urns)
+
+    # Generate metadata change proposal
+    mcp = MetadataChangeProposalWrapper(
+        entityUrn=run_urn,
+        entityType="dataProcessInstance",
+        aspectName="dataProcessInstanceOutput",
+        aspect=outputs_aspect,
+        changeType=ChangeTypeClass.UPSERT,
+    )
+
+    # Connect to DataHub and emit the change
+    graph = DataHubGraph(
+        DatahubClientConfig(
+            server=server_url,
+            token=token,
+            extra_headers={"Authorization": f"Bearer {token}"},
+        )
+    )
+
+    with graph:
+        graph.emit(mcp)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--token", required=True, help="DataHub access token")
+    args = parser.parse_args()
+
+    # Example: Create datasets
+    datasets = [
+        {
+            "platform": "s3",
+            "name": "output_data",
+            "description": "output_dataset for model",
+        }
+    ]
+
+    # Create datasets and collect their URNs
+    dataset_urns: List[str] = []
+    for dataset_info in datasets:
+        try:
+            dataset_urn = create_dataset(
+                platform=dataset_info["platform"],
+                name=dataset_info["name"],
+                description=dataset_info["description"],
+                token=args.token,
+            )
+            dataset_urns.append(dataset_urn)
+        except ValueError as e:
+            print(f"Failed to create dataset: {e}")
+            continue
+
+    if dataset_urns:  # Only proceed if we have valid URNs
+        add_output_datasets_to_run(
+            run_urn="urn:li:dataProcessInstance:c29762bd7cc66e35414d95350454e542",
+            dataset_urns=dataset_urns,
+            token=args.token,
+        )
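
To sanity-check that either lineage aspect actually landed, it can be read back from the server. A minimal sketch, assuming the same local server, token, and run URN used in the examples above; get_aspect is the SDK's generic aspect reader:

# Sketch: read back the output-lineage aspect that the script above emitted.
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DataProcessInstanceOutputClass

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080", token="<token>"))
with graph:
    outputs = graph.get_aspect(
        entity_urn="urn:li:dataProcessInstance:c29762bd7cc66e35414d95350454e542",
        aspect_type=DataProcessInstanceOutputClass,
    )
# outputs.outputs should list the dataset URNs if the emit succeeded.
print(outputs.outputs if outputs else "no output aspect found")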
