
Commit b29bba0

Karan Jindal committed:
Added ingestion support for Kafka (Consume and Publish) processors and the AMQP Consumer processor for NiFi, along with test cases.
1 parent 86f4b80 commit b29bba0

2 files changed: +379 −0


metadata-ingestion/src/datahub/ingestion/source/nifi.py (+53 lines)
@@ -313,6 +313,11 @@ class NifiProcessorType:
     FetchSFTP = "org.apache.nifi.processors.standard.FetchSFTP"
     GetSFTP = "org.apache.nifi.processors.standard.GetSFTP"
     PutSFTP = "org.apache.nifi.processors.standard.PutSFTP"
+    PutKafka_2_6 = "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_6"
+    ConsumeKafka_2_6 = "org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_6"
+    PutKafka_2_0 = "org.apache.nifi.processors.kafka.pubsub.PublishKafka_2_0"
+    ConsumeKafka_2_0 = "org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0"
+    ConsumeAMQP = "org.apache.nifi.amqp.processors.ConsumeAMQP"
 
 
 # To support new processor type,
@@ -330,6 +335,11 @@ class NifiProcessorProvenanceEventAnalyzer:
         NifiProcessorType.FetchSFTP: NifiEventType.FETCH,
         NifiProcessorType.GetSFTP: NifiEventType.RECEIVE,
         NifiProcessorType.PutSFTP: NifiEventType.SEND,
+        NifiProcessorType.PutKafka_2_6: NifiEventType.SEND,
+        NifiProcessorType.ConsumeKafka_2_6: NifiEventType.RECEIVE,
+        NifiProcessorType.PutKafka_2_0: NifiEventType.SEND,
+        NifiProcessorType.ConsumeKafka_2_0: NifiEventType.RECEIVE,
+        NifiProcessorType.ConsumeAMQP: NifiEventType.RECEIVE,
     }
 
     def __init__(self) -> None:
@@ -344,8 +354,51 @@ def __init__(self) -> None:
             NifiProcessorType.FetchSFTP: self.process_sftp_provenance_event,
             NifiProcessorType.GetSFTP: self.process_sftp_provenance_event,
             NifiProcessorType.PutSFTP: self.process_sftp_provenance_event,
+            NifiProcessorType.PutKafka_2_6: self.process_putKafka_provenance_event,
+            NifiProcessorType.ConsumeKafka_2_6: self.process_consumeKafka_provenance_event,
+            NifiProcessorType.PutKafka_2_0: self.process_putKafka_provenance_event,
+            NifiProcessorType.ConsumeKafka_2_0: self.process_consumeKafka_provenance_event,
+            NifiProcessorType.ConsumeAMQP: self.process_consumeAmqp_provenance_event,
         }
 
+    def process_putKafka_provenance_event(self, event):
+        transitUri = event.get("transitUri")
+        topic = "dummy" if transitUri is None else transitUri.split("/")[-1]
+        platform = "kafka"
+        dataset_urn = builder.make_dataset_urn(platform, topic, self.env)
+        return ExternalDataset(
+            platform,
+            topic,
+            dict(topic=topic),
+            dataset_urn,
+        )
+
+    def process_consumeKafka_provenance_event(self, event):
+        attributes = event.get("attributes", [])
+        topic = get_attribute_value(attributes, "kafka.topic")
+        platform = "kafka"
+        if topic is None:
+            topic = "dummy"
+        dataset_urn = builder.make_dataset_urn(platform, topic, self.env)
+        return ExternalDataset(
+            platform,
+            get_attribute_value(attributes, "kafka.topic"),
+            dict(topic=topic),
+            dataset_urn,
+        )
+
+    def process_consumeAmqp_provenance_event(self, event):
+        transitUri = event.get("transitUri")
+        topic = "dummy" if transitUri is None else transitUri.split("/")[-1]
+        platform = "rmq"
+        dataset_urn = builder.make_dataset_urn(platform, topic, self.env)
+        return ExternalDataset(
+            platform,
+            topic,
+            dict(queue=topic),
+            dataset_urn,
+        )
+
     def process_s3_provenance_event(self, event):
         logger.debug(f"Processing s3 provenance event: {event}")
         attributes = event.get("attributes", [])
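
Note on the extraction logic above: the PublishKafka and ConsumeAMQP handlers take the topic/queue name from the last path segment of the provenance event's transitUri, while the ConsumeKafka handler reads the kafka.topic attribute. The sketch below replays that logic against hand-made sample events; the sample transitUri formats, event shapes, and the local get_attribute_value stand-in are illustrative assumptions, not taken from NiFi's provenance API.

# Minimal sketch: replay the topic/queue extraction used by the new handlers
# against hand-made sample provenance events. Event shapes, transitUri formats,
# and the get_attribute_value stand-in below are assumptions for illustration.

def get_attribute_value(attributes, name):
    # Stand-in for the helper used in nifi.py; assumes attributes is a list
    # of {"name": ..., "value": ...} dicts.
    for attribute in attributes:
        if attribute.get("name") == name:
            return attribute.get("value")
    return None

def topic_from_transit_uri(event):
    # PublishKafka / ConsumeAMQP style: last path segment of transitUri,
    # falling back to "dummy" when the URI is missing.
    transit_uri = event.get("transitUri")
    return "dummy" if transit_uri is None else transit_uri.split("/")[-1]

def topic_from_attributes(event):
    # ConsumeKafka style: read the kafka.topic flowfile attribute,
    # falling back to "dummy" when it is absent.
    topic = get_attribute_value(event.get("attributes", []), "kafka.topic")
    return "dummy" if topic is None else topic

publish_event = {"transitUri": "PLAINTEXT://broker:9092/orders"}              # assumed shape
consume_event = {"attributes": [{"name": "kafka.topic", "value": "orders"}]}  # assumed shape
amqp_event = {"transitUri": "amqp://rabbit:5672/orders_queue"}                # assumed shape

print(topic_from_transit_uri(publish_event))   # -> orders
print(topic_from_attributes(consume_event))    # -> orders
print(topic_from_transit_uri(amqp_event))      # -> orders_queue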

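The dataset URNs returned by these handlers come from builder.make_dataset_urn, where builder in nifi.py refers to datahub.emitter.mce_builder. A minimal sketch of the URNs this would produce for the sample topics above, assuming the default PROD environment:

# Minimal sketch of the dataset URNs the handlers emit, assuming env = "PROD".
import datahub.emitter.mce_builder as builder

kafka_urn = builder.make_dataset_urn("kafka", "orders", "PROD")
amqp_urn = builder.make_dataset_urn("rmq", "orders_queue", "PROD")

print(kafka_urn)  # urn:li:dataset:(urn:li:dataPlatform:kafka,orders,PROD)
print(amqp_urn)   # urn:li:dataset:(urn:li:dataPlatform:rmq,orders_queue,PROD)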