Commit fdc6391

[wip] add script
1 parent 946854c commit fdc6391

11 files changed: +893 −0 lines changed
@@ -0,0 +1,51 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import argparse


def add_model_version_to_run(
    model_urn: str,
    run_urn: str,
    token: str = None,
    server_url: str = "http://localhost:8080"
) -> None:

    # Link the training run to the model via its trainingJobs property
    model_properties = models.MLModelPropertiesClass(
        trainingJobs=[run_urn]
    )

    # Generate metadata change proposals
    mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=model_urn,
            entityType="mlModel",
            aspectName="mlModelProperties",
            aspect=model_properties,
            changeType=models.ChangeTypeClass.UPSERT,
        ),
    ]

    # Connect to DataHub and emit the changes
    graph = DataHubGraph(DatahubClientConfig(
        server=server_url,
        token=token,
        extra_headers={"Authorization": f"Bearer {token}"},
    ))

    with graph:
        for mcp in mcps:
            graph.emit(mcp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", required=True, help="DataHub access token")
    args = parser.parse_args()

    add_model_version_to_run(
        model_urn="urn:li:mlModel:(urn:li:dataPlatform:local,my_model)",
        run_urn="urn:li:dataProcessInstance:(urn:li:container:(urn:li:dataPlatform:local,my_experiment,PROD),my_run)",
        token=args.token,
    )
@@ -0,0 +1,52 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import argparse


def add_input_dataset_to_run(
    run_urn: str,
    dataset_urn: str,
    token: str,
    server_url: str = "http://localhost:8080"
) -> None:

    # Record the dataset as an input of the run (data process instance)
    run_inputs = models.DataProcessInstanceInputClass(
        inputs=[dataset_urn]
    )

    # Generate metadata change proposals
    mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=run_urn,
            entityType="dataProcessInstance",
            aspectName="dataProcessInstanceInput",
            aspect=run_inputs,
            changeType=models.ChangeTypeClass.UPSERT,
        ),
    ]

    # Connect to DataHub and emit the changes
    graph = DataHubGraph(DatahubClientConfig(
        server=server_url,
        token=token,
        extra_headers={"Authorization": f"Bearer {token}"},
    ))

    with graph:
        for mcp in mcps:
            graph.emit(mcp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", required=True, help="DataHub access token")
    args = parser.parse_args()

    add_input_dataset_to_run(
        run_urn="urn:li:dataProcessInstance:(urn:li:container:(urn:li:dataPlatform:local,my_experiment,PROD),my_run)",
        dataset_urn="urn:li:dataset:(urn:li:dataPlatform:local,my_dataset,PROD)",
        token=args.token,
    )
@@ -0,0 +1,52 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import argparse


def add_model_version_to_model(
    model_urn: str,
    model_group_urn: str,
    token: str,
    server_url: str = "http://localhost:8080"
) -> None:

    # Link the model (version) to its model group via the groups property
    model_properties = models.MLModelPropertiesClass(
        groups=[model_group_urn]
    )

    # Generate metadata change proposals
    mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=model_urn,
            entityType="mlModel",
            aspectName="mlModelProperties",
            aspect=model_properties,
            changeType=models.ChangeTypeClass.UPSERT,
        ),
    ]

    # Connect to DataHub and emit the changes
    graph = DataHubGraph(DatahubClientConfig(
        server=server_url,
        token=token,
        extra_headers={"Authorization": f"Bearer {token}"},
    ))

    with graph:
        for mcp in mcps:
            graph.emit(mcp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", required=True, help="DataHub access token")
    args = parser.parse_args()

    add_model_version_to_model(
        model_urn="urn:li:mlModel:(urn:li:dataPlatform:local,my_model)",
        model_group_urn="urn:li:mlModelGroup:(urn:li:dataPlatform:local,my_model_group)",
        token=args.token,
    )
@@ -0,0 +1,51 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import argparse


def add_run_to_model(
    model_urn: str,
    run_urn: str,
    token: str = None,
    server_url: str = "http://localhost:8080"
) -> None:

    # Link the training run to the model via its trainingJobs property
    model_properties = models.MLModelPropertiesClass(
        trainingJobs=[run_urn]
    )

    # Generate metadata change proposals
    mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=model_urn,
            entityType="mlModel",
            aspectName="mlModelProperties",
            aspect=model_properties,
            changeType=models.ChangeTypeClass.UPSERT,
        ),
    ]

    # Connect to DataHub and emit the changes
    graph = DataHubGraph(DatahubClientConfig(
        server=server_url,
        token=token,
        extra_headers={"Authorization": f"Bearer {token}"},
    ))

    with graph:
        for mcp in mcps:
            graph.emit(mcp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", required=True, help="DataHub access token")
    args = parser.parse_args()

    add_run_to_model(
        model_urn="urn:li:mlModel:(urn:li:dataPlatform:local,my_model)",
        run_urn="urn:li:dataProcessInstance:(urn:li:container:(urn:li:dataPlatform:local,my_experiment,PROD),my_run)",
        token=args.token,
    )
@@ -0,0 +1,51 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import argparse


def add_run_to_model_group(
    model_group_urn: str,
    run_urn: str,
    token: str = None,
    server_url: str = "http://localhost:8080"
) -> None:

    # Link the training run to the model group via its trainingJobs property
    model_group_properties = models.MLModelGroupPropertiesClass(
        trainingJobs=[run_urn]
    )

    # Generate metadata change proposals
    mcps = [
        MetadataChangeProposalWrapper(
            entityUrn=model_group_urn,
            entityType="mlModelGroup",
            aspectName="mlModelGroupProperties",
            aspect=model_group_properties,
            changeType=models.ChangeTypeClass.UPSERT,
        ),
    ]

    # Connect to DataHub and emit the changes
    graph = DataHubGraph(DatahubClientConfig(
        server=server_url,
        token=token,
        extra_headers={"Authorization": f"Bearer {token}"},
    ))

    with graph:
        for mcp in mcps:
            graph.emit(mcp)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", required=True, help="DataHub access token")
    args = parser.parse_args()

    add_run_to_model_group(
        model_group_urn="urn:li:mlModelGroup:(urn:li:dataPlatform:local,my_model_group)",
        run_urn="urn:li:dataProcessInstance:(urn:li:container:(urn:li:dataPlatform:local,my_experiment,PROD),my_run)",
        token=args.token,
    )
@@ -0,0 +1,61 @@
import datahub.metadata.schema_classes as models
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
import argparse


def create_experiment(
    experiment_id: str,
    name: str,
    description: str,
    platform: str,
    custom_properties: dict,
    token: str = None,
    server_url: str = "http://localhost:8080"
) -> None:
    # Create basic experiment properties
    platform_urn = f"urn:li:dataPlatform:{platform}"
    container_urn = f"urn:li:container:({platform_urn},{experiment_id},PROD)"
    container_subtype = models.SubTypesClass(typeNames=["ML Experiment"])
    container_info = models.ContainerPropertiesClass(
        name=name,
        description=description,
        customProperties=custom_properties,
    )
    browse_path = models.BrowsePathsV2Class(path=[])
    platform_instance = models.DataPlatformInstanceClass(
        platform=platform_urn,
        instance="PROD",
    )

    # Generate metadata change proposals
    mcps = MetadataChangeProposalWrapper.construct_many(
        entityUrn=container_urn,
        aspects=[container_subtype, container_info, browse_path, platform_instance],
    )

    # Connect to DataHub and emit the changes
    graph = DataHubGraph(DatahubClientConfig(
        server=server_url,
        token=token,
        extra_headers={"Authorization": f"Bearer {token}"},
    ))

    with graph:
        for mcp in mcps:
            graph.emit(mcp)


if __name__ == "__main__":
    # Example usage
    parser = argparse.ArgumentParser()
    parser.add_argument("--token", required=True, help="DataHub access token")
    args = parser.parse_args()

    create_experiment(
        experiment_id="airline_forecast_experiment",
        name="Airline Forecast Experiment",
        description="Experiment for forecasting airline passengers",
        platform="mlflow",
        custom_properties={"experiment_type": "forecasting"},
        token=args.token,
    )
