feat(ingestion) Adding vertexAI ingestion source (v1 - model group and model) #12632

Merged: 75 commits, Mar 13, 2025
Changes from 6 commits

Commits (75)
45ce05e
feat(ingestion) Adding vertexAI ingestion source
ryota-cloud Feb 13, 2025
9a1355d
lintfix
ryota-cloud Feb 13, 2025
04315d4
minor comment change
ryota-cloud Feb 13, 2025
e3a17b5
minor
ryota-cloud Feb 13, 2025
2a5ea58
minor change in unit test
ryota-cloud Feb 13, 2025
3739c20
Adding sources and documents
ryota-cloud Feb 18, 2025
520eda6
delete unnecessary file
ryota-cloud Feb 18, 2025
c320a6c
fetch list of training jobs
ryota-cloud Feb 22, 2025
bc9e451
adding comments
ryota-cloud Feb 23, 2025
960129b
feat(ingest): add vertex AI sample data ingestion
ryota-cloud Feb 12, 2025
95712f5
Update vertexai.py
ryota-cloud Feb 24, 2025
78d184b
added endpoint workunit creation and refactored
ryota-cloud Feb 24, 2025
d746a4c
commit temporarily
ryota-cloud Feb 24, 2025
5fbe0e5
lintfix
ryota-cloud Feb 24, 2025
9f8e8a3
removing unnecessary commits
ryota-cloud Feb 24, 2025
85d1830
cleanup recipe
ryota-cloud Feb 24, 2025
aae6893
minor change in config
ryota-cloud Feb 24, 2025
764f8fd
fixing dataset
ryota-cloud Feb 24, 2025
29ddcff
adding comments for dataset
ryota-cloud Feb 24, 2025
437e7d2
minor fix
ryota-cloud Feb 24, 2025
a2a1f0a
adding vertex to dev requirements in setup.py
ryota-cloud Feb 24, 2025
bf869da
minor fix
ryota-cloud Feb 24, 2025
c1f24b7
caching dataset list acquisitions
ryota-cloud Feb 25, 2025
453688d
review comment on dataset
ryota-cloud Feb 25, 2025
be03cf5
minor change
ryota-cloud Feb 25, 2025
8c76435
change name
ryota-cloud Feb 25, 2025
33a19c9
lint fix
ryota-cloud Feb 25, 2025
b76ec25
Refactor code to use auto_workunit
ryota-cloud Feb 25, 2025
c7d5165
flatten make_vertexai_name
ryota-cloud Feb 25, 2025
482c159
lint type error is fixed
ryota-cloud Feb 25, 2025
1032630
adding credential config
ryota-cloud Feb 26, 2025
616b76a
refactor and changed GCP credential to pass project_id
ryota-cloud Feb 26, 2025
1dcfce1
Adding more unit test case coverage, fixed lint and test case
ryota-cloud Feb 26, 2025
f16c8f5
fix platform name
ryota-cloud Feb 26, 2025
1de43a0
fixed _get_data_process_input_workunit test case
ryota-cloud Feb 26, 2025
ea577cb
Adding subtype and container to dataset and training job
ryota-cloud Feb 27, 2025
46ff526
fix UI issue on timestamp and refactor
ryota-cloud Feb 27, 2025
9b6c01e
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Feb 27, 2025
7b0fb70
removed token
ryota-cloud Feb 27, 2025
cf9c242
Adding integration test for VertexAI
ryota-cloud Feb 28, 2025
398c380
Adding unit test cases
ryota-cloud Feb 28, 2025
4703cd9
increasing unit test coverage
ryota-cloud Feb 28, 2025
63e8e8e
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Feb 28, 2025
ba26abb
adding more unit tests
ryota-cloud Feb 28, 2025
3a85d8a
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Feb 28, 2025
84ebae0
fixed review comments
ryota-cloud Mar 3, 2025
0b6b7db
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Mar 3, 2025
5472929
fixed review comments, adding unit test cases
ryota-cloud Mar 3, 2025
0eeeb72
minor change
ryota-cloud Mar 3, 2025
6c43ecc
Change BigQueryCredential to common function: GCPCredential
ryota-cloud Mar 3, 2025
d381b9e
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Mar 3, 2025
1f64a95
fixed one unit test case failure, and naming change
ryota-cloud Mar 3, 2025
b559286
Added Enum and refactoring
ryota-cloud Mar 3, 2025
4edd575
add comment
ryota-cloud Mar 3, 2025
5765025
fixed review comments
ryota-cloud Mar 4, 2025
4b09365
delete test case using real model
ryota-cloud Mar 4, 2025
eb261c3
delete commented out code
ryota-cloud Mar 4, 2025
e6feb8a
consolidate use of auto_workunit and change func output to mcps
ryota-cloud Mar 4, 2025
a8d7980
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Mar 4, 2025
b31d0f6
fix comment
ryota-cloud Mar 4, 2025
99269aa
Add POJO for model and change logic of model extraction and mcps crea…
ryota-cloud Mar 5, 2025
a517173
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Mar 5, 2025
f900f6d
use datetime_to_ts_millis helper
ryota-cloud Mar 5, 2025
5c46c59
refactored unit test case for better assertion
ryota-cloud Mar 5, 2025
1772b7e
Modified integration test to cover relationship between job to datase…
ryota-cloud Mar 5, 2025
8e40b7c
fix import error in test case
ryota-cloud Mar 5, 2025
2a91e6d
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Mar 5, 2025
706a123
Fixed review comments, refactored unit and integration test case, com…
ryota-cloud Mar 9, 2025
55e9cfc
Merge remote-tracking branch 'oss-datahub/master' into vertex_src_temp
ryota-cloud Mar 9, 2025
35f0081
renamed mock data file
ryota-cloud Mar 9, 2025
a4d9c5b
changed function name to _make_training_job_urn
ryota-cloud Mar 9, 2025
85561af
Fixed CI/CD error
ryota-cloud Mar 10, 2025
dedab91
pushed CI/CD fix
ryota-cloud Mar 10, 2025
ba6585b
Fixed review comment
ryota-cloud Mar 13, 2025
8e54b33
refactor mcp.construct_many for aspects
ryota-cloud Mar 13, 2025
32 changes: 16 additions & 16 deletions metadata-ingestion/docs/sources/vertexai/vertexai_pre.md
@@ -10,22 +10,22 @@ Please read the section to understand how to set up application default Credentials

1. Setup a ServiceAccount as per [GCP docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and assign the previously created role to this service account.
2. Download a service account JSON keyfile.
- Example credential file:

```json
{
"type": "service_account",
"project_id": "project-id-1234567",
"private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----",
"client_email": "[email protected]",
"client_id": "113545814931671546333",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%suppproject-id-1234567.iam.gserviceaccount.com"
}
```

3. To provide credentials to the source, you can either:

@@ -3,7 +3,7 @@ source:
config:
project_id: "acryl-poc"
region: "us-west2"
# Note that GOOGLE_APPLICATION_CREDENTIALS or credential section below is required for authentication.
# You must either set GOOGLE_APPLICATION_CREDENTIALS or provide credential as shown below
# credential:
# private_key: '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'
# private_key_id: "project_key_id"
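For context, here is a minimal sketch (not part of this diff) of the environment-variable path that the recipe comment above refers to; the keyfile path is a placeholder, and the project and region values are copied from the sample recipe:

```python
import os

from google.cloud import aiplatform

# Point Application Default Credentials at the service-account keyfile
# downloaded during setup (placeholder path).
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/path/to/keyfile.json"

# With ADC configured, aiplatform.init() needs no explicit credentials object.
aiplatform.init(project="acryl-poc", location="us-west2")
```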
183 changes: 94 additions & 89 deletions metadata-ingestion/src/datahub/ingestion/source/vertexai.py
@@ -1,8 +1,7 @@
import dataclasses
import logging
from collections import defaultdict
from datetime import datetime
from typing import Any, Iterable, List, Optional, TypeVar
from typing import Any, Dict, Iterable, List, Optional, TypeVar

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import aiplatform
@@ -77,17 +76,14 @@ class VertexAIConfig(EnvConfigMixin):
default="https://console.cloud.google.com/vertex-ai",
description=("VertexUI URI"),
)

_credentials_path: Optional[str] = PrivateAttr(None)

def __init__(self, **data: Any):
super().__init__(**data)

if self.credential:
self._credentials_path = self.credential.create_credential_temp_file(
Collaborator: do we actually need to create a credentials file?

Collaborator Author: good point, not needed here (whereas GCPCredential needs it); deleted.

Collaborator: a bit confused - it looks like we're still writing the credentials to a file.

Collaborator: this is not a blocker - but we should not be writing credentials to disk if we can avoid it.

Collaborator Author (ryota-cloud, Mar 13, 2025): this file is actually used to create a credentials object using the service_account.Credentials util, which feeds into aiplatform.init():

        credentials = (
            service_account.Credentials.from_service_account_file(
                self.config._credentials_path
            )
            if self.config.credential
            else None
        )

Collaborator: right - but we're getting the config as JSON or as a true file path in the original GCPCredential.

Then we take that credential and write it to a new file, storing the file path in self.config._credentials_path, and then we load self.config._credentials_path again.

That flow is pretty strange.

Collaborator Author (ryota-cloud, Mar 13, 2025): it's not so unusual to see this pattern (only the credential part is written to a temp file and loaded back), but I understand your point about avoiding yet another file write; how about changing it to something like below:

        credentials = (
            service_account.Credentials.from_service_account_info(
                self.config.get_credentials()  # passing a dict
            )
            if self.config.credential
            else None
        )
self.project_id
)
logger.debug(
f"Creating temporary credential file at {self._credentials_path}"
project_id=self.project_id
)


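As a reference for the review thread above, a minimal sketch of the file-free alternative being discussed, assuming the recipe's credential block is already available as a plain dict (the init_with_inline_credentials helper and its credential_dict argument are illustrative, not part of the source):

```python
from google.cloud import aiplatform
from google.oauth2 import service_account


def init_with_inline_credentials(
    project_id: str, region: str, credential_dict: dict
) -> None:
    # Build the credentials object directly from the in-memory dict,
    # skipping the temporary keyfile round trip discussed above.
    credentials = service_account.Credentials.from_service_account_info(
        credential_dict
    )
    aiplatform.init(project=project_id, location=region, credentials=credentials)
```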
@@ -144,8 +140,8 @@ def __init__(self, ctx: PipelineContext, config: VertexAIConfig):
project=config.project_id, location=config.region, credentials=credentials
)
self.client = aiplatform
self.endpoints: Optional[dict] = None
self.datasets: Optional[dict] = None
self.endpoints: Optional[Dict[str, List[Endpoint]]] = None
self.datasets: Optional[Dict[str, VertexAiResourceNoun]] = None

def get_report(self) -> SourceReport:
return self.report
@@ -197,7 +193,7 @@ def _get_ml_model_mcps(
# Create ML Model Entity
yield from self._gen_ml_model_mcps(model_meta)
# Create Endpoint Entity
yield from self._gen_endpoint_mcps(model_meta)
yield from self._gen_endpoints_mcps(model_meta)

def _get_ml_model_metadata(
self, model: Model, model_version: VersionInfo
@@ -333,14 +329,16 @@ def _gen_ml_group_mcps(
MLModelGroupPropertiesClass(
name=self._make_vertexai_model_group_name(model.name),
description=model.description,
created=TimeStampClass(time=datetime_to_ts_millis(model.create_time))
if model.create_time
else None,
lastModified=TimeStampClass(
time=datetime_to_ts_millis(model.update_time)
)
if model.update_time
else None,
created=(
TimeStampClass(time=datetime_to_ts_millis(model.create_time))
if model.create_time
else None
),
lastModified=(
TimeStampClass(time=datetime_to_ts_millis(model.update_time))
if model.update_time
else None
),
customProperties={"displayName": model.display_name},
)
)
@@ -397,14 +395,14 @@ def _search_dataset(self, dataset_id: str) -> Optional[VertexAiResourceNoun]:
]

if self.datasets is None:
self.datasets = dict()
self.datasets = {}

for dtype in dataset_types:
dataset_class = getattr(self.client.datasets, dtype)
for ds in dataset_class.list():
self.datasets[ds.name] = ds

return self.datasets.get(dataset_id) if dataset_id in self.datasets else None
return self.datasets[dataset_id] if dataset_id in self.datasets else None

def _get_input_dataset_mcps(
self, job_meta: TrainingJobMetadata
@@ -428,9 +426,11 @@ def _get_input_dataset_mcps(
aspects.append(
DatasetPropertiesClass(
name=self._make_vertexai_dataset_name(ds.name),
created=TimeStampClass(time=datetime_to_ts_millis(ds.create_time))
if ds.create_time
else None,
created=(
TimeStampClass(time=datetime_to_ts_millis(ds.create_time))
if ds.create_time
else None
),
description=f"Dataset: {ds.display_name}",
customProperties={
"displayName": ds.display_name,
@@ -458,54 +458,54 @@ def _get_training_job_metadata(
and output models. It checks if the job is an AutoML job and retrieves the relevant
input dataset and output model information.
"""

job_meta = TrainingJobMetadata(job=job)

# Check if the job is an AutoML job
if self._is_automl_job(job):
# Check if input dataset is present in the job configuration
if (
hasattr(job, "_gca_resource")
and hasattr(job._gca_resource, "input_data_config")
and hasattr(job._gca_resource.input_data_config, "dataset_id")
):
# Create URN of Input Dataset for Training Job
dataset_id = job._gca_resource.input_data_config.dataset_id
logger.info(
f"Found input dataset (id: {dataset_id}) for training job ({job.display_name})"
)
job_conf = job.to_dict()
# Check if input dataset is present in the job configuration
if "inputDataConfig" in job_conf and "datasetId" in job_conf["inputDataConfig"]:
# Create URN of Input Dataset for Training Job
dataset_id = job_conf["inputDataConfig"]["datasetId"]
logger.info(
f"Found input dataset (id: {dataset_id}) for training job ({job.display_name})"
)

if dataset_id:
input_ds = self._search_dataset(dataset_id)
if input_ds:
logger.info(
f"Found the name of input dataset ({input_ds.display_name}) with dataset id ({dataset_id})"
)
if dataset_id:
input_ds = self._search_dataset(dataset_id)
if input_ds:
logger.info(
f"Found the name of input dataset ({input_ds.display_name}) with dataset id ({dataset_id})"
)
job_meta.input_dataset = input_ds

# Check if output model is present in the job configuration
if hasattr(job, "_gca_resource") and hasattr(
job._gca_resource, "model_to_upload"
):
model_version_str = job._gca_resource.model_to_upload.version_id
model_name = job._gca_resource.model_to_upload.name
try:
model = Model(model_name=model_name)
model_version = self._search_model_version(model, model_version_str)
if model and model_version:
logger.info(
f"Found output model (name:{model.display_name} id:{model_version_str}) "
f"for training job: {job.display_name}"
)
job_meta.output_model = model
job_meta.output_model_version = model_version
except GoogleAPICallError:
logger.error(
f"Error while fetching model version {model_version_str}"
# Check if output model is present in the job configuration
if (
"modelToUpload" in job_conf
and "name" in job_conf["modelToUpload"]
and job_conf["modelToUpload"]["name"]
and job_conf["modelToUpload"]["versionId"]
):
model_name = job_conf["modelToUpload"]["name"]
model_version_str = job_conf["modelToUpload"]["versionId"]
try:
model = Model(model_name=model_name)
model_version = self._search_model_version(model, model_version_str)
if model and model_version:
logger.info(
f"Found output model (name:{model.display_name} id:{model_version_str}) "
f"for training job: {job.display_name}"
)
job_meta.output_model = model
job_meta.output_model_version = model_version
except GoogleAPICallError as e:
self.report.report_failure(
title="Unable to fetch model and model version",
message="Encountered an error while fetching output model and model version which training job generates",
exc=e,
)

return job_meta

def _gen_endpoint_mcps(
def _gen_endpoints_mcps(
self, model_meta: ModelMetadata
) -> Iterable[MetadataChangeProposalWrapper]:
model: Model = model_meta.model
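To make the dict lookups in the _get_training_job_metadata changes above concrete, a small sketch of where the dataset and model references sit inside the dict returned by job.to_dict(); the field values here are illustrative, not taken from a real job:

```python
# Illustrative shape only; a real job config carries many more fields.
job_conf = {
    "inputDataConfig": {"datasetId": "1234567890"},
    "modelToUpload": {
        "name": "projects/123/locations/us-west2/models/456",
        "versionId": "1",
    },
}

dataset_id = job_conf.get("inputDataConfig", {}).get("datasetId")
model_name = job_conf.get("modelToUpload", {}).get("name")
model_version_id = job_conf.get("modelToUpload", {}).get("versionId")
print(dataset_id, model_name, model_version_id)
```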
@@ -592,21 +592,25 @@ def _gen_ml_model_mcps(
"versionId": f"{model_version.version_id}",
"resourceName": model.resource_name,
},
created=TimeStampClass(
datetime_to_ts_millis(model_version.version_create_time)
)
if model_version.version_create_time
else None,
lastModified=TimeStampClass(
datetime_to_ts_millis(model_version.version_update_time)
)
if model_version.version_update_time
else None,
created=(
TimeStampClass(
datetime_to_ts_millis(model_version.version_create_time)
)
if model_version.version_create_time
else None
),
lastModified=(
TimeStampClass(
datetime_to_ts_millis(model_version.version_update_time)
)
if model_version.version_update_time
else None
),
version=VersionTagClass(versionTag=str(model_version.version_id)),
groups=[model_group_urn], # link model version to model group
trainingJobs=[training_job_urn]
if training_job_urn
else None, # link to training job
trainingJobs=(
[training_job_urn] if training_job_urn else None
), # link to training job
deployments=endpoint_urns,
externalUrl=self._make_model_version_external_url(model),
type="ML Model",
@@ -629,13 +633,19 @@ def _search_endpoint(self, model: Model) -> List[Endpoint]:
Search for an endpoint associated with the model.
"""
if self.endpoints is None:
endpoint_dict = defaultdict(list)
endpoint_dict: Dict[str, List[Endpoint]] = {}
for endpoint in self.client.Endpoint.list():
for resource in endpoint.list_models():
if resource.model not in endpoint_dict:
endpoint_dict[resource.model] = []
endpoint_dict[resource.model].append(endpoint)
self.endpoints = endpoint_dict

endpoints = self.endpoints[model.resource_name]
endpoints = (
self.endpoints[model.resource_name]
if model.resource_name in self.endpoints
else []
)
return endpoints

def _make_ml_model_urn(self, model_version: VersionInfo, model_name: str) -> str:
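A brief aside on the grouping done in _search_endpoint above: the explicit membership check is equivalent to dict.setdefault, sketched here with stand-in types (group_by_model and its inputs are illustrative, not part of the source):

```python
from typing import Any, Dict, List, Tuple


def group_by_model(deployed: List[Tuple[str, Any]]) -> Dict[str, List[Any]]:
    """Group endpoints by the resource name of the model they serve."""
    endpoint_dict: Dict[str, List[Any]] = {}
    for model_resource_name, endpoint in deployed:
        # Equivalent to the explicit "not in" check in the diff above.
        endpoint_dict.setdefault(model_resource_name, []).append(endpoint)
    return endpoint_dict
```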
@@ -646,7 +656,7 @@ def _make_ml_model_urn(self, model_version: VersionInfo, model_name: str) -> str
)
return urn

def _make_job_urn(self, job: VertexAiResourceNoun) -> str:
def _make_training_job_urn(self, job: VertexAiResourceNoun) -> str:
job_id = self._make_vertexai_job_name(entity_id=job.name)
urn = builder.make_data_process_instance_urn(dataProcessInstanceId=job_id)
return urn
@@ -655,27 +665,22 @@ def _make_vertexai_model_group_name(
self,
entity_id: str,
) -> str:
separator: str = "."
return f"{self.config.project_id}{separator}model_group{separator}{entity_id}"
return f"{self.config.project_id}.model_group.{entity_id}"

def _make_vertexai_endpoint_name(self, entity_id: str) -> str:
separator: str = "."
return f"{self.config.project_id}{separator}endpoint{separator}{entity_id}"
return f"{self.config.project_id}.endpoint.{entity_id}"

def _make_vertexai_model_name(self, entity_id: str) -> str:
separator: str = "."
return f"{self.config.project_id}{separator}model{separator}{entity_id}"
return f"{self.config.project_id}.model.{entity_id}"

def _make_vertexai_dataset_name(self, entity_id: str) -> str:
separator: str = "."
return f"{self.config.project_id}{separator}dataset{separator}{entity_id}"
return f"{self.config.project_id}.dataset.{entity_id}"

def _make_vertexai_job_name(
self,
entity_id: Optional[str],
) -> str:
separator: str = "."
return f"{self.config.project_id}{separator}job{separator}{entity_id}"
return f"{self.config.project_id}.job.{entity_id}"

def _make_job_external_url(self, job: VertexAiResourceNoun) -> str:
"""