diff --git a/datahub-web-react/src/app/ingest/source/builder/sources.json b/datahub-web-react/src/app/ingest/source/builder/sources.json
index 102cce0f491e36..c2882214830268 100644
--- a/datahub-web-react/src/app/ingest/source/builder/sources.json
+++ b/datahub-web-react/src/app/ingest/source/builder/sources.json
@@ -333,5 +333,12 @@
         "description": "Import Nodes and Relationships from Neo4j.",
         "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/",
         "recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'"
+    },
+    {
+        "urn": "urn:li:dataPlatform:vertexai",
+        "name": "vertexai",
+        "displayName": "VertexAI",
+        "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/vertexai/",
+        "recipe": "source:\n type: vertexai\n config:\n project_id: # your GCP project ID\n region: # region where your GCP project resides\n # Credentials\n # Add GCP credentials"
     }
 ]
diff --git a/datahub-web-react/src/images/vertexai.png b/datahub-web-react/src/images/vertexai.png
new file mode 100644
index 00000000000000..93b43b7d61200f
Binary files /dev/null and b/datahub-web-react/src/images/vertexai.png differ
diff --git a/metadata-ingestion/docs/sources/vertexai/vertexai_pre.md b/metadata-ingestion/docs/sources/vertexai/vertexai_pre.md
new file mode 100644
index 00000000000000..a1e0259dd7263d
--- /dev/null
+++ b/metadata-ingestion/docs/sources/vertexai/vertexai_pre.md
@@ -0,0 +1,48 @@
+Ingesting metadata from Vertex AI requires using the **Vertex AI** module.
+
+#### Prerequisites
+Please refer to the [Vertex AI documentation](https://cloud.google.com/vertex-ai/docs) for basic information on Vertex AI.
+
+#### Credentials to access GCP
+Please read the [GCP docs](https://cloud.google.com/docs/authentication/provide-credentials-adc#how-to) to understand how to set up Application Default Credentials for GCP.
+
+#### Create a service account and assign roles
+
+1. Set up a service account as per the [GCP docs](https://cloud.google.com/iam/docs/creating-managing-service-accounts#iam-service-accounts-create-console) and assign it the roles required to read Vertex AI metadata.
+2. Download a service account JSON keyfile.
+   - Example credential file:
+
+   ```json
+   {
+     "type": "service_account",
+     "project_id": "project-id-1234567",
+     "private_key_id": "d0121d0000882411234e11166c6aaa23ed5d74e0",
+     "private_key": "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----",
+     "client_email": "test@suppproject-id-1234567.iam.gserviceaccount.com",
+     "client_id": "113545814931671546333",
+     "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+     "token_uri": "https://oauth2.googleapis.com/token",
+     "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
+     "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%suppproject-id-1234567.iam.gserviceaccount.com"
+   }
+   ```
+
+3. To provide credentials to the source, you can either:
+
+- Set an environment variable:
+
+  ```sh
+  $ export GOOGLE_APPLICATION_CREDENTIALS="/path/to/keyfile.json"
+  ```
+
+  _or_
+
+- Set credential config in your source based on the credential json file.
For example: + + ```yml + credential: + private_key_id: "d0121d0000882411234e11166c6aaa23ed5d74e0" + private_key: "-----BEGIN PRIVATE KEY-----\nMIIyourkey\n-----END PRIVATE KEY-----\n" + client_email: "test@suppproject-id-1234567.iam.gserviceaccount.com" + client_id: "123456678890" + ``` diff --git a/metadata-ingestion/docs/sources/vertexai/vertexai_recipe.yml b/metadata-ingestion/docs/sources/vertexai/vertexai_recipe.yml new file mode 100644 index 00000000000000..684ccc9fc7c70e --- /dev/null +++ b/metadata-ingestion/docs/sources/vertexai/vertexai_recipe.yml @@ -0,0 +1,16 @@ +source: + type: vertexai + config: + project_id: "acryl-poc" + region: "us-west2" +# You must either set GOOGLE_APPLICATION_CREDENTIALS or provide credential as shown below +# credential: +# private_key: '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n' +# private_key_id: "project_key_id" +# client_email: "client_email" +# client_id: "client_id" + +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080" diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index fbeba1e510b645..7e172ed9cc2bca 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -532,6 +532,7 @@ "sigma": sqlglot_lib | {"requests"}, "sac": sac, "neo4j": {"pandas", "neo4j"}, + "vertexai": {"google-cloud-aiplatform>=1.80.0"}, } # This is mainly used to exclude plugins from the Docker image. @@ -677,6 +678,7 @@ "sac", "cassandra", "neo4j", + "vertexai", ] if plugin for dependency in plugins[plugin] @@ -710,6 +712,7 @@ "mariadb", "redash", "vertica", + "vertexai" ] if plugin for dependency in plugins[plugin] @@ -799,6 +802,7 @@ "sac = datahub.ingestion.source.sac.sac:SACSource", "cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource", "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource", + "vertexai = datahub.ingestion.source.vertexai:VertexAISource", ], "datahub.ingestion.transformer.plugins": [ "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership", diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py index 57bfa2e3090d31..1f777feeccf781 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py @@ -1,8 +1,6 @@ -import json import logging import os import re -import tempfile from datetime import timedelta from typing import Any, Dict, List, Optional, Union @@ -17,10 +15,10 @@ PlatformInstanceConfigMixin, ) from datahub.configuration.validate_field_removal import pydantic_removed_field -from datahub.configuration.validate_multiline_string import pydantic_multiline_string from datahub.ingestion.glossary.classification_mixin import ( ClassificationSourceConfigMixin, ) +from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential from datahub.ingestion.source.data_lake_common.path_spec import PathSpec from datahub.ingestion.source.sql.sql_config import SQLCommonConfig, SQLFilterConfig from datahub.ingestion.source.state.stateful_ingestion_base import ( @@ -107,50 +105,8 @@ class BigQueryUsageConfig(BaseUsageConfig): ) -class BigQueryCredential(ConfigModel): - project_id: str = Field(description="Project id to set the credentials") - private_key_id: str = Field(description="Private key id") - private_key: str = Field( - 
description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'" - ) - client_email: str = Field(description="Client email") - client_id: str = Field(description="Client Id") - auth_uri: str = Field( - default="https://accounts.google.com/o/oauth2/auth", - description="Authentication uri", - ) - token_uri: str = Field( - default="https://oauth2.googleapis.com/token", description="Token uri" - ) - auth_provider_x509_cert_url: str = Field( - default="https://www.googleapis.com/oauth2/v1/certs", - description="Auth provider x509 certificate url", - ) - type: str = Field(default="service_account", description="Authentication type") - client_x509_cert_url: Optional[str] = Field( - default=None, - description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email", - ) - - _fix_private_key_newlines = pydantic_multiline_string("private_key") - - @root_validator(skip_on_failure=True) - def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]: - if values.get("client_x509_cert_url") is None: - values["client_x509_cert_url"] = ( - f"https://www.googleapis.com/robot/v1/metadata/x509/{values['client_email']}" - ) - return values - - def create_credential_temp_file(self) -> str: - with tempfile.NamedTemporaryFile(delete=False) as fp: - cred_json = json.dumps(self.dict(), indent=4, separators=(",", ": ")) - fp.write(cred_json.encode()) - return fp.name - - class BigQueryConnectionConfig(ConfigModel): - credential: Optional[BigQueryCredential] = Field( + credential: Optional[GCPCredential] = Field( default=None, description="BigQuery credential informations" ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/gcp_credentials_config.py b/metadata-ingestion/src/datahub/ingestion/source/common/gcp_credentials_config.py new file mode 100644 index 00000000000000..a1c9cac9319c8c --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/common/gcp_credentials_config.py @@ -0,0 +1,53 @@ +import json +import tempfile +from typing import Any, Dict, Optional + +from pydantic import Field, root_validator + +from datahub.configuration import ConfigModel +from datahub.configuration.validate_multiline_string import pydantic_multiline_string + + +class GCPCredential(ConfigModel): + project_id: Optional[str] = Field(description="Project id to set the credentials") + private_key_id: str = Field(description="Private key id") + private_key: str = Field( + description="Private key in a form of '-----BEGIN PRIVATE KEY-----\\nprivate-key\\n-----END PRIVATE KEY-----\\n'" + ) + client_email: str = Field(description="Client email") + client_id: str = Field(description="Client Id") + auth_uri: str = Field( + default="https://accounts.google.com/o/oauth2/auth", + description="Authentication uri", + ) + token_uri: str = Field( + default="https://oauth2.googleapis.com/token", description="Token uri" + ) + auth_provider_x509_cert_url: str = Field( + default="https://www.googleapis.com/oauth2/v1/certs", + description="Auth provider x509 certificate url", + ) + type: str = Field(default="service_account", description="Authentication type") + client_x509_cert_url: Optional[str] = Field( + default=None, + description="If not set it will be default to https://www.googleapis.com/robot/v1/metadata/x509/client_email", + ) + + _fix_private_key_newlines = pydantic_multiline_string("private_key") + + @root_validator(skip_on_failure=True) + def validate_config(cls, values: Dict[str, Any]) -> Dict[str, Any]: + if 
values.get("client_x509_cert_url") is None: + values["client_x509_cert_url"] = ( + f"https://www.googleapis.com/robot/v1/metadata/x509/{values['client_email']}" + ) + return values + + def create_credential_temp_file(self, project_id: Optional[str] = None) -> str: + configs = self.dict() + if project_id: + configs["project_id"] = project_id + with tempfile.NamedTemporaryFile(delete=False) as fp: + cred_json = json.dumps(configs, indent=4, separators=(",", ": ")) + fp.write(cred_json.encode()) + return fp.name diff --git a/metadata-ingestion/src/datahub/ingestion/source/vertexai.py b/metadata-ingestion/src/datahub/ingestion/source/vertexai.py new file mode 100644 index 00000000000000..2952712ce6083c --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/vertexai.py @@ -0,0 +1,697 @@ +import dataclasses +import logging +from datetime import datetime +from typing import Any, Dict, Iterable, List, Optional, TypeVar + +from google.api_core.exceptions import GoogleAPICallError +from google.cloud import aiplatform +from google.cloud.aiplatform import ( + AutoMLForecastingTrainingJob, + AutoMLImageTrainingJob, + AutoMLTabularTrainingJob, + AutoMLTextTrainingJob, + AutoMLVideoTrainingJob, + Endpoint, +) +from google.cloud.aiplatform.base import VertexAiResourceNoun +from google.cloud.aiplatform.models import Model, VersionInfo +from google.oauth2 import service_account +from pydantic import PrivateAttr +from pydantic.fields import Field + +import datahub.emitter.mce_builder as builder +from datahub.configuration.source_common import EnvConfigMixin +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import ProjectIdKey, gen_containers +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import Source, SourceCapability, SourceReport +from datahub.ingestion.api.source_helpers import auto_workunit +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.gcp_credentials_config import GCPCredential +from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import ( + MLTrainingRunProperties, +) +from datahub.metadata.schema_classes import ( + AuditStampClass, + ContainerClass, + DataProcessInstanceInputClass, + DataProcessInstancePropertiesClass, + DatasetPropertiesClass, + MLModelDeploymentPropertiesClass, + MLModelGroupPropertiesClass, + MLModelPropertiesClass, + SubTypesClass, + TimeStampClass, + VersionTagClass, +) +from datahub.utilities.str_enum import StrEnum +from datahub.utilities.time import datetime_to_ts_millis + +T = TypeVar("T") + +logger = logging.getLogger(__name__) + + +class VertexAIConfig(EnvConfigMixin): + credential: Optional[GCPCredential] = Field( + default=None, description="GCP credential information" + ) + project_id: str = Field(description=("Project ID in Google Cloud Platform")) + region: str = Field( + description=("Region of your project in Google Cloud Platform"), + ) + bucket_uri: Optional[str] = Field( + default=None, + description=("Bucket URI used in your project"), + ) + vertexai_url: Optional[str] = Field( + default="https://console.cloud.google.com/vertex-ai", + description=("VertexUI URI"), + ) + + _credentials_path: Optional[str] = PrivateAttr(None) + + def __init__(self, **data: Any): + super().__init__(**data) + if self.credential: + self._credentials_path = 
self.credential.create_credential_temp_file( + project_id=self.project_id + ) + + +class MLTypes(StrEnum): + TRAINING_JOB = "Training Job" + MODEL = "ML Model" + MODEL_GROUP = "ML Model Group" + ENDPOINT = "Endpoint" + DATASET = "Dataset" + PROJECT = "Project" + + +@dataclasses.dataclass +class TrainingJobMetadata: + job: VertexAiResourceNoun + input_dataset: Optional[VertexAiResourceNoun] = None + output_model: Optional[Model] = None + output_model_version: Optional[VersionInfo] = None + + +@dataclasses.dataclass +class ModelMetadata: + model: Model + model_version: VersionInfo + training_job_urn: Optional[str] = None + endpoints: Optional[List[Endpoint]] = None + + +@platform_name("Vertex AI", id="vertexai") +@config_class(VertexAIConfig) +@support_status(SupportStatus.TESTING) +@capability( + SourceCapability.DESCRIPTIONS, + "Extract descriptions for Vertex AI Registered Models and Model Versions", +) +@capability(SourceCapability.TAGS, "Extract tags for Vertex AI Registered Model Stages") +class VertexAISource(Source): + platform: str = "vertexai" + + def __init__(self, ctx: PipelineContext, config: VertexAIConfig): + super().__init__(ctx) + self.config = config + self.report = SourceReport() + + credentials = ( + service_account.Credentials.from_service_account_file( + self.config._credentials_path + ) + if self.config.credential + else None + ) + + aiplatform.init( + project=config.project_id, location=config.region, credentials=credentials + ) + self.client = aiplatform + self.endpoints: Optional[Dict[str, List[Endpoint]]] = None + self.datasets: Optional[Dict[str, VertexAiResourceNoun]] = None + + def get_report(self) -> SourceReport: + return self.report + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + """ + Main Function to fetch and yields mcps for various VertexAI resources. + - Models and Model Versions from the Model Registry + - Training Jobs + """ + + # Ingest Project + yield from self._gen_project_workunits() + # Fetch and Ingest Models, Model Versions a from Model Registry + yield from auto_workunit(self._get_ml_models_mcps()) + # Fetch and Ingest Training Jobs + yield from auto_workunit(self._get_training_jobs_mcps()) + # TODO Fetch Experiments and Experiment Runs + + def _gen_project_workunits(self) -> Iterable[MetadataWorkUnit]: + yield from gen_containers( + container_key=self._get_project_container(), + name=self.config.project_id, + sub_types=[MLTypes.PROJECT], + ) + + def _get_ml_models_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: + """ + Fetch List of Models in Model Registry and generate a corresponding mcp. 
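+        A Vertex AI Model is emitted as an MLModelGroup, and each of its versions as an
+        MLModel, e.g. (illustrative project and model names):
+          urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,my-project.model_group.my_model,PROD)
+          urn:li:mlModel:(urn:li:dataPlatform:vertexai,my-project.model.my_model_1,PROD)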
+ """ + registered_models = self.client.Model.list() + for model in registered_models: + # create mcp for Model Group (= Model in VertexAI) + yield from self._gen_ml_group_mcps(model) + model_versions = model.versioning_registry.list_versions() + for model_version in model_versions: + # create mcp for Model (= Model Version in VertexAI) + logger.info( + f"Ingesting a model (name: {model.display_name} id:{model.name})" + ) + yield from self._get_ml_model_mcps( + model=model, model_version=model_version + ) + + def _get_ml_model_mcps( + self, model: Model, model_version: VersionInfo + ) -> Iterable[MetadataChangeProposalWrapper]: + model_meta: ModelMetadata = self._get_ml_model_metadata(model, model_version) + # Create ML Model Entity + yield from self._gen_ml_model_mcps(model_meta) + # Create Endpoint Entity + yield from self._gen_endpoints_mcps(model_meta) + + def _get_ml_model_metadata( + self, model: Model, model_version: VersionInfo + ) -> ModelMetadata: + model_meta = ModelMetadata(model=model, model_version=model_version) + # Search for endpoints associated with the model + endpoints = self._search_endpoint(model) + model_meta.endpoints = endpoints + return model_meta + + def _get_training_jobs_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: + """ + Fetches training jobs from Vertex AI and generates corresponding mcps. + This method retrieves various types of training jobs from Vertex AI, including + CustomJob, CustomTrainingJob, CustomContainerTrainingJob, CustomPythonPackageTrainingJob, + AutoMLTabularTrainingJob, AutoMLTextTrainingJob, AutoMLImageTrainingJob, AutoMLVideoTrainingJob, + and AutoMLForecastingTrainingJob. For each job, it generates mcps containing metadata + about the job, its inputs, and its outputs. + """ + class_names = [ + "CustomJob", + "CustomTrainingJob", + "CustomContainerTrainingJob", + "CustomPythonPackageTrainingJob", + "AutoMLTabularTrainingJob", + "AutoMLTextTrainingJob", + "AutoMLImageTrainingJob", + "AutoMLVideoTrainingJob", + "AutoMLForecastingTrainingJob", + ] + # Iterate over class names and call the list() function + for class_name in class_names: + logger.info(f"Fetching a list of {class_name}s from VertexAI server") + for job in getattr(self.client, class_name).list(): + yield from self._get_training_job_mcps(job) + + def _get_training_job_mcps( + self, job: VertexAiResourceNoun + ) -> Iterable[MetadataChangeProposalWrapper]: + job_meta: TrainingJobMetadata = self._get_training_job_metadata(job) + # Create DataProcessInstance for the training job + yield from self._gen_training_job_mcps(job_meta) + # Create Dataset entity for Input Dataset of Training job + yield from self._get_input_dataset_mcps(job_meta) + # Create ML Model entity for output ML model of this training job + yield from self._gen_output_model_mcps(job_meta) + + def _gen_output_model_mcps( + self, job_meta: TrainingJobMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + if job_meta.output_model and job_meta.output_model_version: + job = job_meta.job + job_urn = builder.make_data_process_instance_urn( + self._make_vertexai_job_name(entity_id=job.name) + ) + + yield from self._gen_ml_model_mcps( + ModelMetadata( + model=job_meta.output_model, + model_version=job_meta.output_model_version, + training_job_urn=job_urn, + ) + ) + + def _gen_training_job_mcps( + self, job_meta: TrainingJobMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Generate a mcp for VertexAI Training Job + """ + job = job_meta.job + job_id = self._make_vertexai_job_name(entity_id=job.name) 
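+        # job_id is namespaced by project, e.g. "my-project.job.<pipeline name>" (illustrative),
+        # and becomes urn:li:dataProcessInstance:my-project.job.<pipeline name> below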
+ job_urn = builder.make_data_process_instance_urn(job_id) + + created_time = ( + datetime_to_ts_millis(job.create_time) + if job.create_time + else datetime_to_ts_millis(datetime.now()) + ) + created_actor = "urn:li:corpuser:datahub" + + # If Training job has Input Dataset + dataset_urn = ( + builder.make_dataset_urn( + platform=self.platform, + name=self._make_vertexai_dataset_name( + entity_id=job_meta.input_dataset.name + ), + env=self.config.env, + ) + if job_meta.input_dataset + else None + ) + + yield from MetadataChangeProposalWrapper.construct_many( + job_urn, + aspects=[ + DataProcessInstancePropertiesClass( + name=job_id, + created=AuditStampClass( + time=created_time, + actor=created_actor, + ), + externalUrl=self._make_job_external_url(job), + customProperties={ + "displayName": job.display_name, + "jobType": job.__class__.__name__, + }, + ), + MLTrainingRunProperties( + externalUrl=self._make_job_external_url(job), id=job.name + ), + SubTypesClass(typeNames=[MLTypes.TRAINING_JOB]), + ContainerClass(container=self._get_project_container().as_urn()), + DataProcessInstanceInputClass(inputs=[dataset_urn]) + if dataset_urn + else None, + ], + ) + + def _gen_ml_group_mcps( + self, + model: Model, + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Generate an MLModelGroup mcp for a VertexAI Model. + """ + ml_model_group_urn = self._make_ml_model_group_urn(model) + + yield from MetadataChangeProposalWrapper.construct_many( + ml_model_group_urn, + aspects=[ + MLModelGroupPropertiesClass( + name=self._make_vertexai_model_group_name(model.name), + description=model.description, + created=( + TimeStampClass(time=datetime_to_ts_millis(model.create_time)) + if model.create_time + else None + ), + lastModified=( + TimeStampClass(time=datetime_to_ts_millis(model.update_time)) + if model.update_time + else None + ), + customProperties={"displayName": model.display_name}, + ), + # TODO add following when metadata model for mlgroup is updated (these aspects not supported currently) + # SubTypesClass(typeNames=[MLTypes.MODEL_GROUP]), + # ContainerClass(container=self._get_project_container().as_urn()) + ], + ) + + def _make_ml_model_group_urn(self, model: Model) -> str: + urn = builder.make_ml_model_group_urn( + platform=self.platform, + group_name=self._make_vertexai_model_group_name(model.name), + env=self.config.env, + ) + return urn + + def _get_project_container(self) -> ProjectIdKey: + return ProjectIdKey(project_id=self.config.project_id, platform=self.platform) + + def _is_automl_job(self, job: VertexAiResourceNoun) -> bool: + return ( + isinstance(job, AutoMLTabularTrainingJob) + or isinstance(job, AutoMLTextTrainingJob) + or isinstance(job, AutoMLImageTrainingJob) + or isinstance(job, AutoMLVideoTrainingJob) + or isinstance(job, AutoMLForecastingTrainingJob) + ) + + def _search_model_version( + self, model: Model, version_id: str + ) -> Optional[VersionInfo]: + for version in model.versioning_registry.list_versions(): + if version.version_id == version_id: + return version + return None + + def _search_dataset(self, dataset_id: str) -> Optional[VertexAiResourceNoun]: + """ + Search for a dataset by its ID in Vertex AI. + This method iterates through different types of datasets (Text, Tabular, Image, + TimeSeries, and Video) to find a dataset that matches the given dataset ID. 
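+        Dataset listings are fetched once and cached in self.datasets (keyed by dataset
+        name), so repeated lookups do not re-query the Vertex AI API.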
+ """ + + dataset_types = [ + "TextDataset", + "TabularDataset", + "ImageDataset", + "TimeSeriesDataset", + "VideoDataset", + ] + + if self.datasets is None: + self.datasets = {} + + for dtype in dataset_types: + dataset_class = getattr(self.client.datasets, dtype) + for ds in dataset_class.list(): + self.datasets[ds.name] = ds + + return self.datasets.get(dataset_id) + + def _get_input_dataset_mcps( + self, job_meta: TrainingJobMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Create a DatasetPropertiesClass aspect for a given Vertex AI dataset. + """ + ds = job_meta.input_dataset + + if ds: + # Create URN of Input Dataset for Training Job + dataset_name = self._make_vertexai_dataset_name(entity_id=ds.name) + dataset_urn = builder.make_dataset_urn( + platform=self.platform, + name=dataset_name, + env=self.config.env, + ) + + yield from MetadataChangeProposalWrapper.construct_many( + dataset_urn, + aspects=[ + DatasetPropertiesClass( + name=self._make_vertexai_dataset_name(ds.name), + created=( + TimeStampClass(time=datetime_to_ts_millis(ds.create_time)) + if ds.create_time + else None + ), + description=f"Dataset: {ds.display_name}", + customProperties={ + "displayName": ds.display_name, + "resourceName": ds.resource_name, + }, + qualifiedName=ds.resource_name, + ), + SubTypesClass(typeNames=[MLTypes.DATASET]), + ContainerClass(container=self._get_project_container().as_urn()), + ], + ) + + def _get_training_job_metadata( + self, job: VertexAiResourceNoun + ) -> TrainingJobMetadata: + """ + Retrieve metadata for a given Vertex AI training job. + This method extracts metadata for a Vertex AI training job, including input datasets + and output models. It checks if the job is an AutoML job and retrieves the relevant + input dataset and output model information. 
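+        The job configuration returned by job.to_dict() is inspected for
+        "inputDataConfig.datasetId" and "modelToUpload.name"/"versionId" entries to
+        resolve the referenced dataset and output model version.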
+ """ + job_meta = TrainingJobMetadata(job=job) + # Check if the job is an AutoML job + job_conf = job.to_dict() + # Check if input dataset is present in the job configuration + if "inputDataConfig" in job_conf and "datasetId" in job_conf["inputDataConfig"]: + # Create URN of Input Dataset for Training Job + dataset_id = job_conf["inputDataConfig"]["datasetId"] + logger.info( + f"Found input dataset (id: {dataset_id}) for training job ({job.display_name})" + ) + + if dataset_id: + input_ds = self._search_dataset(dataset_id) + if input_ds: + logger.info( + f"Found the name of input dataset ({input_ds.display_name}) with dataset id ({dataset_id})" + ) + job_meta.input_dataset = input_ds + + # Check if output model is present in the job configuration + if ( + "modelToUpload" in job_conf + and "name" in job_conf["modelToUpload"] + and job_conf["modelToUpload"]["name"] + and job_conf["modelToUpload"]["versionId"] + ): + model_name = job_conf["modelToUpload"]["name"] + model_version_str = job_conf["modelToUpload"]["versionId"] + try: + model = Model(model_name=model_name) + model_version = self._search_model_version(model, model_version_str) + if model and model_version: + logger.info( + f"Found output model (name:{model.display_name} id:{model_version_str}) " + f"for training job: {job.display_name}" + ) + job_meta.output_model = model + job_meta.output_model_version = model_version + except GoogleAPICallError as e: + self.report.report_failure( + title="Unable to fetch model and model version", + message="Encountered an error while fetching output model and model version which training job generates", + exc=e, + ) + + return job_meta + + def _gen_endpoints_mcps( + self, model_meta: ModelMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + model: Model = model_meta.model + model_version: VersionInfo = model_meta.model_version + + if model_meta.endpoints: + for endpoint in model_meta.endpoints: + endpoint_urn = builder.make_ml_model_deployment_urn( + platform=self.platform, + deployment_name=self._make_vertexai_endpoint_name( + entity_id=endpoint.name + ), + env=self.config.env, + ) + + yield from MetadataChangeProposalWrapper.construct_many( + endpoint_urn, + aspects=[ + MLModelDeploymentPropertiesClass( + description=model.description, + createdAt=datetime_to_ts_millis(endpoint.create_time), + version=VersionTagClass( + versionTag=str(model_version.version_id) + ), + customProperties={"displayName": endpoint.display_name}, + ), + # TODO add followings when metadata for MLModelDeployment is updated (these aspects not supported currently) + # ContainerClass(container=self._get_project_container().as_urn()), + # SubTypesClass(typeNames=[MLTypes.ENDPOINT]) + ], + ) + + def _gen_ml_model_mcps( + self, ModelMetadata: ModelMetadata + ) -> Iterable[MetadataChangeProposalWrapper]: + """ + Generate an MLModel and Endpoint mcp for an VertexAI Model Version. 
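+        The MLModel urn combines the model name with the version id; the aspect links
+        the version back to its model group, the originating training job (when known),
+        and any serving endpoints.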
+ """ + + model: Model = ModelMetadata.model + model_version: VersionInfo = ModelMetadata.model_version + training_job_urn: Optional[str] = ModelMetadata.training_job_urn + endpoints: Optional[List[Endpoint]] = ModelMetadata.endpoints + endpoint_urns: List[str] = list() + + logging.info(f"generating model mcp for {model.name}") + + # Generate list of endpoint URL + if endpoints: + for endpoint in endpoints: + logger.info( + f"found endpoint ({endpoint.display_name}) for model ({model.resource_name})" + ) + endpoint_urns.append( + builder.make_ml_model_deployment_urn( + platform=self.platform, + deployment_name=self._make_vertexai_endpoint_name( + entity_id=endpoint.display_name + ), + env=self.config.env, + ) + ) + + # Create URN for Model and Model Version + model_group_urn = self._make_ml_model_group_urn(model) + model_name = self._make_vertexai_model_name(entity_id=model.name) + model_version_name = f"{model_name}_{model_version.version_id}" + model_urn = self._make_ml_model_urn(model_version, model_name=model_name) + + yield from MetadataChangeProposalWrapper.construct_many( + entityUrn=model_urn, + aspects=[ + MLModelPropertiesClass( + name=model_version_name, + description=model_version.version_description, + customProperties={ + "displayName": f"{model_version.model_display_name}", + "versionId": f"{model_version.version_id}", + "resourceName": model.resource_name, + }, + created=( + TimeStampClass( + datetime_to_ts_millis(model_version.version_create_time) + ) + if model_version.version_create_time + else None + ), + lastModified=( + TimeStampClass( + datetime_to_ts_millis(model_version.version_update_time) + ) + if model_version.version_update_time + else None + ), + version=VersionTagClass(versionTag=str(model_version.version_id)), + groups=[model_group_urn], # link model version to model group + trainingJobs=( + [training_job_urn] if training_job_urn else None + ), # link to training job + deployments=endpoint_urns, + externalUrl=self._make_model_version_external_url(model), + type="ML Model", + ), + # TODO Add a container for Project as parent of the dataset + # ContainerClass( + # container=self._get_project_container().as_urn(), + # ) + ], + ) + + def _search_endpoint(self, model: Model) -> List[Endpoint]: + """ + Search for an endpoint associated with the model. 
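+        Endpoints are listed once and cached in self.endpoints, keyed by the deployed
+        model's resource name, so later model lookups reuse the cache.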
+ """ + if self.endpoints is None: + endpoint_dict: Dict[str, List[Endpoint]] = {} + for endpoint in self.client.Endpoint.list(): + for resource in endpoint.list_models(): + if resource.model not in endpoint_dict: + endpoint_dict[resource.model] = [] + endpoint_dict[resource.model].append(endpoint) + self.endpoints = endpoint_dict + + endpoints = ( + self.endpoints[model.resource_name] + if model.resource_name in self.endpoints + else [] + ) + return endpoints + + def _make_ml_model_urn(self, model_version: VersionInfo, model_name: str) -> str: + urn = builder.make_ml_model_urn( + platform=self.platform, + model_name=f"{model_name}_{model_version.version_id}", + env=self.config.env, + ) + return urn + + def _make_training_job_urn(self, job: VertexAiResourceNoun) -> str: + job_id = self._make_vertexai_job_name(entity_id=job.name) + urn = builder.make_data_process_instance_urn(dataProcessInstanceId=job_id) + return urn + + def _make_vertexai_model_group_name( + self, + entity_id: str, + ) -> str: + return f"{self.config.project_id}.model_group.{entity_id}" + + def _make_vertexai_endpoint_name(self, entity_id: str) -> str: + return f"{self.config.project_id}.endpoint.{entity_id}" + + def _make_vertexai_model_name(self, entity_id: str) -> str: + return f"{self.config.project_id}.model.{entity_id}" + + def _make_vertexai_dataset_name(self, entity_id: str) -> str: + return f"{self.config.project_id}.dataset.{entity_id}" + + def _make_vertexai_job_name( + self, + entity_id: Optional[str], + ) -> str: + return f"{self.config.project_id}.job.{entity_id}" + + def _make_job_external_url(self, job: VertexAiResourceNoun) -> str: + """ + Model external URL in Vertex AI + Sample URLs: + https://console.cloud.google.com/vertex-ai/training/training-pipelines?project=acryl-poc&trainingPipelineId=5401695018589093888 + """ + external_url: str = ( + f"{self.config.vertexai_url}/training/training-pipelines?trainingPipelineId={job.name}" + f"?project={self.config.project_id}" + ) + return external_url + + def _make_model_external_url(self, model: Model) -> str: + """ + Model external URL in Vertex AI + Sample URL: + https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/812468724182286336?project=acryl-poc + """ + external_url: str = ( + f"{self.config.vertexai_url}/models/locations/{self.config.region}/models/{model.name}" + f"?project={self.config.project_id}" + ) + return external_url + + def _make_model_version_external_url(self, model: Model) -> str: + """ + Model Version external URL in Vertex AI + Sample URL: + https://console.cloud.google.com/vertex-ai/models/locations/us-west2/models/812468724182286336/versions/1?project=acryl-poc + """ + external_url: str = ( + f"{self.config.vertexai_url}/models/locations/{self.config.region}/models/{model.name}" + f"/versions/{model.version_id}" + f"?project={self.config.project_id}" + ) + return external_url diff --git a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py index 9f53ae2382d406..1aab4e9419014f 100644 --- a/metadata-ingestion/tests/integration/fivetran/test_fivetran.py +++ b/metadata-ingestion/tests/integration/fivetran/test_fivetran.py @@ -9,7 +9,7 @@ from datahub.configuration.common import ConfigurationWarning from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.run.pipeline import Pipeline -from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryCredential +from 
datahub.ingestion.source.common.gcp_credentials_config import GCPCredential from datahub.ingestion.source.fivetran.config import ( BigQueryDestinationConfig, FivetranSourceConfig, @@ -398,7 +398,7 @@ def test_fivetran_snowflake_destination_config(): @freeze_time(FROZEN_TIME) def test_fivetran_bigquery_destination_config(): bigquery_dest = BigQueryDestinationConfig( - credential=BigQueryCredential( + credential=GCPCredential( private_key_id="testprivatekey", project_id="test-project", client_email="fivetran-connector@test-project.iam.gserviceaccount.com", diff --git a/metadata-ingestion/tests/integration/vertexai/__init__.py b/metadata-ingestion/tests/integration/vertexai/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/tests/integration/vertexai/mock_vertexai.py b/metadata-ingestion/tests/integration/vertexai/mock_vertexai.py new file mode 100644 index 00000000000000..3bcd9b26c12d74 --- /dev/null +++ b/metadata-ingestion/tests/integration/vertexai/mock_vertexai.py @@ -0,0 +1,82 @@ +from datetime import datetime +from typing import List +from unittest.mock import MagicMock + +from google.cloud.aiplatform import AutoMLTabularTrainingJob, CustomJob, Model +from google.cloud.aiplatform.base import VertexAiResourceNoun +from google.cloud.aiplatform.models import Endpoint, VersionInfo +from google.protobuf import timestamp_pb2 + + +def gen_mock_model(num: int = 1) -> Model: + mock_model = MagicMock(spec=Model) + mock_model.name = f"mock_prediction_model_{num}" + + mock_model.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_model.version_id = f"{num}" + mock_model.display_name = f"mock_prediction_model_{num}_display_name" + mock_model.description = f"mock_prediction_model_{num}_description" + mock_model.resource_name = ( + f"projects/123/locations/us-central1/models/{num + 1}{num + 2}{num + 3}" + ) + return mock_model + + +def gen_mock_models(num: int = 2) -> List[Model]: + return [gen_mock_model(i) for i in range(1, num + 1)] + + +def gen_mock_training_custom_job() -> CustomJob: + mock_training_job = MagicMock(spec=CustomJob) + mock_training_job.name = "mock_training_job" + mock_training_job.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_training_job.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_training_job.display_name = "mock_training_job_display_name" + mock_training_job.description = "mock_training_job_description" + + return mock_training_job + + +def gen_mock_training_automl_job() -> AutoMLTabularTrainingJob: + mock_automl_job = MagicMock(spec=AutoMLTabularTrainingJob) + mock_automl_job.name = "mock_auto_automl_tabular_job" + mock_automl_job.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_automl_job.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_automl_job.display_name = "mock_auto_automl_tabular_job_display_name" + mock_automl_job.description = "mock_auto_automl_tabular_job_display_name" + mock_automl_job.to_dict.return_value = { + "inputDataConfig": {"datasetId": "2562882439508656128"} + } + return mock_automl_job + + +def gen_mock_model_version(mock_model: Model) -> VersionInfo: + version = "1" + return VersionInfo( + version_id=version, + version_description="test", + version_create_time=timestamp_pb2.Timestamp().GetCurrentTime(), + version_update_time=timestamp_pb2.Timestamp().GetCurrentTime(), + model_display_name=mock_model.name, + model_resource_name=mock_model.resource_name, + 
) + + +def gen_mock_dataset() -> VertexAiResourceNoun: + mock_dataset = MagicMock(spec=VertexAiResourceNoun) + mock_dataset.name = "2562882439508656128" + mock_dataset.create_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_dataset.update_time = timestamp_pb2.Timestamp().GetCurrentTime() + mock_dataset.display_name = "mock_dataset_display_name" + mock_dataset.description = "mock_dataset_description" + mock_dataset.resource_name = "projects/123/locations/us-central1/datasets/456" + return mock_dataset + + +def gen_mock_endpoint() -> Endpoint: + mock_endpoint = MagicMock(spec=Endpoint) + mock_endpoint.description = "test endpoint" + mock_endpoint.create_time = datetime.now() + mock_endpoint.display_name = "test endpoint display name" + return mock_endpoint diff --git a/metadata-ingestion/tests/integration/vertexai/test_vertexai.py b/metadata-ingestion/tests/integration/vertexai/test_vertexai.py new file mode 100644 index 00000000000000..ce5d78bc2405a9 --- /dev/null +++ b/metadata-ingestion/tests/integration/vertexai/test_vertexai.py @@ -0,0 +1,109 @@ +import contextlib +from pathlib import Path +from typing import Any, Dict, TypeVar +from unittest.mock import patch + +from pytest import Config + +from datahub.ingestion.run.pipeline import Pipeline +from tests.integration.vertexai.mock_vertexai import ( + gen_mock_dataset, + gen_mock_model, + gen_mock_models, + gen_mock_training_automl_job, + gen_mock_training_custom_job, +) +from tests.test_helpers import mce_helpers + +T = TypeVar("T") + +PROJECT_ID = "test-project-id" +REGION = "us-west2" + + +def get_pipeline_config(sink_file_path: str) -> Dict[str, Any]: + source_type = "vertexai" + return { + "run_id": "vertexai-source-test", + "source": { + "type": source_type, + "config": { + "project_id": PROJECT_ID, + "region": REGION, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": sink_file_path, + }, + }, + } + + +def test_vertexai_source_ingestion(pytestconfig: Config, tmp_path: Path) -> None: + with contextlib.ExitStack() as exit_stack: + # Mock the Vertex API with empty list + for func_to_mock in [ + "google.cloud.aiplatform.init", + "google.cloud.aiplatform.datasets.TextDataset.list", + "google.cloud.aiplatform.datasets.ImageDataset.list", + "google.cloud.aiplatform.datasets.TimeSeriesDataset.list", + "google.cloud.aiplatform.datasets.VideoDataset.list", + "google.cloud.aiplatform.CustomTrainingJob.list", + "google.cloud.aiplatform.CustomContainerTrainingJob.list", + "google.cloud.aiplatform.CustomPythonPackageTrainingJob.list", + "google.cloud.aiplatform.AutoMLTextTrainingJob.list", + "google.cloud.aiplatform.AutoMLImageTrainingJob.list", + "google.cloud.aiplatform.AutoMLVideoTrainingJob.list", + "google.cloud.aiplatform.AutoMLForecastingTrainingJob.list", + "google.cloud.aiplatform.TabularDataset.list", + ]: + mock = exit_stack.enter_context(patch(func_to_mock)) + mock.return_value = [] + + # Mock the Vertex AI with Mock data list + mock_models = exit_stack.enter_context( + patch("google.cloud.aiplatform.Model.list") + ) + mock_models.return_value = gen_mock_models() + + mock_custom_job = exit_stack.enter_context( + patch("google.cloud.aiplatform.CustomJob.list") + ) + mock_custom_job.return_value = [gen_mock_training_custom_job()] + + mock_automl_job = exit_stack.enter_context( + patch("google.cloud.aiplatform.AutoMLTabularTrainingJob.list") + ) + mock_automl_job.return_value = [gen_mock_training_automl_job()] + + mock_ds = exit_stack.enter_context( + patch("google.cloud.aiplatform.datasets.TabularDataset.list") 
+ ) + mock_ds.return_value = [gen_mock_dataset()] + + mock_model = exit_stack.enter_context( + patch("google.cloud.aiplatform.models.Model") + ) + mock_model.return_value = gen_mock_model() + + golden_file_path = ( + pytestconfig.rootpath + / "tests/integration/vertexai/vertexai_mcps_golden.json" + ) + + sink_file_path = str(tmp_path / "vertexai_source_mcps.json") + print(f"Output mcps file path: {str(sink_file_path)}") + print(f"Golden file path: {str(golden_file_path)}") + + pipeline = Pipeline.create(get_pipeline_config(sink_file_path)) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status() + + mce_helpers.check_golden_file( + pytestconfig=pytestconfig, + output_path=sink_file_path, + golden_path=golden_file_path, + ) diff --git a/metadata-ingestion/tests/integration/vertexai/vertexai_mcps_golden.json b/metadata-ingestion/tests/integration/vertexai/vertexai_mcps_golden.json new file mode 100644 index 00000000000000..3e393a00104ff3 --- /dev/null +++ b/metadata-ingestion/tests/integration/vertexai/vertexai_mcps_golden.json @@ -0,0 +1,458 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "vertexai", + "project_id": "test-project-id" + }, + "name": "test-project-id" + } + }, + "systemMetadata": { + "lastObserved": 1741552466405, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741552466405, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:vertexai" + } + }, + "systemMetadata": { + "lastObserved": 1741552466405, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741552466406, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [] + } + }, + "systemMetadata": { + "lastObserved": 1741552466406, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelGroupProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_1_display_name" + }, + "name": "test-project-id.model_group.mock_prediction_model_1", + "description": "mock_prediction_model_1_description" + } + }, + "systemMetadata": { + "lastObserved": 1741552466407, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } 
+}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_2,PROD)", + "changeType": "UPSERT", + "aspectName": "mlModelGroupProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_prediction_model_2_display_name" + }, + "name": "test-project-id.model_group.mock_prediction_model_2", + "description": "mock_prediction_model_2_description" + } + }, + "systemMetadata": { + "lastObserved": 1741552466408, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_training_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_training_job_display_name", + "jobType": "CustomJob" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_training_job?project=test-project-id", + "name": "test-project-id.job.mock_training_job", + "created": { + "time": 1741552466408, + "actor": "urn:li:platformResource:vertexai" + } + } + }, + "systemMetadata": { + "lastObserved": 1741552466409, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_training_job", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_training_job?project=test-project-id", + "id": "mock_training_job" + } + }, + "systemMetadata": { + "lastObserved": 1741552466409, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_training_job", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Training Job" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741552466409, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_training_job", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741552466410, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_auto_automl_tabular_job_display_name", + "jobType": "AutoMLTabularTrainingJob" + }, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "name": "test-project-id.job.mock_auto_automl_tabular_job", + "created": { + "time": 1741552466409, + "actor": "urn:li:platformResource:vertexai" + } + } + }, + "systemMetadata": { + "lastObserved": 1741552466410, + "runId": "vertexai-source-test", + "lastRunId": 
"no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "mlTrainingRunProperties", + "aspect": { + "json": { + "customProperties": {}, + "externalUrl": "https://console.cloud.google.com/vertex-ai/training/training-pipelines?trainingPipelineId=mock_auto_automl_tabular_job?project=test-project-id", + "id": "mock_auto_automl_tabular_job" + } + }, + "systemMetadata": { + "lastObserved": 1741552466411, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Training Job" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741552466411, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741552466411, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.2562882439508656128,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741552466412, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.2562882439508656128,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "displayName": "mock_dataset_display_name", + "resourceName": "projects/123/locations/us-central1/datasets/456" + }, + "name": "test-project-id.dataset.2562882439508656128", + "qualifiedName": "projects/123/locations/us-central1/datasets/456", + "description": "Dataset: mock_dataset_display_name", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1741552466412, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.2562882439508656128,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "Dataset" + ] + } + }, + "systemMetadata": { + "lastObserved": 1741552466412, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.2562882439508656128,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + }, + "systemMetadata": { + "lastObserved": 1741552466413, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": 
"urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.2562882439508656128,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "urn:li:container:29746a9030349f4340ed74b46913dab6", + "urn": "urn:li:container:29746a9030349f4340ed74b46913dab6" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1741552466413, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_auto_automl_tabular_job", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741552466413, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:test-project-id.job.mock_training_job", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741552466414, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:vertexai,test-project-id.dataset.2562882439508656128,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741552466414, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_1,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741552466414, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "mlModelGroup", + "entityUrn": "urn:li:mlModelGroup:(urn:li:dataPlatform:vertexai,test-project-id.model_group.mock_prediction_model_2,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1741552466415, + "runId": "vertexai-source-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/test_vertexai_source.py b/metadata-ingestion/tests/unit/test_vertexai_source.py new file mode 100644 index 00000000000000..668eea9ba5eebf --- /dev/null +++ b/metadata-ingestion/tests/unit/test_vertexai_source.py @@ -0,0 +1,426 @@ +import contextlib +import json +from typing import List +from unittest.mock import patch + +import pytest + +import datahub.emitter.mce_builder as builder +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.vertexai import ( + MLTypes, + ModelMetadata, + TrainingJobMetadata, + VertexAIConfig, + VertexAISource, +) +from datahub.metadata.com.linkedin.pegasus2avro.ml.metadata import ( + MLModelGroupProperties, + MLModelProperties, +) +from datahub.metadata.schema_classes import ( + ContainerClass, + DataProcessInstanceInputClass, + DataProcessInstancePropertiesClass, + DatasetPropertiesClass, + MLModelDeploymentPropertiesClass, + MLModelGroupPropertiesClass, + MLTrainingRunPropertiesClass, + SubTypesClass, +) +from 
tests.integration.vertexai.mock_vertexai import ( + gen_mock_dataset, + gen_mock_endpoint, + gen_mock_model, + gen_mock_model_version, + gen_mock_models, + gen_mock_training_automl_job, + gen_mock_training_custom_job, +) + +PROJECT_ID = "acryl-poc" +REGION = "us-west2" + + +@pytest.fixture +def source() -> VertexAISource: + return VertexAISource( + ctx=PipelineContext(run_id="vertexai-source-test"), + config=VertexAIConfig(project_id=PROJECT_ID, region=REGION), + ) + + +def test_get_ml_model_mcps(source: VertexAISource) -> None: + mock_models = gen_mock_models() + with contextlib.ExitStack() as exit_stack: + mock = exit_stack.enter_context(patch("google.cloud.aiplatform.Model.list")) + mock.return_value = mock_models + + # Running _get_ml_models_mcps + actual_mcps = [mcp for mcp in source._get_ml_models_mcps()] + actual_urns = [mcp.entityUrn for mcp in actual_mcps] + expected_urns = [ + builder.make_ml_model_group_urn( + platform=source.platform, + group_name=source._make_vertexai_model_group_name(mock_model.name), + env=source.config.env, + ) + for mock_model in mock_models + ] + # expect 2 model groups + assert actual_urns == expected_urns + + # assert expected aspect classes + expected_classes = [MLModelGroupPropertiesClass] * 2 + actual_classes = [mcp.aspect.__class__ for mcp in actual_mcps] + + assert expected_classes == actual_classes + + for mcp in actual_mcps: + assert isinstance(mcp, MetadataChangeProposalWrapper) + aspect = mcp.aspect + if isinstance(aspect, MLModelGroupProperties): + assert ( + aspect.name + == f"{source._make_vertexai_model_group_name(mock_models[0].name)}" + or aspect.name + == f"{source._make_vertexai_model_group_name(mock_models[1].name)}" + ) + assert ( + aspect.description == mock_models[0].description + or aspect.description == mock_models[1].description + ) + + +def test_get_ml_model_properties_mcps( + source: VertexAISource, +) -> None: + mock_model = gen_mock_model() + model_version = gen_mock_model_version(mock_model) + model_meta = ModelMetadata(mock_model, model_version) + + # Run _gen_ml_model_mcps + mcps = list(source._gen_ml_model_mcps(model_meta)) + assert len(mcps) == 1 + mcp = mcps[0] + assert isinstance(mcp, MetadataChangeProposalWrapper) + + # Assert URN + assert mcp.entityUrn == source._make_ml_model_urn( + model_version, source._make_vertexai_model_name(mock_model.name) + ) + aspect = mcp.aspect + # Assert Aspect Class + assert isinstance(aspect, MLModelProperties) + assert ( + aspect.name + == f"{source._make_vertexai_model_name(mock_model.name)}_{mock_model.version_id}" + ) + assert aspect.description == model_version.version_description + assert aspect.date == model_version.version_create_time + assert aspect.hyperParams is None + + +def test_get_endpoint_mcps( + source: VertexAISource, +) -> None: + mock_model = gen_mock_model() + model_version = gen_mock_model_version(mock_model) + mock_endpoint = gen_mock_endpoint() + model_meta = ModelMetadata( + model=mock_model, model_version=model_version, endpoints=[mock_endpoint] + ) + + # Run _gen_endpoint_mcps + actual_mcps = list(source._gen_endpoints_mcps(model_meta)) + actual_urns = [mcp.entityUrn for mcp in actual_mcps] + endpoint_urn = builder.make_ml_model_deployment_urn( + platform=source.platform, + deployment_name=source._make_vertexai_endpoint_name( + entity_id=mock_endpoint.name + ), + env=source.config.env, + ) + expected_urns = [endpoint_urn] + # Assert URN, expect 1 endpoint urn + assert actual_urns == expected_urns + + # Assert Aspect Classes + expected_classes = 
[MLModelDeploymentPropertiesClass] + actual_classes = [mcp.aspect.__class__ for mcp in actual_mcps] + assert actual_classes == expected_classes + + for mcp in source._gen_endpoints_mcps(model_meta): + assert isinstance(mcp, MetadataChangeProposalWrapper) + aspect = mcp.aspect + if isinstance(aspect, MLModelDeploymentPropertiesClass): + assert aspect.description == mock_model.description + assert aspect.customProperties == { + "displayName": mock_endpoint.display_name + } + assert aspect.createdAt == int(mock_endpoint.create_time.timestamp() * 1000) + # TODO: Add following when container/subtype supported + # elif isinstance(aspect, ContainerClass): + # assert aspect.container == source._get_project_container().as_urn() + # elif isinstance(aspect, SubTypesClass): + # assert aspect.typeNames == ["Endpoint"] + + +def test_get_training_jobs_mcps( + source: VertexAISource, +) -> None: + mock_training_job = gen_mock_training_custom_job() + mock_training_automl_job = gen_mock_training_automl_job() + with contextlib.ExitStack() as exit_stack: + for func_to_mock in [ + "google.cloud.aiplatform.init", + "google.cloud.aiplatform.CustomJob.list", + "google.cloud.aiplatform.CustomTrainingJob.list", + "google.cloud.aiplatform.CustomContainerTrainingJob.list", + "google.cloud.aiplatform.CustomPythonPackageTrainingJob.list", + "google.cloud.aiplatform.AutoMLTabularTrainingJob.list", + "google.cloud.aiplatform.AutoMLImageTrainingJob.list", + "google.cloud.aiplatform.AutoMLTextTrainingJob.list", + "google.cloud.aiplatform.AutoMLVideoTrainingJob.list", + "google.cloud.aiplatform.AutoMLForecastingTrainingJob.list", + ]: + mock = exit_stack.enter_context(patch(func_to_mock)) + if func_to_mock == "google.cloud.aiplatform.CustomJob.list": + mock.return_value = [mock_training_job] + else: + mock.return_value = [] + + """ + Test the retrieval of training jobs work units from Vertex AI. 
+        This function mocks CustomJob and AutoMLTabularTrainingJob,
+        and verifies the properties of the emitted work units.
+        """
+
+        # Run _get_training_jobs_mcps
+        actual_mcps = [mcp for mcp in source._get_training_jobs_mcps()]
+        actual_urns = [mcp.entityUrn for mcp in actual_mcps]
+        expected_urns = [
+            builder.make_data_process_instance_urn(
+                source._make_vertexai_job_name(mock_training_job.name)
+            )
+        ] * 4  # expect 4 aspects
+        assert actual_urns == expected_urns
+
+        # Assert Aspect Classes
+        expected_classes = [
+            DataProcessInstancePropertiesClass,
+            MLTrainingRunPropertiesClass,
+            SubTypesClass,
+            ContainerClass,
+        ]
+        actual_classes = [mcp.aspect.__class__ for mcp in actual_mcps]
+        assert set(expected_classes) == set(actual_classes)
+
+        # Assert Aspect Contents
+        for mcp in actual_mcps:
+            assert isinstance(mcp, MetadataChangeProposalWrapper)
+            aspect = mcp.aspect
+            if isinstance(aspect, DataProcessInstancePropertiesClass):
+                assert aspect.name in (
+                    f"{source.config.project_id}.job.{mock_training_job.name}",
+                    f"{source.config.project_id}.job.{mock_training_automl_job.name}",
+                )
+                assert aspect.customProperties["displayName"] in (
+                    mock_training_job.display_name,
+                    mock_training_automl_job.display_name,
+                )
+            if isinstance(aspect, MLTrainingRunPropertiesClass):
+                assert aspect.id == mock_training_job.name
+                assert aspect.externalUrl == source._make_job_external_url(
+                    mock_training_job
+                )
+            if isinstance(aspect, SubTypesClass):
+                assert aspect.typeNames == [MLTypes.TRAINING_JOB]
+
+            if isinstance(aspect, ContainerClass):
+                assert aspect.container == source._get_project_container().as_urn()
+
+
+def test_gen_training_job_mcps(source: VertexAISource) -> None:
+    mock_training_job = gen_mock_training_custom_job()
+    mock_dataset = gen_mock_dataset()
+    job_meta = TrainingJobMetadata(mock_training_job, input_dataset=mock_dataset)
+
+    actual_mcps = [mcp for mcp in source._gen_training_job_mcps(job_meta)]
+
+    # Assert Entity Urns
+    actual_urns = [mcp.entityUrn for mcp in actual_mcps]
+    expected_urns = [
+        builder.make_data_process_instance_urn(
+            source._make_vertexai_job_name(mock_training_job.name)
+        )
+    ] * 5  # expect 5 aspects under the same urn for the job
+    assert actual_urns == expected_urns
+
+    # Assert Aspect Classes
+    expected_classes = [
+        DataProcessInstancePropertiesClass,
+        MLTrainingRunPropertiesClass,
+        SubTypesClass,
+        ContainerClass,
+        DataProcessInstanceInputClass,
+    ]
+    actual_classes = [mcp.aspect.__class__ for mcp in actual_mcps]
+    assert set(expected_classes) == set(actual_classes)
+
+    dataset_name = source._make_vertexai_dataset_name(entity_id=mock_dataset.name)
+    dataset_urn = builder.make_dataset_urn(
+        platform=source.platform,
+        name=dataset_name,
+        env=source.config.env,
+    )
+
+    for mcp in actual_mcps:
+        assert isinstance(mcp, MetadataChangeProposalWrapper)
+        aspect = mcp.aspect
+        if isinstance(aspect, DataProcessInstancePropertiesClass):
+            assert (
+                aspect.name
+                == f"{source.config.project_id}.job.{mock_training_job.name}"
+            )
+            assert (
+                aspect.customProperties["displayName"] == mock_training_job.display_name
+            )
+        if isinstance(aspect, MLTrainingRunPropertiesClass):
+            assert aspect.id == mock_training_job.name
+            assert aspect.externalUrl == source._make_job_external_url(
+                mock_training_job
+            )
+
+        if isinstance(aspect, SubTypesClass):
+            assert aspect.typeNames == [MLTypes.TRAINING_JOB]
+
+        if isinstance(aspect, ContainerClass):
+            assert aspect.container == source._get_project_container().as_urn()
+
+        if isinstance(aspect, DataProcessInstanceInputClass):
+            
assert aspect.inputs == [dataset_urn]
+
+
+def test_vertexai_config_init() -> None:
+    config_data = {
+        "project_id": "test-project",
+        "region": "us-central1",
+        "bucket_uri": "gs://test-bucket",
+        "vertexai_url": "https://console.cloud.google.com/vertex-ai",
+        "credential": {
+            "private_key_id": "test-key-id",
+            "private_key": "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n",
+            "client_email": "test-email@test-project.iam.gserviceaccount.com",
+            "client_id": "test-client-id",
+            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
+            "token_uri": "https://oauth2.googleapis.com/token",
+            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
+            "type": "service_account",
+        },
+    }
+
+    config = VertexAIConfig(**config_data)
+
+    assert config.project_id == "test-project"
+    assert config.region == "us-central1"
+    assert config.bucket_uri == "gs://test-bucket"
+    assert config.vertexai_url == "https://console.cloud.google.com/vertex-ai"
+    assert config.credential is not None
+    assert config.credential.private_key_id == "test-key-id"
+    assert (
+        config.credential.private_key
+        == "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n"
+    )
+    assert (
+        config.credential.client_email
+        == "test-email@test-project.iam.gserviceaccount.com"
+    )
+    assert config.credential.client_id == "test-client-id"
+    assert config.credential.auth_uri == "https://accounts.google.com/o/oauth2/auth"
+    assert config.credential.token_uri == "https://oauth2.googleapis.com/token"
+    assert (
+        config.credential.auth_provider_x509_cert_url
+        == "https://www.googleapis.com/oauth2/v1/certs"
+    )
+
+    assert config._credentials_path is not None
+    with open(config._credentials_path, "r") as file:
+        content = json.loads(file.read())
+        assert content["project_id"] == "test-project"
+        assert content["private_key_id"] == "test-key-id"
+        assert (
+            content["private_key"]
+            == "-----BEGIN PRIVATE KEY-----\ntest-private-key\n-----END PRIVATE KEY-----\n"
+        )
+        assert (
+            content["client_email"] == "test-email@test-project.iam.gserviceaccount.com"
+        )
+        assert content["client_id"] == "test-client-id"
+        assert content["auth_uri"] == "https://accounts.google.com/o/oauth2/auth"
+        assert content["token_uri"] == "https://oauth2.googleapis.com/token"
+        assert (
+            content["auth_provider_x509_cert_url"]
+            == "https://www.googleapis.com/oauth2/v1/certs"
+        )
+
+
+def test_get_input_dataset_mcps(source: VertexAISource) -> None:
+    mock_dataset = gen_mock_dataset()
+    mock_job = gen_mock_training_custom_job()
+    job_meta = TrainingJobMetadata(mock_job, input_dataset=mock_dataset)
+
+    actual_mcps: List[MetadataChangeProposalWrapper] = list(
+        source._get_input_dataset_mcps(job_meta)
+    )
+
+    actual_urns = [mcp.entityUrn for mcp in actual_mcps]
+    assert job_meta.input_dataset is not None
+    expected_urns = [
+        builder.make_dataset_urn(
+            platform=source.platform,
+            name=source._make_vertexai_dataset_name(
+                entity_id=job_meta.input_dataset.name
+            ),
+            env=source.config.env,
+        )
+    ] * 3  # expect 3 aspects
+    assert actual_urns == expected_urns
+
+    # Assert Aspect Classes
+    actual_classes = [mcp.aspect.__class__ for mcp in actual_mcps]
+    expected_classes = [DatasetPropertiesClass, ContainerClass, SubTypesClass]
+    assert set(expected_classes) == set(actual_classes)
+
+    # Assert Aspect Contents
+    for mcp in actual_mcps:
+        assert isinstance(mcp, MetadataChangeProposalWrapper)
+        aspect = mcp.aspect
+        if isinstance(aspect, 
DatasetPropertiesClass):
+            assert aspect.name == source._make_vertexai_dataset_name(
+                entity_id=mock_dataset.name
+            )
+            assert aspect.customProperties["displayName"] == mock_dataset.display_name
+        elif isinstance(aspect, ContainerClass):
+            assert aspect.container == source._get_project_container().as_urn()
+        elif isinstance(aspect, SubTypesClass):
+            assert aspect.typeNames == ["Dataset"]
+
+
+def test_make_model_external_url(source: VertexAISource) -> None:
+    mock_model = gen_mock_model()
+    assert (
+        source._make_model_external_url(mock_model)
+        == f"{source.config.vertexai_url}/models/locations/{source.config.region}/models/{mock_model.name}"
+        f"?project={source.config.project_id}"
+    )
+
+
+def test_make_job_urn(source: VertexAISource) -> None:
+    mock_training_job = gen_mock_training_automl_job()
+    assert (
+        source._make_training_job_urn(mock_training_job)
+        == f"{builder.make_data_process_instance_urn(source._make_vertexai_job_name(mock_training_job.name))}"
+    )
diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml
index 4c4a7c2183073c..a4a4e426258d4a 100644
--- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml
+++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml
@@ -747,4 +747,14 @@
     displayName: Neo4j
     type: OTHERS
     logoUrl: "/assets/platforms/neo4j.png"
+- entityUrn: urn:li:dataPlatform:vertexai
+  entityType: dataPlatform
+  aspectName: dataPlatformInfo
+  changeType: UPSERT
+  aspect:
+    datasetNameDelimiter: "."
+    name: vertexai
+    displayName: VertexAI
+    type: OTHERS
+    logoUrl: "/assets/platforms/vertexai.png"