32
32
from datahub .ingestion .source .common .subtypes import DatasetSubTypes
33
33
from datahub .ingestion .source .state .stale_entity_removal_handler import (
34
34
StaleEntityRemovalHandler ,
35
- StaleEntityRemovalSourceReport ,
36
35
StatefulStaleMetadataRemovalConfig ,
37
36
)
38
37
from datahub .ingestion .source .state .stateful_ingestion_base import (
75
74
class Neo4jConfig (
76
75
StatefulIngestionConfigBase , EnvConfigMixin , PlatformInstanceConfigMixin
77
76
):
78
- platform : str = Field (default = "neo4j" , hidden_from_docs = True )
79
77
username : str = Field (description = "Neo4j Username" )
80
78
password : str = Field (description = "Neo4j Password" )
81
79
uri : str = Field (description = "The URI for the Neo4j server" )
@@ -92,7 +90,9 @@ class Neo4jSourceReport(StatefulIngestionReport):
92
90
93
91
@platform_name ("Neo4j" , id = "neo4j" )
94
92
@config_class (Neo4jConfig )
95
- @capability (SourceCapability .PLATFORM_INSTANCE , "Supported via the `platform_instance` config" )
93
+ @capability (
94
+ SourceCapability .PLATFORM_INSTANCE , "Supported via the `platform_instance` config"
95
+ )
96
96
@support_status (SupportStatus .CERTIFIED )
97
97
class Neo4jSource (StatefulIngestionSourceBase ):
98
98
NODE = "node"
@@ -104,24 +104,14 @@ def __init__(self, config: Neo4jConfig, ctx: PipelineContext):
104
104
super ().__init__ (config , ctx )
105
105
self .ctx = ctx
106
106
self .config = config
107
- self .platform = self .config .platform
108
- self .platform_instance = self .config .platform_instance
109
- self .env = self .config .env
107
+ self .platform = "neo4j"
110
108
self .report : Neo4jSourceReport = Neo4jSourceReport ()
111
109
112
110
@classmethod
113
111
def create (cls , config_dict : Dict , ctx : PipelineContext ) -> "Neo4jSource" :
114
112
config = Neo4jConfig .parse_obj (config_dict )
115
113
return cls (config , ctx )
116
114
117
- def get_workunit_processors (self ) -> List [Optional [MetadataWorkUnitProcessor ]]:
118
- return [
119
- * super ().get_workunit_processors (),
120
- StaleEntityRemovalHandler .create (
121
- self , self .config , self .ctx
122
- ).workunit_processor ,
123
- ]
124
-
125
115
def get_field_type (self , attribute_type : Union [type , str ]) -> SchemaFieldDataType :
126
116
type_class : type = _type_mapping .get (attribute_type , NullTypeClass )
127
117
return SchemaFieldDataType (type = type_class ())
@@ -159,8 +149,8 @@ def add_properties(
159
149
entityUrn = make_dataset_urn_with_platform_instance (
160
150
platform = self .platform ,
161
151
name = dataset ,
162
- platform_instance = self .platform_instance ,
163
- env = self .env ,
152
+ platform_instance = self .config . platform_instance ,
153
+ env = self .config . env ,
164
154
),
165
155
aspect = dataset_properties ,
166
156
).as_workunit ()
@@ -178,8 +168,8 @@ def generate_neo4j_object(
178
168
entityUrn = make_dataset_urn_with_platform_instance (
179
169
platform = self .platform ,
180
170
name = dataset ,
181
- platform_instance = self .platform_instance ,
182
- env = self .env ,
171
+ platform_instance = self .config . platform_instance ,
172
+ env = self .config . env ,
183
173
),
184
174
aspect = SchemaMetadataClass (
185
175
schemaName = dataset ,
@@ -359,8 +349,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
359
349
entityUrn = make_dataset_urn_with_platform_instance (
360
350
platform = self .platform ,
361
351
name = row ["key" ],
362
- platform_instance = self .platform_instance ,
363
- env = self .env ,
352
+ platform_instance = self .config . platform_instance ,
353
+ env = self .config . env ,
364
354
),
365
355
aspect = SubTypesClass (
366
356
typeNames = [
0 commit comments