@@ -80,16 +80,24 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):
         # For backward compatibility with existing tests
         self.audit_log = self.fivetran_access
 
-    def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, str]:
+    def _extend_lineage(
+        self,
+        connector: Connector,
+        datajob: DataJob,
+        source_details: Optional[PlatformDetail] = None,
+        destination_details: Optional[PlatformDetail] = None,
+    ) -> Dict[str, str]:
         """Build lineage between source and destination datasets."""
         # Initialize empty lists for dataset URNs and fine-grained lineage
         input_dataset_urn_list: List[DatasetUrn] = []
         output_dataset_urn_list: List[DatasetUrn] = []
         fine_grained_lineage: List[FineGrainedLineage] = []
 
-        # Obtain source and destination platform details
-        source_details = self._get_source_details(connector)
-        destination_details = self._get_destination_details(connector)
+        # Obtain source and destination platform details if not provided
+        if source_details is None:
+            source_details = self._get_source_details(connector)
+        if destination_details is None:
+            destination_details = self._get_destination_details(connector)
 
         # Handle lineage truncation if needed
         if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
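The widened signature makes _extend_lineage injectable: a caller that has already resolved platform details (as _generate_datajob_from_connector now does, below) can pass them in, while existing callers keep the config-driven lookup. A minimal usage sketch, assuming fivetran_source, connector, and datajob are already-built objects and that PlatformDetail accepts these fields as keyword arguments (the diff's attribute access suggests so; names here are illustrative, not from the PR):

# Old call style: details are resolved from the source/destination config maps.
props = fivetran_source._extend_lineage(connector=connector, datajob=datajob)

# New call style: inject pre-resolved details, e.g. from a test or a caller
# that has already detected the platforms.
details = PlatformDetail(platform="postgres", database="analytics")
props = fivetran_source._extend_lineage(
    connector=connector,
    datajob=datajob,
    source_details=details,  # destination_details still falls back to config
)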
@@ -110,15 +118,19 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
                 is_source=False,
             )
 
+            # Skip if either URN creation failed
+            if not source_urn or not dest_urn:
+                continue
+
             # Add URNs to lists (avoiding duplicates)
-            if source_urn and source_urn not in input_dataset_urn_list:
+            if source_urn not in input_dataset_urn_list:
                 input_dataset_urn_list.append(source_urn)
 
-            if dest_urn and dest_urn not in output_dataset_urn_list:
+            if dest_urn not in output_dataset_urn_list:
                 output_dataset_urn_list.append(dest_urn)
 
             # Create column lineage if enabled
-            if self.config.include_column_lineage and source_urn and dest_urn:
+            if self.config.include_column_lineage:
                 self._create_column_lineage(
                     lineage=lineage,
                     source_urn=source_urn,
@@ -132,12 +144,30 @@ def _extend_lineage(self, connector: Connector, datajob: DataJob) -> Dict[str, s
         datajob.fine_grained_lineages.extend(fine_grained_lineage)
 
         # Build properties from details and connector properties
-        return self._build_lineage_properties(
+        lineage_properties = self._build_lineage_properties(
             connector=connector,
             source_details=source_details,
             destination_details=destination_details,
         )
 
+        # Add source and destination platform information to properties
+        if source_details.platform:
+            lineage_properties["source.platform"] = source_details.platform
+        if destination_details.platform:
+            lineage_properties["destination.platform"] = destination_details.platform
+
+        # Add database information if available
+        if source_details.database:
+            lineage_properties["source.database"] = source_details.database
+        if destination_details.database:
+            lineage_properties["destination.database"] = destination_details.database
+
+        # Add environment information
+        lineage_properties["source.env"] = source_details.env or "PROD"
+        lineage_properties["destination.env"] = destination_details.env or "PROD"
+
+        return lineage_properties
+
     def _get_source_details(self, connector: Connector) -> PlatformDetail:
         """Get source platform details for a connector."""
         source_details = self.config.sources_to_platform_instance.get(
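For a connector with a Postgres source and a Snowflake destination, the keys appended above would extend the returned mapping roughly as follows (illustrative values only, not taken from the PR):

lineage_properties = {
    # ...entries from _build_lineage_properties...
    "source.platform": "postgres",
    "destination.platform": "snowflake",
    "source.database": "analytics",       # set only when a database is configured
    "destination.database": "warehouse",  # set only when a database is configured
    "source.env": "PROD",                 # env falls back to "PROD" when unset
    "destination.env": "PROD",
}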
@@ -202,22 +232,30 @@ def _create_dataset_urn(
         if not table_name:
             return None
 
-        # Handle schema inclusion based on configuration
-        if not details.include_schema_in_urn and "." in table_name:
-            table_name = table_name.split(".", 1)[1]
+        try:
+            # Handle schema inclusion based on configuration
+            if not details.include_schema_in_urn and "." in table_name:
+                table_name = table_name.split(".", 1)[1]
+
+            # Ensure we have a platform
+            platform = details.platform
+            if not platform:
+                platform = "snowflake" if not is_source else "external"
+
+            # Include database in the table name if available
+            full_table_name = (
+                f"{details.database.lower()}.{table_name}"
+                if details.database
+                else table_name
+            )
 
-        # Include database in the table name if available
-        full_table_name = (
-            f"{details.database.lower()}.{table_name}"
-            if details.database
-            else table_name
-        )
+            # Ensure environment is set
+            env = details.env or "PROD"
 
-        try:
             return DatasetUrn.create_from_ids(
-                platform_id=details.platform,
+                platform_id=platform,
                 table_name=full_table_name,
-                env=details.env,
+                env=env,
                 platform_instance=details.platform_instance,
             )
         except Exception as e:
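The platform and env fallbacks introduced in this try block reduce to two small rules. A standalone sketch that mirrors them (resolve_platform is a hypothetical helper for illustration, not part of the change):

from typing import Optional

def resolve_platform(platform: Optional[str], is_source: bool) -> str:
    # A missing destination platform defaults to "snowflake"; a missing
    # source platform to the generic "external".
    if platform:
        return platform
    return "external" if is_source else "snowflake"

assert resolve_platform(None, is_source=False) == "snowflake"
assert resolve_platform(None, is_source=True) == "external"
assert resolve_platform("mysql", is_source=True) == "mysql"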
@@ -238,30 +276,42 @@ def _report_lineage_truncation(self, connector: Connector) -> None:
     def _create_column_lineage(
         self,
         lineage,
-        source_urn: DatasetUrn,
-        dest_urn: DatasetUrn,
+        source_urn: Optional[DatasetUrn],
+        dest_urn: Optional[DatasetUrn],
         fine_grained_lineage: List[FineGrainedLineage],
     ) -> None:
         """Create column-level lineage between source and destination tables."""
+        if not source_urn or not dest_urn:
+            return
+
         for column_lineage in lineage.column_lineage:
-            fine_grained_lineage.append(
-                FineGrainedLineage(
-                    upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
-                    upstreams=[
-                        builder.make_schema_field_urn(
-                            str(source_urn),
-                            column_lineage.source_column,
-                        )
-                    ],
-                    downstreamType=FineGrainedLineageDownstreamType.FIELD,
-                    downstreams=[
-                        builder.make_schema_field_urn(
-                            str(dest_urn),
-                            column_lineage.destination_column,
-                        )
-                    ],
+            if (
+                not column_lineage.source_column
+                or not column_lineage.destination_column
+            ):
+                continue
+
+            try:
+                source_field_urn = builder.make_schema_field_urn(
+                    str(source_urn),
+                    column_lineage.source_column,
+                )
+
+                dest_field_urn = builder.make_schema_field_urn(
+                    str(dest_urn),
+                    column_lineage.destination_column,
                 )
-            )
+
+                fine_grained_lineage.append(
+                    FineGrainedLineage(
+                        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
+                        upstreams=[source_field_urn],
+                        downstreamType=FineGrainedLineageDownstreamType.FIELD,
+                        downstreams=[dest_field_urn],
+                    )
+                )
+            except Exception as e:
+                logger.warning(f"Failed to create column lineage: {e}")
 
     def _build_lineage_properties(
         self,
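The new guard skips column pairs where either side is missing before any schema-field URN is built. A self-contained sketch of that filter, using a stand-in record type (the real records come from the connector's lineage payload):

from dataclasses import dataclass
from typing import Optional

@dataclass
class ColumnLineage:  # stand-in for the real lineage record type
    source_column: Optional[str]
    destination_column: Optional[str]

rows = [ColumnLineage("id", "id"), ColumnLineage(None, "name")]
kept = [r for r in rows if r.source_column and r.destination_column]
assert len(kept) == 1  # the pair with a missing source column is dropped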
@@ -416,31 +466,44 @@ def _detect_destination_platform(self, connector: Connector) -> str:
 
     def _create_destination_urn(
         self, lineage, destination_details: PlatformDetail
-    ) -> DatasetUrn:
+    ) -> Optional[DatasetUrn]:
         """Create a dataset URN for a destination table."""
-        destination_table = (
-            lineage.destination_table
-            if destination_details.include_schema_in_urn
-            else lineage.destination_table.split(".", 1)[1]
-        )
+        if not lineage.destination_table:
+            return None
 
-        # Safe access to database.lower() with None check
-        destination_table_name = (
-            f"{destination_details.database.lower()}.{destination_table}"
-            if destination_details.database
-            else destination_table
-        )
+        try:
+            destination_table = (
+                lineage.destination_table
+                if destination_details.include_schema_in_urn
+                else lineage.destination_table.split(".", 1)[1]
+                if "." in lineage.destination_table
+                else lineage.destination_table
+            )
 
-        return DatasetUrn.create_from_ids(
-            platform_id=destination_details.platform,
-            table_name=destination_table_name,
-            env=destination_details.env,
-            platform_instance=destination_details.platform_instance,
-        )
+            # Safe access to database with None check
+            destination_table_name = (
+                f"{destination_details.database.lower()}.{destination_table}"
+                if destination_details.database
+                else destination_table
+            )
+
+            return DatasetUrn.create_from_ids(
+                platform_id=destination_details.platform
+                or "snowflake",  # Default to snowflake if not specified
+                table_name=destination_table_name,
+                env=destination_details.env,
+                platform_instance=destination_details.platform_instance,
+            )
+        except Exception as e:
+            logger.warning(f"Failed to create destination URN: {e}")
+            return None
 
     def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow:
         """Generate a DataFlow entity from a connector."""
         # Extract connector-specific metadata to enrich the dataflow
+        connector_name = (
+            connector.connector_name or f"Fivetran-{connector.connector_id}"
+        )
         description = f"Fivetran connector for {connector.connector_type}"
         properties = {}
 
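The rewritten conditional in _create_destination_urn fixes a latent IndexError: the old code always took split(".", 1)[1], which blows up on a schema-less table name. The same logic as a small pure function (illustrative only, not part of the diff):

def strip_schema(table: str, include_schema: bool) -> str:
    # Keep the name whole unless a schema prefix exists and is unwanted.
    if include_schema or "." not in table:
        return table
    return table.split(".", 1)[1]

assert strip_schema("public.users", include_schema=False) == "users"
assert strip_schema("users", include_schema=False) == "users"  # old code raised IndexError here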
@@ -454,11 +517,16 @@ def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow:
         properties["paused"] = str(connector.paused)
         properties["destination_id"] = connector.destination_id
 
+        # Get destination platform if available
+        if "destination_platform" in connector.additional_properties:
+            destination = connector.additional_properties.get("destination_platform")
+            description += f" to {destination}"
+
         return DataFlow(
             orchestrator=Constant.ORCHESTRATOR,
             id=connector.connector_id,
-            env=self.config.env,
-            name=connector.connector_name,
+            env=self.config.env or "PROD",
+            name=connector_name,
             description=description,
             properties=properties,
             platform_instance=self.config.platform_instance,
@@ -469,31 +537,55 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
         dataflow_urn = DataFlowUrn.create_from_ids(
             orchestrator=Constant.ORCHESTRATOR,
             flow_id=connector.connector_id,
-            env=self.config.env,
+            env=self.config.env or "PROD",
             platform_instance=self.config.platform_instance,
         )
 
-        # Get owner information
-        owner_email = self.fivetran_access.get_user_email(connector.user_id)
+        # Extract useful connector information
+        connector_name = (
+            connector.connector_name or f"Fivetran-{connector.connector_id}"
+        )
 
-        # Create job description based on connector properties
-        description = f"Fivetran data pipeline from {connector.connector_type}"
+        # Get source platform from connector type
+        source_platform = self._detect_source_platform(connector)
+
+        # Get destination platform
+        destination_platform = "snowflake"  # Default
         if "destination_platform" in connector.additional_properties:
-            destination = connector.additional_properties.get("destination_platform")
-            description += f" to {destination}"
+            destination_platform = connector.additional_properties.get(
+                "destination_platform"
+            )
+
+        # Create job description
+        description = f"Fivetran data pipeline from {connector.connector_type} to {destination_platform}"
 
-        # Create the DataJob with basic information
+        # Get owner information
+        owner_email = self.fivetran_access.get_user_email(connector.user_id)
+        owner_set = {owner_email} if owner_email else set()
+
+        # Create the DataJob with enhanced information
         datajob = DataJob(
             id=connector.connector_id,
             flow_urn=dataflow_urn,
-            name=connector.connector_name,
+            name=connector_name,
             description=description,
-            owners={owner_email} if owner_email else set(),
+            owners=owner_set,
         )
 
         # Map connector source and destination table with dataset entity
         # Also extend the fine grained lineage of column if include_column_lineage is True
-        lineage_properties = self._extend_lineage(connector=connector, datajob=datajob)
+        source_details = self._get_source_details(connector)
+        source_details.platform = source_platform
+
+        destination_details = self._get_destination_details(connector)
+        destination_details.platform = destination_platform
+
+        lineage_properties = self._extend_lineage(
+            connector=connector,
+            datajob=datajob,
+            source_details=source_details,
+            destination_details=destination_details,
+        )
 
         # Extract connector properties for the DataJob
         connector_properties: Dict[str, str] = {