Skip to content

Commit 0a1cc49

Browse files
treff7essleeperdeep
authored andcommitted
feat(ingest/airflow): Add way to disable Airflow plugin without a restart (datahub-project#12098)
1 parent 3929b66 commit 0a1cc49

File tree

2 files changed

+54
-0
lines changed

2 files changed

+54
-0
lines changed

docs/lineage/airflow.md

+31
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,37 @@ TypeError: on_task_instance_success() missing 3 required positional arguments: '
339339

340340
The solution is to upgrade `acryl-datahub-airflow-plugin>=0.12.0.4` or upgrade `pluggy>=1.2.0`. See this [PR](https://github.com/datahub-project/datahub/pull/9365) for details.
341341

342+
### Disabling the DataHub Plugin v2
343+
344+
There are two ways to disable the DataHub Plugin v2:
345+
346+
#### 1. Disable via Configuration
347+
348+
Set the `datahub.enabled` configuration property to `False` in the `airflow.cfg` file and restart the Airflow environment to reload the configuration and disable the plugin.
349+
350+
```ini title="airflow.cfg"
351+
[datahub]
352+
enabled = False
353+
```
354+
355+
#### 2. Disable via Airflow Variable (Kill-Switch)
356+
357+
If a restart is not possible and you need a faster way to disable the plugin, you can use the kill-switch. Create and set the `datahub_airflow_plugin_disable_listener` Airflow variable to `true`. This ensures that the listener won't process anything.
358+
359+
#### Command Line
360+
361+
```shell
362+
airflow variables set datahub_airflow_plugin_disable_listener true
363+
```
364+
365+
#### Airflow UI
366+
367+
1. Go to Admin -> Variables.
368+
2. Click the "+" symbol to create a new variable.
369+
3. Set the key to `datahub_airflow_plugin_disable_listener` and the value to `true`.
370+
371+
This will immediately disable the plugin without requiring a restart.
372+
342373
## Compatibility
343374

344375
We no longer officially support Airflow <2.3. However, you can use older versions of `acryl-datahub-airflow-plugin` with older versions of Airflow.

metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py

+23
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import airflow
1111
import datahub.emitter.mce_builder as builder
12+
from airflow.models import Variable
1213
from airflow.models.serialized_dag import SerializedDagModel
1314
from datahub.api.entities.datajob import DataJob
1415
from datahub.api.entities.dataprocess.dataprocess_instance import InstanceRunResult
@@ -78,6 +79,8 @@ def hookimpl(f: _F) -> _F: # type: ignore[misc] # noqa: F811
7879
)
7980
_DATAHUB_CLEANUP_DAG = "Datahub_Cleanup"
8081

82+
KILL_SWITCH_VARIABLE_NAME = "datahub_airflow_plugin_disable_listener"
83+
8184

8285
def get_airflow_plugin_listener() -> Optional["DataHubListener"]:
8386
# Using globals instead of functools.lru_cache to make testing easier.
@@ -364,6 +367,12 @@ def _extract_lineage(
364367
redact_with_exclusions(v)
365368
)
366369

370+
def check_kill_switch(self):
371+
if Variable.get(KILL_SWITCH_VARIABLE_NAME, "false").lower() == "true":
372+
logger.debug("DataHub listener disabled by kill switch")
373+
return True
374+
return False
375+
367376
@hookimpl
368377
@run_in_thread
369378
def on_task_instance_running(
@@ -372,6 +381,8 @@ def on_task_instance_running(
372381
task_instance: "TaskInstance",
373382
session: "Session", # This will always be QUEUED
374383
) -> None:
384+
if self.check_kill_switch():
385+
return
375386
self._set_log_level()
376387

377388
# This if statement mirrors the logic in https://github.com/OpenLineage/OpenLineage/pull/508.
@@ -454,6 +465,9 @@ def on_task_instance_running(
454465
f"DataHub listener finished processing notification about task instance start for {task_instance.task_id}"
455466
)
456467

468+
self.materialize_iolets(datajob)
469+
470+
def materialize_iolets(self, datajob: DataJob) -> None:
457471
if self.config.materialize_iolets:
458472
for outlet in datajob.outlets:
459473
reported_time: int = int(time.time() * 1000)
@@ -541,6 +555,9 @@ def on_task_instance_finish(
541555
def on_task_instance_success(
542556
self, previous_state: None, task_instance: "TaskInstance", session: "Session"
543557
) -> None:
558+
if self.check_kill_switch():
559+
return
560+
544561
self._set_log_level()
545562

546563
logger.debug(
@@ -556,6 +573,9 @@ def on_task_instance_success(
556573
def on_task_instance_failed(
557574
self, previous_state: None, task_instance: "TaskInstance", session: "Session"
558575
) -> None:
576+
if self.check_kill_switch():
577+
return
578+
559579
self._set_log_level()
560580

561581
logger.debug(
@@ -696,6 +716,9 @@ def on_dag_start(self, dag_run: "DagRun") -> None:
696716
@hookimpl
697717
@run_in_thread
698718
def on_dag_run_running(self, dag_run: "DagRun", msg: str) -> None:
719+
if self.check_kill_switch():
720+
return
721+
699722
self._set_log_level()
700723

701724
logger.debug(

0 commit comments

Comments
 (0)