
Commit 1fcebfc

BRANCH MERGE - Azure fabric connector dev (#12827)
1 parent 48b6581 commit 1fcebfc

38 files changed, +4616 −32 lines

datahub-web-react/src/app/ingest/source/builder/constants.ts

+4
@@ -39,6 +39,7 @@ import sacLogo from '../../../../images/saclogo.svg';
 import cassandraLogo from '../../../../images/cassandralogo.png';
 import datahubLogo from '../../../../images/datahublogo.png';
 import neo4j from '../../../../images/neo4j.png';
+import msfabricLogo from '../../../../images/msfabric.png';
 
 export const ATHENA = 'athena';
 export const ATHENA_URN = `urn:li:dataPlatform:${ATHENA}`;
@@ -140,6 +141,8 @@ export const DATAHUB_BUSINESS_GLOSSARY = 'datahub-business-glossary';
 export const DATAHUB_URN = `urn:li:dataPlatform:${DATAHUB}`;
 export const NEO4J = 'neo4j';
 export const NEO4J_URN = `urn:li:dataPlatform:${NEO4J}`;
+export const MSFABRIC = 'msfabric';
+export const MSFABRIC_URN = `urn:li:dataPlatform:${MSFABRIC}`;
 
 export const PLATFORM_URN_TO_LOGO = {
     [ATHENA_URN]: athenaLogo,
@@ -184,6 +187,7 @@ export const PLATFORM_URN_TO_LOGO = {
     [CASSANDRA_URN]: cassandraLogo,
     [DATAHUB_URN]: datahubLogo,
     [NEO4J_URN]: neo4j,
+    [MSFABRIC_URN]: msfabricLogo,
 };
 
 export const SOURCE_TO_PLATFORM_URN = {

datahub-web-react/src/app/ingest/source/builder/sources.json

+8
@@ -333,5 +333,13 @@
         "description": "Import Nodes and Relationships from Neo4j.",
         "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/neo4j/",
         "recipe": "source:\n type: 'neo4j'\n config:\n uri: 'neo4j+ssc://host:7687'\n username: 'neo4j'\n password: 'password'\n env: 'PROD'\n\nsink:\n type: \"datahub-rest\"\n config:\n server: 'http://localhost:8080'"
+    },
+    {
+        "urn": "urn:li:dataPlatform:msfabric",
+        "name": "msfabric",
+        "displayName": "Microsoft Fabric",
+        "description": "Import Nodes and Relationships from Microsoft Fabric.",
+        "docsUrl": "https://datahubproject.io/docs/generated/ingestion/sources/msfabric/",
+        "recipe": ""
     }
 ]

metadata-ingestion/setup.py

+7
@@ -467,6 +467,11 @@
         # It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
         "setuptools",
     },
+    "msfabric": {"requests"} | {*abs_base, *data_lake_profiling} | microsoft_common
+    | {"lark[regex]==1.1.4", "sqlparse", "more-itertools"}
+    | sqlglot_lib
+    | threading_timeout_common |
+    sql_common | mssql_common | {"pyodbc"} | {*data_lake_profiling, *delta_lake},
     "mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
     "mongodb": {"pymongo[srv]>=3.11", "packaging"},
     "mssql": sql_common | mssql_common,
@@ -677,6 +682,7 @@
         "sac",
         "cassandra",
         "neo4j",
+        "msfabric"
     ]
     if plugin
     for dependency in plugins[plugin]
@@ -799,6 +805,7 @@
         "sac = datahub.ingestion.source.sac.sac:SACSource",
         "cassandra = datahub.ingestion.source.cassandra.cassandra:CassandraSource",
         "neo4j = datahub.ingestion.source.neo4j.neo4j_source:Neo4jSource",
+        "msfabric = datahub.ingestion.source.ms_fabric.source:AzureFabricSource",
     ],
     "datahub.ingestion.transformer.plugins": [
         "pattern_cleanup_ownership = datahub.ingestion.transformer.pattern_cleanup_ownership:PatternCleanUpOwnership",

metadata-ingestion/src/datahub/ingestion/source/azure/azure_common.py

+66 −20
@@ -1,6 +1,12 @@
 from typing import Dict, Optional, Union
 
-from azure.identity import ClientSecretCredential
+from azure.core.exceptions import ClientAuthenticationError
+from azure.identity import (
+    AzureCliCredential,
+    ClientSecretCredential,
+    DefaultAzureCredential,
+    ManagedIdentityCredential,
+)
 from azure.storage.blob import BlobServiceClient
 from azure.storage.filedatalake import DataLakeServiceClient, FileSystemClient
 from pydantic import Field, root_validator
@@ -20,30 +26,41 @@ class AzureConnectionConfig(ConfigModel):
         default="/",
         description="Base folder in hierarchical namespaces to start from.",
     )
-    container_name: str = Field(
+    container_name: Optional[str] = Field(
+        default=None,
         description="Azure storage account container name.",
     )
-    account_name: str = Field(
+    account_name: Optional[str] = Field(
+        default=None,
         description="Name of the Azure storage account. See [Microsoft official documentation on how to create a storage account.](https://docs.microsoft.com/en-us/azure/storage/blobs/create-data-lake-storage-account)",
     )
+    # Authentication Options
+    use_managed_identity: bool = Field(
+        default=False,
+        description="Whether to use Azure Managed Identity authentication.",
+    )
+    use_cli_auth: bool = Field(
+        default=False,
+        description="Whether to authenticate using the Azure CLI.",
+    )
     account_key: Optional[str] = Field(
-        description="Azure storage account access key that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
+        description="Azure storage account access key.",
         default=None,
     )
     sas_token: Optional[str] = Field(
-        description="Azure storage account Shared Access Signature (SAS) token that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
+        description="Azure storage account SAS token.",
         default=None,
     )
-    client_secret: Optional[str] = Field(
-        description="Azure client secret that can be used as a credential. **An account key, a SAS token or a client secret is required for authentication.**",
+    client_id: Optional[str] = Field(
+        description="Azure client (Application) ID for service principal auth.",
         default=None,
     )
-    client_id: Optional[str] = Field(
-        description="Azure client (Application) ID required when a `client_secret` is used as a credential.",
+    client_secret: Optional[str] = Field(
+        description="Azure client secret for service principal auth.",
         default=None,
     )
     tenant_id: Optional[str] = Field(
-        description="Azure tenant (Directory) ID required when a `client_secret` is used as a credential.",
+        description="Azure tenant ID required for service principal auth.",
         default=None,
     )
 
@@ -72,19 +89,47 @@ def get_data_lake_service_client(self) -> DataLakeServiceClient:
 
     def get_credentials(
         self,
-    ) -> Union[Optional[str], ClientSecretCredential]:
-        if self.client_id and self.client_secret and self.tenant_id:
-            return ClientSecretCredential(
-                tenant_id=self.tenant_id,
-                client_id=self.client_id,
-                client_secret=self.client_secret,
-            )
-        return self.sas_token if self.sas_token is not None else self.account_key
+    ) -> Union[
+        AzureCliCredential,
+        ClientSecretCredential,
+        DefaultAzureCredential,
+        ManagedIdentityCredential,
+        str,
+    ]:
+        """Get appropriate Azure credential based on configuration"""
+        try:
+            if self.use_managed_identity:
+                return ManagedIdentityCredential()
+
+            elif self.use_cli_auth:
+                return AzureCliCredential()
+
+            elif self.client_id and self.client_secret and self.tenant_id:
+                return ClientSecretCredential(
+                    tenant_id=self.tenant_id,
+                    client_id=self.client_id,
+                    client_secret=self.client_secret,
+                )
+
+            elif self.account_key:
+                return self.account_key
+
+            elif self.sas_token:
+                return self.sas_token
+
+            else:
+                return DefaultAzureCredential()
+
+        except ClientAuthenticationError as e:
+            raise ConfigurationError(f"Failed to initialize Azure credential: {str(e)}")
 
     @root_validator()
     def _check_credential_values(cls, values: Dict) -> Dict:
+        """Validate that at least one valid authentication method is configured"""
         if (
-            values.get("account_key")
+            values.get("use_managed_identity")
+            or values.get("use_cli_auth")
+            or values.get("account_key")
             or values.get("sas_token")
             or (
                 values.get("client_id")
@@ -94,5 +139,6 @@ def _check_credential_values(cls, values: Dict) -> Dict:
         ):
             return values
         raise ConfigurationError(
-            "credentials missing, requires one combination of account_key or sas_token or (client_id and client_secret and tenant_id)"
+            "Authentication configuration missing. Please provide one of: "
+            "managed identity, CLI auth, account key, SAS token, or service principal credentials"
        )
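The rewritten get_credentials now selects among the credential types in a fixed order: managed identity, then Azure CLI, then service principal, then account key, then SAS token, with DefaultAzureCredential as the fallback. A minimal sketch of that selection, assuming the fields behave as shown in this diff; every ID, secret, and account name below is a placeholder:

from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig

# Service-principal auth: client_id + client_secret + tenant_id together satisfy
# the root validator, and get_credentials() returns a ClientSecretCredential.
sp_config = AzureConnectionConfig(
    account_name="examplestorageaccount",  # placeholder
    container_name="example-container",    # placeholder
    client_id="00000000-0000-0000-0000-000000000000",
    client_secret="<client-secret>",
    tenant_id="11111111-1111-1111-1111-111111111111",
)
sp_credential = sp_config.get_credentials()

# use_cli_auth short-circuits before the service-principal branch, so this
# returns an AzureCliCredential (the CLI login itself happens outside DataHub).
cli_config = AzureConnectionConfig(
    account_name="examplestorageaccount",  # placeholder
    use_cli_auth=True,
)
cli_credential = cli_config.get_credentials()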
@@ -0,0 +1,8 @@
+def strip_onelake_prefix(path: str) -> str:
+    """Strip the OneLake prefix from an ABFSS path."""
+    if path.startswith("abfss://"):
+        # Split on @ and take everything after the first slash after onelake.dfs.fabric.microsoft.com
+        parts = path.split("@")[1].split("/", 1)
+        if len(parts) > 1:
+            return parts[1]
+    return path
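The new helper trims the OneLake host portion from abfss URLs. A worked example, assuming strip_onelake_prefix from the hunk above is in scope (its module path is not shown in this extract); the workspace and table names are made up:

# Everything after "@" is the OneLake host plus the item path; splitting once
# on "/" drops the host and keeps the workspace-relative path.
path = "abfss://MyWorkspace@onelake.dfs.fabric.microsoft.com/MyLakehouse.Lakehouse/Tables/orders"
assert strip_onelake_prefix(path) == "MyLakehouse.Lakehouse/Tables/orders"

# Non-abfss inputs are returned unchanged.
assert strip_onelake_prefix("/tmp/local/delta") == "/tmp/local/delta"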

metadata-ingestion/src/datahub/ingestion/source/delta_lake/config.py

+36 −1
@@ -13,6 +13,8 @@
 )
 from datahub.ingestion.source.aws.aws_common import AwsConnectionConfig
 from datahub.ingestion.source.aws.s3_util import is_s3_uri
+from datahub.ingestion.source.azure.abs_utils import is_abs_uri
+from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
 )
@@ -38,6 +40,27 @@ class S3(ConfigModel):
     )
 
 
+class Azure(ConfigModel):
+    """Azure-specific configuration for Delta Lake"""
+
+    azure_config: Optional[AzureConnectionConfig] = Field(
+        default=None, description="Azure configuration"
+    )
+
+    use_abs_container_properties: Optional[bool] = Field(
+        False,
+        description="Whether or not to create properties in datahub from the Azure blob container",
+    )
+    use_abs_blob_properties: Optional[bool] = Field(
+        False,
+        description="Whether or not to create properties in datahub from the Azure blob",
+    )
+    use_abs_blob_tags: Optional[bool] = Field(
+        False,
+        description="Whether or not to create tags in datahub from Azure blob tags",
+    )
+
+
 class DeltaLakeSourceConfig(
     PlatformInstanceConfigMixin,
     EnvConfigMixin,
@@ -78,12 +101,24 @@ class DeltaLakeSourceConfig(
         "When set to `False`, number_of_files in delta table can not be reported.",
     )
 
-    s3: Optional[S3] = Field()
+    s3: Optional[S3] = Field(default=None, description="S3 specific configuration")
+    azure: Optional[Azure] = Field(
+        default=None, description="Azure specific configuration"
+    )
 
     @cached_property
     def is_s3(self):
         return is_s3_uri(self.base_path or "")
 
+    @cached_property
+    def is_azure(self):
+        is_abfss = self.base_path.startswith("abfss://")
+        is_abs = is_abs_uri(self.base_path or "")
+        logger.debug(
+            f"Checking if {self.base_path} is Azure path: abfss={is_abfss}, abs={is_abs}"
+        )
+        return is_abfss or is_abs
+
     @cached_property
     def complete_path(self):
         complete_path = self.base_path
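Together with the azure_common.py changes, the new Azure block lets a Delta Lake config point at ABFSS/OneLake paths. A minimal sketch using only fields visible in this diff; the path, account, and container names are placeholders, not a tested recipe:

from datahub.ingestion.source.azure.azure_common import AzureConnectionConfig
from datahub.ingestion.source.delta_lake.config import Azure, DeltaLakeSourceConfig

config = DeltaLakeSourceConfig(
    base_path="abfss://example-container@exampleaccount.dfs.core.windows.net/delta/orders",
    azure=Azure(
        azure_config=AzureConnectionConfig(
            account_name="exampleaccount",       # placeholder
            container_name="example-container",  # placeholder
            use_cli_auth=True,  # any auth option from azure_common.py works here
        ),
        use_abs_blob_properties=True,
    ),
)

# base_path starts with "abfss://", so the new cached property reports an Azure
# path, which in turn lets read_delta_table() skip the local-existence check below.
assert config.is_azure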

metadata-ingestion/src/datahub/ingestion/source/delta_lake/delta_lake_utils.py

+6 −1
@@ -19,7 +19,12 @@
 def read_delta_table(
     path: str, opts: Dict[str, str], delta_lake_config: DeltaLakeSourceConfig
 ) -> Optional[DeltaTable]:
-    if not delta_lake_config.is_s3 and not pathlib.Path(path).exists():
+    # For local paths, check existence
+    if (
+        not delta_lake_config.is_s3
+        and not delta_lake_config.is_azure
+        and not pathlib.Path(path).exists()
+    ):
         # The DeltaTable() constructor will create the path if it doesn't exist.
         # Hence we need an extra, manual check here.
         return None
