Skip to content

Commit 34f9d0f

Browse files
committed
references as lineage
1 parent cbd6f6e commit 34f9d0f

File tree

5 files changed

+2560
-111
lines changed

5 files changed

+2560
-111
lines changed

metadata-ingestion/src/datahub/ingestion/source/salesforce.py

+78-21
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
BooleanTypeClass,
5252
BytesTypeClass,
5353
DataPlatformInstanceClass,
54+
DatasetLineageTypeClass,
5455
DatasetProfileClass,
5556
DatasetPropertiesClass,
5657
DateTypeClass,
@@ -69,6 +70,8 @@
6970
StringTypeClass,
7071
SubTypesClass,
7172
TagAssociationClass,
73+
UpstreamClass,
74+
UpstreamLineageClass,
7275
)
7376
from datahub.utilities import config_clean
7477
from datahub.utilities.lossy_collections import LossyList
@@ -151,7 +154,8 @@ class SalesforceConfig(
151154
description="Regex patterns for profiles to filter in ingestion, allowed by the `object_pattern`.",
152155
)
153156

154-
set_referenced_entities_as_upstream: bool = Field(
157+
# There is no ERD-style visual graph view, so showing referenced entities as upstreams is a useful alternative.
158+
use_referenced_entities_as_upstreams: bool = Field(
155159
default=False,
156160
description="If enabled, referenced entities will be treated as upstream entities.",
157161
)
@@ -170,6 +174,10 @@ def remove_trailing_slash(cls, v):
170174
class SalesforceSourceReport(StaleEntityRemovalSourceReport):
171175
filtered: LossyList[str] = dataclass_field(default_factory=LossyList)
172176

177+
objects_with_calculated_field: LossyList[str] = dataclass_field(
178+
default_factory=LossyList
179+
)
180+
173181
def report_dropped(self, ent_name: str) -> None:
174182
self.filtered.append(ent_name)
175183

@@ -343,7 +351,7 @@ def get_custom_object_details(self, sObjectDeveloperName: str) -> dict:
343351
customObject = custom_objects_response["records"][0]
344352
return customObject
345353

346-
def get_fields_for_object(self, sObjectName, sObject):
354+
def get_fields_for_object(self, sObjectName: str, sObjectDurableId: str) -> list:
347355
sObject_fields_query_url = (
348356
self.base_url
349357
+ "tooling/query?q=SELECT "
@@ -353,7 +361,7 @@ def get_fields_for_object(self, sObjectName, sObject):
353361
+ "IsCompound, IsComponent, ReferenceTo, FieldDefinition.ComplianceGroup,"
354362
+ "RelationshipName, IsNillable, FieldDefinition.Description, InlineHelpText, "
355363
+ "IsCalculated FROM EntityParticle WHERE EntityDefinitionId='{}'".format(
356-
sObject["DurableId"]
364+
sObjectDurableId
357365
)
358366
)
359367

@@ -532,10 +540,33 @@ def get_salesforce_object_workunits(
532540

533541
yield self.get_properties_workunit(sObject, customObject, datasetUrn)
534542

543+
allFields = self.sf_api.get_fields_for_object(sObjectName, sObject["DurableId"])
544+
545+
customFields = self.sf_api.get_custom_fields_for_object(
546+
sObjectName, sObject["DurableId"]
547+
)
548+
549+
if any(field["IsCalculated"] for field in allFields):
550+
# Although the formula is present in the Metadata column of the CustomField entity,
551+
# we cannot use it, because it can be queried for only one field at a time,
552+
# which would not be performant.
553+
self.report.objects_with_calculated_field.append(sObjectName)
554+
calculated_field_formulae = self.get_calculated_field_formulae(sObjectName)
555+
else:
556+
calculated_field_formulae = {}
557+
535558
yield from self.get_schema_metadata_workunit(
536-
sObjectName, sObject, customObject, datasetUrn
559+
sObjectName,
560+
allFields,
561+
customFields,
562+
customObject,
563+
datasetUrn,
564+
calculated_field_formulae,
537565
)
538566

567+
if self.config.use_referenced_entities_as_upstreams:
568+
yield from self.get_upstream_workunit(datasetUrn, allFields)
569+
539570
yield self.get_subtypes_workunit(sObjectName, datasetUrn)
540571

541572
if self.config.platform_instance is not None:
@@ -549,6 +580,30 @@ def get_salesforce_object_workunits(
549580
):
550581
yield from self.get_profile_workunit(sObjectName, datasetUrn)
551582

583+
def get_upstream_workunit(
    self, datasetUrn: str, allFields: List[dict]
) -> Iterable[MetadataWorkUnit]:
    """Emit upstream lineage from this object to the sObjects it references.

    Every entry of ``allFields`` whose ``DataType`` is ``"reference"`` and
    whose ``ReferenceTo["referenceTo"]`` value is non-empty contributes one
    upstream per referenced sObject name. The upstream URN is built from
    ``self.platform``, ``self.config.platform_instance`` and
    ``self.config.env``.

    Args:
        datasetUrn: URN of the dataset the lineage aspect is attached to.
        allFields: Field records for the object; each must provide the
            ``DataType`` and ``ReferenceTo`` keys.

    Yields:
        A single work unit carrying an ``UpstreamLineageClass`` aspect, or
        nothing at all when no field references another sObject.
    """
    upstreams: List[UpstreamClass] = []
    # Several reference fields may point at the same sObject. Track the URNs
    # already added so each upstream dataset is listed only once, keeping the
    # order in which it was first encountered.
    seen_upstream_urns = set()

    for field in allFields:
        if field["DataType"] != "reference":
            continue

        # A falsy ``referenceTo`` (None or empty) means there is nothing to link.
        referenced_names = field["ReferenceTo"]["referenceTo"]
        if not referenced_names:
            continue

        for referenced_sObjectName in referenced_names:
            upstream_urn = builder.make_dataset_urn_with_platform_instance(
                self.platform,
                referenced_sObjectName,
                self.config.platform_instance,
                self.config.env,
            )
            if upstream_urn in seen_upstream_urns:
                continue
            seen_upstream_urns.add(upstream_urn)
            upstreams.append(
                UpstreamClass(
                    dataset=upstream_urn,
                    type=DatasetLineageTypeClass.TRANSFORMED,
                )
            )

    if upstreams:
        yield MetadataChangeProposalWrapper(
            entityUrn=datasetUrn, aspect=UpstreamLineageClass(upstreams=upstreams)
        ).as_workunit()
606+
552607
def get_domain_workunit(
553608
self, dataset_name: str, datasetUrn: str
554609
) -> Iterable[MetadataWorkUnit]:
@@ -806,37 +861,39 @@ def get_audit_stamp(self, date: str, username: str) -> AuditStampClass:
806861
actor=builder.make_user_urn(username),
807862
)
808863

809-
def get_field_formulae(self, describe_object_result: dict) -> Dict[str, str]:
864+
def get_calculated_field_formulae(self, sObjectName: str) -> Dict[str, str]:
810865
# extract field wise formula and return response
811-
calculated_fields = {}
812-
for field in describe_object_result["fields"]:
813-
if field["calculatedFormula"]:
814-
calculated_fields[field["name"]] = field["calculatedFormula"]
866+
# Includes entries for calculated fields only
815867

868+
calculated_fields = {}
869+
try:
870+
describe_object_result = self.sf_api.describe_object(sObjectName)
871+
for field in describe_object_result["fields"]:
872+
if field["calculatedFormula"]:
873+
calculated_fields[field["name"]] = field["calculatedFormula"]
874+
except Exception as e:
875+
self.report.warning(
876+
message="Failed to get calculated field formulae",
877+
context=sObjectName,
878+
exc=e,
879+
)
816880
return calculated_fields
817881

818882
def get_schema_metadata_workunit(
819883
self,
820884
sObjectName: str,
821-
sObject: dict,
885+
all_fields: List[dict],
886+
custom_fields: dict,
822887
customObject: dict,
823888
datasetUrn: str,
889+
calculated_field_formulae: Dict[str, str],
824890
) -> Iterable[MetadataWorkUnit]:
825-
all_fields = self.sf_api.get_fields_for_object(sObjectName, sObject)
826-
827-
customFields = self.sf_api.get_custom_fields_for_object(
828-
sObjectName, sObject["DurableId"]
829-
)
830-
831-
describe_object_result = self.sf_api.describe_object(sObjectName)
832-
calculated_fields = self.get_field_formulae(describe_object_result)
833-
834891
fields: List[SchemaFieldClass] = []
835892
primaryKeys: List[str] = []
836893
foreignKeys: List[ForeignKeyConstraintClass] = []
837894

838895
for field in all_fields:
839-
customField = customFields.get(field["DeveloperName"], {})
896+
customField = custom_fields.get(field["DeveloperName"], {})
840897

841898
fieldName = field["QualifiedApiName"]
842899
fieldType = field["DataType"]
@@ -851,7 +908,7 @@ def get_schema_metadata_workunit(
851908
fieldType,
852909
field,
853910
customField,
854-
calculated_fields.get(fieldName),
911+
calculated_field_formulae.get(fieldName),
855912
)
856913
fields.append(schemaField)
857914

0 commit comments

Comments
 (0)