
Commit 4ccf733

support for standard version
1 parent 48b6581 commit 4ccf733

10 files changed, +2411 -38 lines changed

metadata-ingestion/src/datahub/ingestion/source/fivetran/config.py

+68 -17
@@ -3,7 +3,6 @@
 import warnings
 from typing import Dict, Optional
 
-import pydantic
 from pydantic import Field, root_validator
 from typing_extensions import Literal
 
@@ -19,6 +18,7 @@
 from datahub.ingestion.source.bigquery_v2.bigquery_config import (
     BigQueryConnectionConfig,
 )
+from datahub.ingestion.source.fivetran.fivetran_constants import FivetranMode
 from datahub.ingestion.source.snowflake.snowflake_connection import (
     SnowflakeConnectionConfig,
 )
@@ -85,15 +85,15 @@ class BigQueryDestinationConfig(BigQueryConnectionConfig):
 
 
 class FivetranLogConfig(ConfigModel):
-    destination_platform: Literal["snowflake", "bigquery"] = pydantic.Field(
+    destination_platform: Literal["snowflake", "bigquery"] = Field(
         default="snowflake",
         description="The destination platform where fivetran connector log tables are dumped.",
     )
-    snowflake_destination_config: Optional[SnowflakeDestinationConfig] = pydantic.Field(
+    snowflake_destination_config: Optional[SnowflakeDestinationConfig] = Field(
         default=None,
         description="If destination platform is 'snowflake', provide snowflake configuration.",
     )
-    bigquery_destination_config: Optional[BigQueryDestinationConfig] = pydantic.Field(
+    bigquery_destination_config: Optional[BigQueryDestinationConfig] = Field(
         default=None,
         description="If destination platform is 'bigquery', provide bigquery configuration.",
     )
@@ -121,6 +121,19 @@ def validate_destination_platfrom_and_config(cls, values: Dict) -> Dict:
         return values
 
 
+class FivetranAPIConfig(ConfigModel):
+    """Configuration for the Fivetran API client."""
+
+    api_key: str = Field(description="Fivetran API key")
+    api_secret: str = Field(description="Fivetran API secret")
+    base_url: str = Field(
+        default="https://api.fivetran.com", description="Fivetran API base URL"
+    )
+    request_timeout_sec: int = Field(
+        default=30, description="Request timeout in seconds"
+    )
+
+
 @dataclasses.dataclass
 class MetadataExtractionPerfReport(Report):
     connectors_metadata_extraction_sec: PerfTimer = dataclasses.field(
@@ -150,33 +163,47 @@ def report_connectors_dropped(self, connector: str) -> None:
 
 
 class PlatformDetail(ConfigModel):
-    platform: Optional[str] = pydantic.Field(
+    platform: Optional[str] = Field(
         default=None,
         description="Override the platform type detection.",
     )
-    platform_instance: Optional[str] = pydantic.Field(
+    platform_instance: Optional[str] = Field(
         default=None,
         description="The instance of the platform that all assets produced by this recipe belong to",
     )
-    env: str = pydantic.Field(
+    env: str = Field(
         default=DEFAULT_ENV,
         description="The environment that all assets produced by DataHub platform ingestion source belong to",
     )
-    database: Optional[str] = pydantic.Field(
+    database: Optional[str] = Field(
         default=None,
         description="The database that all assets produced by this connector belong to. "
         "For destinations, this defaults to the fivetran log config's database.",
     )
-    include_schema_in_urn: bool = pydantic.Field(
+    include_schema_in_urn: bool = Field(
         default=True,
         description="Include schema in the dataset URN. In some cases, the schema is not relevant to the dataset URN and Fivetran sets it to the source and destination table names in the connector.",
     )
 
 
 class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
-    fivetran_log_config: FivetranLogConfig = pydantic.Field(
-        description="Fivetran log connector destination server configurations.",
+    fivetran_mode: FivetranMode = Field(
+        default=FivetranMode.AUTO,
+        description="Mode of operation: 'enterprise' for log tables access, 'standard' for REST API, or 'auto' to detect.",
+    )
+
+    # Enterprise version configuration
+    fivetran_log_config: Optional[FivetranLogConfig] = Field(
+        default=None,
+        description="Fivetran log connector destination server configurations for enterprise version.",
+    )
+
+    # Standard version configuration
+    api_config: Optional[FivetranAPIConfig] = Field(
+        default=None,
+        description="Fivetran REST API configuration for standard version.",
     )
+
     connector_patterns: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
         description="Filtering regex patterns for connector names.",
@@ -193,22 +220,46 @@ class FivetranSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin
     )
 
     # Configuration for stateful ingestion
-    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = pydantic.Field(
-        default=None, description="Airbyte Stateful Ingestion Config."
+    stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
+        default=None, description="Fivetran Stateful Ingestion Config."
     )
 
     # Fivetran connector all sources to platform instance mapping
-    sources_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
+    sources_to_platform_instance: Dict[str, PlatformDetail] = Field(
         default={},
         description="A mapping from connector id to its platform/instance/env/database details.",
     )
     # Fivetran destination to platform instance mapping
-    destination_to_platform_instance: Dict[str, PlatformDetail] = pydantic.Field(
+    destination_to_platform_instance: Dict[str, PlatformDetail] = Field(
         default={},
         description="A mapping of destination id to its platform/instance/env details.",
     )
 
-    @pydantic.root_validator(pre=True)
+    @root_validator
+    def validate_config_based_on_mode(cls, values: Dict) -> Dict:
+        """Validate configuration based on the selected mode."""
+        mode = values.get("fivetran_mode", FivetranMode.AUTO)
+        log_config = values.get("fivetran_log_config")
+        api_config = values.get("api_config")
+
+        if mode == FivetranMode.ENTERPRISE:
+            if not log_config:
+                raise ValueError(
+                    "Enterprise mode requires 'fivetran_log_config' to be specified."
+                )
+        elif mode == FivetranMode.STANDARD:
+            if not api_config:
+                raise ValueError("Standard mode requires 'api_config' to be specified.")
+        elif mode == FivetranMode.AUTO:
+            # Auto-detect based on provided configs
+            if not log_config and not api_config:
+                raise ValueError(
+                    "Either 'fivetran_log_config' (for enterprise) or 'api_config' (for standard) must be specified."
+                )
+
+        return values
+
+    @root_validator(pre=True)
     def compat_sources_to_database(cls, values: Dict) -> Dict:
         if "sources_to_database" in values:
             warnings.warn(
@@ -227,7 +278,7 @@ def compat_sources_to_database(cls, values: Dict) -> Dict:
 
         return values
 
-    history_sync_lookback_period: int = pydantic.Field(
+    history_sync_lookback_period: int = Field(
         7,
         description="The number of days to look back when extracting connectors' sync history.",
     )
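
Note (not part of the commit): a minimal usage sketch of the new mode handling in FivetranSourceConfig. It assumes a checkout that includes this change, that FivetranMode is a string-valued enum with "enterprise" / "standard" / "auto" values as the field description suggests, and pydantic v1 semantics (ValidationError subclasses ValueError); the key/secret placeholders are illustrative.

from datahub.ingestion.source.fivetran.config import FivetranSourceConfig

# Standard (REST API) mode: the validator only requires api_config.
standard_config = FivetranSourceConfig.parse_obj(
    {
        "fivetran_mode": "standard",
        "api_config": {
            "api_key": "<FIVETRAN_API_KEY>",  # illustrative placeholder
            "api_secret": "<FIVETRAN_API_SECRET>",  # illustrative placeholder
        },
    }
)

# Auto mode with neither fivetran_log_config nor api_config is rejected by
# validate_config_based_on_mode.
try:
    FivetranSourceConfig.parse_obj({"fivetran_mode": "auto"})
except ValueError as err:  # pydantic v1 ValidationError subclasses ValueError
    print(err)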

metadata-ingestion/src/datahub/ingestion/source/fivetran/data_classes.py

+3 -2
@@ -1,5 +1,5 @@
-from dataclasses import dataclass
-from typing import List
+from dataclasses import dataclass, field
+from typing import Any, Dict, List
 
 
 @dataclass
@@ -26,6 +26,7 @@ class Connector:
     user_id: str
     lineage: List[TableLineage]
     jobs: List["Job"]
+    additional_properties: Dict[str, Any] = field(default_factory=dict)
 
 
 @dataclass
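
Note (not part of the commit): additional_properties uses dataclasses.field(default_factory=dict) so that each Connector instance gets its own dict rather than sharing one mutable default; the standard-version code path in fivetran.py (below) reads the "destination_platform" key from it. A standalone illustration with a simplified stand-in class (not the real Connector) follows.

from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ConnectorSketch:  # simplified stand-in, not the real Connector
    connector_id: str
    additional_properties: Dict[str, Any] = field(default_factory=dict)


a = ConnectorSketch("connector_1")
b = ConnectorSketch("connector_2")
a.additional_properties["destination_platform"] = "bigquery"
assert b.additional_properties == {}  # each instance gets its own dict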

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py

+55 -15
@@ -26,7 +26,9 @@
     PlatformDetail,
 )
 from datahub.ingestion.source.fivetran.data_classes import Connector, Job
-from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
+from datahub.ingestion.source.fivetran.fivetran_access import (
+    create_fivetran_access,
+)
 from datahub.ingestion.source.fivetran.fivetran_query import (
     MAX_JOBS_PER_CONNECTOR,
     MAX_TABLE_LINEAGE_PER_CONNECTOR,
@@ -60,7 +62,7 @@
 class FivetranSource(StatefulIngestionSourceBase):
     """
     This plugin extracts fivetran users, connectors, destinations and sync history.
-    This plugin is in beta and has only been tested on Snowflake connector.
+    Supports both enterprise and standard versions.
     """
 
     config: FivetranSourceConfig
@@ -72,7 +74,11 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):
         self.config = config
         self.report = FivetranSourceReport()
 
-        self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)
+        # Create the appropriate access implementation using the factory
+        self.fivetran_access = create_fivetran_access(config)
+
+        # For backward compatibility with existing tests
+        self.audit_log = self.fivetran_access
 
     def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
         input_dataset_urn_list: List[DatasetUrn] = []
@@ -104,11 +110,33 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
             connector.destination_id, PlatformDetail()
         )
         if destination_details.platform is None:
-            destination_details.platform = (
-                self.config.fivetran_log_config.destination_platform
-            )
+            # If using enterprise version, get destination platform from config
+            if (
+                hasattr(self.config, "fivetran_log_config")
+                and self.config.fivetran_log_config is not None
+            ):
+                destination_details.platform = (
+                    self.config.fivetran_log_config.destination_platform
+                )
+            else:
+                # For standard version, use the detected platform if available
+                detected_platform = connector.additional_properties.get(
+                    "destination_platform"
+                )
+                if detected_platform:
+                    destination_details.platform = detected_platform
+                else:
+                    # Default to snowflake if detection failed
+                    destination_details.platform = "snowflake"
+
+        # Ensure database is not None to avoid attribute errors when calling .lower()
         if destination_details.database is None:
-            destination_details.database = self.audit_log.fivetran_log_database
+            destination_details.database = (
+                self.fivetran_access.fivetran_log_database or ""
+            )
+
+        if source_details.database is None:
+            source_details.database = ""
 
         if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
             self.report.warning(
@@ -124,13 +152,17 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
                 if source_details.include_schema_in_urn
                 else lineage.source_table.split(".", 1)[1]
             )
+
+            # Safe access to database.lower() with None check
+            source_table_name = (
+                f"{source_details.database.lower()}.{source_table}"
+                if source_details.database
+                else source_table
+            )
+
             input_dataset_urn = DatasetUrn.create_from_ids(
                 platform_id=source_details.platform,
-                table_name=(
-                    f"{source_details.database.lower()}.{source_table}"
-                    if source_details.database
-                    else source_table
-                ),
+                table_name=source_table_name,
                 env=source_details.env,
                 platform_instance=source_details.platform_instance,
             )
@@ -141,9 +173,17 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
                 if destination_details.include_schema_in_urn
                 else lineage.destination_table.split(".", 1)[1]
             )
+
+            # Safe access to database.lower() with None check
+            destination_table_name = (
+                f"{destination_details.database.lower()}.{destination_table}"
+                if destination_details.database
+                else destination_table
+            )
+
             output_dataset_urn = DatasetUrn.create_from_ids(
                 platform_id=destination_details.platform,
-                table_name=f"{destination_details.database.lower()}.{destination_table}",
+                table_name=destination_table_name,
                 env=destination_details.env,
                 platform_instance=destination_details.platform_instance,
             )
@@ -211,7 +251,7 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
             env=self.config.env,
             platform_instance=self.config.platform_instance,
         )
-        owner_email = self.audit_log.get_user_email(connector.user_id)
+        owner_email = self.fivetran_access.get_user_email(connector.user_id)
         datajob = DataJob(
             id=connector.connector_id,
             flow_urn=dataflow_urn,
@@ -314,7 +354,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         Datahub Ingestion framework invoke this method
         """
         logger.info("Fivetran plugin execution is started")
-        connectors = self.audit_log.get_allowed_connectors_list(
+        connectors = self.fivetran_access.get_allowed_connectors_list(
            self.config.connector_patterns,
            self.config.destination_patterns,
            self.report,
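
Note (not part of the commit): the new fivetran_access module imported above is among the ten changed files but is not shown in this excerpt. The sketch below is only an approximation of how a factory like create_fivetran_access could dispatch between the enterprise (log tables) and standard (REST API) paths; the stand-in classes and their internals are assumptions, not the commit's implementation. What the diff itself establishes is the surface FivetranSource relies on: get_user_email, get_allowed_connectors_list, and a fivetran_log_database attribute that may be empty in API mode (hence the `or ""` guard added in _extend_lineage).

from typing import Optional


class _EnterpriseAccessSketch:
    """Stand-in for the log-table-backed implementation (FivetranLogAPI)."""

    def __init__(self, log_config) -> None:
        # Illustrative value; the real implementation derives this from the
        # snowflake/bigquery destination config.
        self.fivetran_log_database: Optional[str] = "fivetran_log"


class _StandardAccessSketch:
    """Stand-in for a REST-API-backed implementation."""

    def __init__(self, api_config) -> None:
        # No log database exists when reading metadata via the REST API.
        self.fivetran_log_database: Optional[str] = None


def create_fivetran_access_sketch(config):
    """Choose an implementation from the config, mirroring the mode validator."""
    if getattr(config, "fivetran_log_config", None) is not None:
        return _EnterpriseAccessSketch(config.fivetran_log_config)
    return _StandardAccessSketch(config.api_config)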
