Skip to content

Commit 6204cba

Browse files
dushayntAWshubhamjagtap639treff7es
authored
fix(ingestion/prefect-plugin): Prefect plugin (#10643)
Co-authored-by: shubhamjagtap639 <[email protected]> Co-authored-by: Tamas Nemeth <[email protected]>
1 parent 1a051b1 commit 6204cba

File tree

29 files changed

+2567
-4
lines changed

29 files changed

+2567
-4
lines changed

.github/workflows/build-and-test.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ jobs:
9191
-x :metadata-ingestion-modules:airflow-plugin:check \
9292
-x :metadata-ingestion-modules:dagster-plugin:build \
9393
-x :metadata-ingestion-modules:dagster-plugin:check \
94+
-x :metadata-ingestion-modules:prefect-plugin:build \
95+
-x :metadata-ingestion-modules:prefect-plugin:check \
9496
-x :metadata-ingestion-modules:gx-plugin:build \
9597
-x :metadata-ingestion-modules:gx-plugin:check \
9698
-x :datahub-frontend:build \
@@ -138,4 +140,4 @@ jobs:
138140
uses: actions/upload-artifact@v3
139141
with:
140142
name: Event File
141-
path: ${{ github.event_path }}
143+
path: ${{ github.event_path }}

.github/workflows/prefect-plugin.yml

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
name: Prefect Plugin
2+
on:
3+
push:
4+
branches:
5+
- master
6+
paths:
7+
- ".github/workflows/prefect-plugin.yml"
8+
- "metadata-ingestion-modules/prefect-plugin/**"
9+
- "metadata-ingestion/**"
10+
- "metadata-models/**"
11+
pull_request:
12+
branches:
13+
- "**"
14+
paths:
15+
- ".github/workflows/prefect-plugin.yml"
16+
- "metadata-ingestion-modules/prefect-plugin/**"
17+
- "metadata-ingestion/**"
18+
- "metadata-models/**"
19+
release:
20+
types: [published]
21+
22+
concurrency:
23+
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
24+
cancel-in-progress: true
25+
26+
jobs:
27+
prefect-plugin:
28+
runs-on: ubuntu-latest
29+
env:
30+
SPARK_VERSION: 3.0.3
31+
DATAHUB_TELEMETRY_ENABLED: false
32+
strategy:
33+
matrix:
34+
python-version: ["3.8", "3.9", "3.10"]
35+
include:
36+
- python-version: "3.8"
37+
- python-version: "3.9"
38+
- python-version: "3.10"
39+
fail-fast: false
40+
steps:
41+
- name: Set up JDK 17
42+
uses: actions/setup-java@v3
43+
with:
44+
distribution: "zulu"
45+
java-version: 17
46+
- uses: gradle/gradle-build-action@v2
47+
- uses: actions/checkout@v3
48+
- uses: actions/setup-python@v4
49+
with:
50+
python-version: ${{ matrix.python-version }}
51+
cache: "pip"
52+
- name: Install dependencies
53+
run: ./metadata-ingestion/scripts/install_deps.sh
54+
- name: Install prefect package
55+
run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick
56+
- name: pip freeze show list installed
57+
if: always()
58+
run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze
59+
- uses: actions/upload-artifact@v3
60+
if: ${{ always() && matrix.python-version == '3.10'}}
61+
with:
62+
name: Test Results (Prefect Plugin ${{ matrix.python-version}})
63+
path: |
64+
**/build/reports/tests/test/**
65+
**/build/test-results/test/**
66+
**/junit.*.xml
67+
!**/binary/**
68+
- name: Upload coverage to Codecov
69+
if: always()
70+
uses: codecov/codecov-action@v3
71+
with:
72+
token: ${{ secrets.CODECOV_TOKEN }}
73+
directory: .
74+
fail_ci_if_error: false
75+
flags: prefect,prefect-${{ matrix.extra_pip_extras }}
76+
name: pytest-prefect-${{ matrix.python-version }}
77+
verbose: true
78+
79+
event-file:
80+
runs-on: ubuntu-latest
81+
steps:
82+
- name: Upload
83+
uses: actions/upload-artifact@v3
84+
with:
85+
name: Event File
86+
path: ${{ github.event_path }}

.github/workflows/test-results.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ name: Test Results
22

33
on:
44
workflow_run:
5-
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "GX Plugin"]
5+
workflows: ["build & test", "metadata ingestion", "Airflow Plugin", "Dagster Plugin", "Prefect Plugin", "GX Plugin"]
66
types:
77
- completed
88

docs-website/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ task yarnGenerate(type: YarnTask, dependsOn: [yarnInstall,
8686
':metadata-ingestion:buildWheel',
8787
':metadata-ingestion-modules:airflow-plugin:buildWheel',
8888
':metadata-ingestion-modules:dagster-plugin:buildWheel',
89+
':metadata-ingestion-modules:prefect-plugin:buildWheel',
8990
':metadata-ingestion-modules:gx-plugin:buildWheel',
9091
]) {
9192
inputs.files(projectMdFiles)

docs-website/filterTagIndexes.json

+12-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
"tags": {
8686
"Platform Type": "Orchestrator",
8787
"Connection Type": "Pull",
88-
"Features": "Stateful Ingestion, UI Ingestion, Status Aspect"
88+
"Features": "Status Aspect"
8989
}
9090
},
9191
{
@@ -429,6 +429,17 @@
429429
"Features": "Stateful Ingestion, Lower Casing, Status Aspect"
430430
}
431431
},
432+
{
433+
"Path": "docs/lineage/prefect",
434+
"imgPath": "img/logos/platforms/prefect.svg",
435+
"Title": "Prefect",
436+
"Description": "Prefect is a modern workflow orchestration tool for data and ML engineers.",
437+
"tags": {
438+
"Platform Type": "Orchestrator",
439+
"Connection Type": "Pull",
440+
"Features": "Status Aspect"
441+
}
442+
},
432443
{
433444
"Path": "docs/generated/ingestion/sources/presto",
434445
"imgPath": "img/logos/platforms/presto.svg",

docs-website/generateDocsDir.ts

+1
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,7 @@ function copy_python_wheels(): void {
573573
"../metadata-ingestion/dist",
574574
"../metadata-ingestion-modules/airflow-plugin/dist",
575575
"../metadata-ingestion-modules/dagster-plugin/dist",
576+
"../metadata-ingestion-modules/prefect-plugin/dist",
576577
"../metadata-ingestion-modules/gx-plugin/dist",
577578
];
578579

docs-website/sidebars.js

+6
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,11 @@ module.exports = {
444444
id: "docs/lineage/openlineage",
445445
label: "OpenLineage",
446446
},
447+
{
448+
type: "doc",
449+
id: "docs/lineage/prefect",
450+
label: "Prefect",
451+
},
447452
{
448453
type: "doc",
449454
id: "metadata-integration/java/acryl-spark-lineage/README",
@@ -917,6 +922,7 @@ module.exports = {
917922
// "metadata-integration/java/openlineage-converter/README"
918923
//"metadata-ingestion-modules/airflow-plugin/README"
919924
//"metadata-ingestion-modules/dagster-plugin/README"
925+
//"metadata-ingestion-modules/prefect-plugin/README"
920926
//"metadata-ingestion-modules/gx-plugin/README"
921927
// "metadata-ingestion/schedule_docs/datahub", // we can delete this
922928
// TODO: change the titles of these, removing the "What is..." portion from the sidebar"

docs-website/src/pages/_components/Logos/index.js

+2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const platformLogos = [
4040
name: "CouchBase",
4141
imageUrl: "/img/logos/platforms/couchbase.svg",
4242
},
43+
{ name: "Dagster", imageUrl: "/img/logos/platforms/dagster.png" },
4344
{ name: "Databricks", imageUrl: "/img/logos/platforms/databricks.png" },
4445
{ name: "DBT", imageUrl: "/img/logos/platforms/dbt.svg" },
4546
{ name: "Deltalake", imageUrl: "/img/logos/platforms/deltalake.svg" },
@@ -87,6 +88,7 @@ const platformLogos = [
8788
{ name: "Pinot", imageUrl: "/img/logos/platforms/pinot.svg" },
8889
{ name: "PostgreSQL", imageUrl: "/img/logos/platforms/postgres.svg" },
8990
{ name: "PowerBI", imageUrl: "/img/logos/platforms/powerbi.png" },
91+
{ name: "Prefect", imageUrl: "/img/logos/platforms/prefect.svg" },
9092
{ name: "Presto", imageUrl: "/img/logos/platforms/presto.svg" },
9193
{ name: "Protobuf", imageUrl: "/img/logos/platforms/protobuf.png" },
9294
{ name: "Pulsar", imageUrl: "/img/logos/platforms/pulsar.png" },
Loading

docs/lineage/prefect.md

+137
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# Prefect Integration with DataHub
2+
3+
## Overview
4+
5+
DataHub supports integration with Prefect, allowing you to ingest:
6+
7+
- Prefect flow and task metadata
8+
- Flow run and task run information
9+
- Lineage information (when available)
10+
11+
This integration enables you to track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities.
12+
13+
## Prefect DataHub Block
14+
15+
### What is a Prefect DataHub Block?
16+
17+
Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The `prefect-datahub` block uses the [DataHub REST](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to send metadata events while running Prefect flows.
18+
19+
### Prerequisites
20+
21+
1. Use either Prefect Cloud (recommended) or a self-hosted Prefect server.
22+
2. For Prefect Cloud setup, refer to the [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) guide.
23+
3. For self-hosted Prefect server setup, refer to the [Host Prefect Server](https://docs.prefect.io/latest/guides/host/) guide.
24+
4. Ensure the Prefect API URL is set correctly. Verify using:
25+
26+
```shell
27+
prefect profile inspect
28+
```
29+
30+
5. API URL format:
31+
- Prefect Cloud: `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>`
32+
- Self-hosted: `http://<host>:<port>/api`
33+
34+
## Setup Instructions
35+
36+
### 1. Installation
37+
38+
Install `prefect-datahub` using pip:
39+
40+
```shell
41+
pip install 'prefect-datahub'
42+
```
43+
44+
Note: Requires Python 3.7+
45+
46+
### 2. Saving Configurations to a Block
47+
48+
Save your configuration to the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks):
49+
50+
```python
51+
from prefect_datahub.datahub_emitter import DatahubEmitter
52+
53+
DatahubEmitter(
54+
datahub_rest_url="http://localhost:8080",
55+
env="PROD",
56+
platform_instance="local_prefect"
57+
).save("MY-DATAHUB-BLOCK")
58+
```
59+
60+
Configuration options:
61+
62+
| Config | Type | Default | Description |
63+
|--------|------|---------|-------------|
64+
| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL |
65+
| env | `str` | `PROD` | Environment for assets (see [FabricType](https://datahubproject.io/docs/graphql/enums/#fabrictype)) |
66+
| platform_instance | `str` | `None` | Platform instance for assets (see [Platform Instances](https://datahubproject.io/docs/platform-instances/)) |
67+
68+
### 3. Using the Block in Prefect Workflows
69+
70+
Load and use the saved block in your Prefect workflows:
71+
72+
```python
73+
from prefect import flow, task
74+
from prefect_datahub.dataset import Dataset
75+
from prefect_datahub.datahub_emitter import DatahubEmitter
76+
77+
datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK")
78+
79+
@task(name="Transform", description="Transform the data")
80+
def transform(data):
81+
data = data.split(" ")
82+
datahub_emitter.add_task(
83+
inputs=[Dataset("snowflake", "mydb.schema.tableA")],
84+
outputs=[Dataset("snowflake", "mydb.schema.tableC")],
85+
)
86+
return data
87+
88+
@flow(name="ETL flow", description="Extract transform load flow")
89+
def etl():
90+
data = transform("This is data")
91+
datahub_emitter.emit_flow()
92+
```
93+
94+
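**Note**: Calling `add_task()` only registers the task's inputs and outputs; nothing is sent to DataHub until the flow calls `emit_flow()`. If `emit_flow()` is never called, no metadata will be emitted.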
95+
96+
## Concept Mapping
97+
98+
| Prefect Concept | DataHub Concept |
99+
|-----------------|-----------------|
100+
| [Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/) |
101+
| [Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |
102+
| [Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) |
103+
| [Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) |
104+
| [Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) |
105+
106+
## Validation and Troubleshooting
107+
108+
### Validating the Setup
109+
110+
1. Check the Prefect UI's Blocks menu for the DataHub emitter.
111+
2. Run a Prefect workflow and look for DataHub-related log messages:
112+
113+
```text
114+
Emitting flow to datahub...
115+
Emitting tasks to datahub...
116+
```
117+
118+
### Debugging Common Issues
119+
120+
#### Incorrect Prefect API URL
121+
122+
If the Prefect API URL is incorrect, set it manually:
123+
124+
```shell
125+
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api'
126+
```
127+
128+
#### DataHub Connection Error
129+
130+
If you encounter a `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, ensure that your DataHub GMS service is running and that the `datahub_rest_url` saved in your block points to it.
131+
132+
## Additional Resources
133+
134+
- [Prefect Documentation](https://docs.prefect.io/)
135+
- [DataHub Documentation](https://datahubproject.io/docs/)
136+
137+
For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities.

0 commit comments

Comments
 (0)