
Commit b091e46

feat(ingest/kafka): Flag for optional schemas ingestion (#12077)
Parent: e6cc676

7 files changed, +620 −18 lines

docs/how/updating-datahub.md (+2 −1)

@@ -31,14 +31,15 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
 ```
 
-The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
+The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatibility, However, we recommend enabling this flag after performing the necessary cleanup.
 If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
 `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
 
 - #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
 - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
 - #12056: The DataHub Airflow plugin no longer supports Airflow 2.1 and Airflow 2.2.
 - #12056: The DataHub Airflow plugin now defaults to the v2 plugin implementation.
+- #12077: `Kafka` source no longer ingests schemas from schema registry as separate entities by default, set `ingest_schemas_as_entities` to `true` to ingest them
 - OpenAPI Update: PIT Keep Alive parameter added to scroll. NOTE: This parameter requires the `pointInTimeCreationEnabled` feature flag to be enabled and the `elasticSearch.implementation` configuration to be `elasticsearch`. This feature is not supported for OpenSearch at this time and the parameter will not be respected without both of these set.
 - OpenAPI Update 2: Previously there was an incorrectly marked parameter named `sort` on the generic list entities endpoint for v3. This parameter is deprecated and only supports a single string value while the documentation indicates it supports a list of strings. This documentation error has been fixed and the correct field, `sortCriteria`, is now documented which supports a list of strings.
 
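For anyone upgrading past this commit who still wants schema-registry subjects emitted as their own entities, the opt-in is a single config key. Below is a sketch of a programmatic recipe; `Pipeline.create` is DataHub's documented Python entry point, but the broker and registry addresses and the `console` sink are placeholder choices, not values taken from this commit:

# Sketch: re-enabling schema-subject ingestion after this change.
# Assumptions: connection addresses and the console sink are placeholders;
# the recipe dict mirrors the YAML recipe shape used elsewhere in this commit.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "kafka",
            "config": {
                "connection": {
                    "bootstrap": "localhost:9092",
                    "schema_registry_url": "http://localhost:8081",
                },
                # Opt back in to emitting schema-registry subjects as
                # separate entities (default is now false per #12077).
                "ingest_schemas_as_entities": True,
            },
        },
        "sink": {"type": "console"},
    }
)
pipeline.run()
pipeline.raise_from_status()

The YAML form of the same opt-in appears in the test recipe further down in this commit.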

metadata-ingestion/src/datahub/ingestion/source/kafka/kafka.py (+18 −11)

@@ -141,6 +141,10 @@ class KafkaSourceConfig(
         default=False,
         description="Disables the utilization of the TopicRecordNameStrategy for Schema Registry subjects. For more information, visit: https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html#handling-differences-between-preregistered-and-client-derived-schemas:~:text=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
     )
+    ingest_schemas_as_entities: bool = pydantic.Field(
+        default=False,
+        description="Enables ingesting schemas from schema registry as separate entities, in addition to the topics",
+    )
 
 
 def get_kafka_consumer(
@@ -343,17 +347,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             else:
                 self.report.report_dropped(topic)
 
-        # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
-        for subject in self.schema_registry_client.get_subjects():
-            try:
-                yield from self._extract_record(
-                    subject, True, topic_detail=None, extra_topic_config=None
-                )
-            except Exception as e:
-                logger.warning(f"Failed to extract subject {subject}", exc_info=True)
-                self.report.report_warning(
-                    "subject", f"Exception while extracting topic {subject}: {e}"
-                )
+        if self.source_config.ingest_schemas_as_entities:
+            # Get all subjects from schema registry and ingest them as SCHEMA DatasetSubTypes
+            for subject in self.schema_registry_client.get_subjects():
+                try:
+                    yield from self._extract_record(
+                        subject, True, topic_detail=None, extra_topic_config=None
+                    )
+                except Exception as e:
+                    logger.warning(
+                        f"Failed to extract subject {subject}", exc_info=True
+                    )
+                    self.report.report_warning(
+                        "subject", f"Exception while extracting topic {subject}: {e}"
+                    )
 
     def _extract_record(
         self,
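The shape of the change above is a common one: wrap a formerly-unconditional ingestion pass in a default-False flag, and keep per-item error isolation so one bad subject produces a warning rather than aborting the run. A minimal, self-contained sketch of that pattern, with illustrative names rather than DataHub internals:

# Generic sketch of the gating + per-item error-isolation pattern.
# All names here are illustrative, not DataHub APIs.
import logging
from typing import Callable, Iterable, Iterator, List

logger = logging.getLogger(__name__)


def emit_subject_workunits(
    subjects: Iterable[str],
    extract: Callable[[str], Iterable[str]],
    enabled: bool,
    warnings: List[str],
) -> Iterator[str]:
    if not enabled:  # default: skip schema subjects entirely (the new behavior)
        return
    for subject in subjects:
        try:
            # One failing subject is logged and recorded, not fatal.
            yield from extract(subject)
        except Exception as e:
            logger.warning("Failed to extract subject %s", subject, exc_info=True)
            warnings.append(f"subject: exception while extracting {subject}: {e}")

Because the function is a generator, disabling the flag costs nothing: the early return simply yields no work units.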

metadata-ingestion/tests/integration/kafka/kafka_to_file.yml (+1)

@@ -3,6 +3,7 @@ run_id: kafka-test
 source:
   type: kafka
   config:
+    ingest_schemas_as_entities: true
     connection:
       bootstrap: "localhost:29092"
       schema_registry_url: "http://localhost:28081"
