
Commit 162aadd

Updating doc
Generate status aspect
1 parent 2316771 commit 162aadd

2 files changed: +28 −14 lines

metadata-integration/java/openlineage-converter/src/main/java/io/datahubproject/openlineage/dataset/DatahubJob.java

+11 lines
@@ -9,6 +9,7 @@
 import com.linkedin.common.EdgeArray;
 import com.linkedin.common.GlobalTags;
 import com.linkedin.common.Ownership;
+import com.linkedin.common.Status;
 import com.linkedin.common.TagAssociation;
 import com.linkedin.common.UrnArray;
 import com.linkedin.common.urn.DataFlowUrn;
@@ -110,6 +111,8 @@ public List<MetadataChangeProposal> toMcps(DatahubOpenlineageConfig config) thro
     // Generate and add DataFlow Aspect
     log.info("Generating MCPs for job: {}", jobUrn);
     addAspectToMcps(flowUrn, DATA_FLOW_ENTITY_TYPE, dataFlowInfo, mcps);
+    generateStatus(flowUrn, DATA_FLOW_ENTITY_TYPE, mcps);
+
 
     // Generate and add PlatformInstance Aspect
     if (flowPlatformInstance != null) {
@@ -132,6 +135,7 @@ public List<MetadataChangeProposal> toMcps(DatahubOpenlineageConfig config) thro
     log.info("Setting custom properties for job: {}", jobUrn);
     jobInfo.setCustomProperties(customProperties);
     addAspectToMcps(jobUrn, DATAJOB_ENTITY_TYPE, jobInfo, mcps);
+    generateStatus(jobUrn, DATAJOB_ENTITY_TYPE, mcps);
 
     // Generate and add tags Aspect
     generateFlowGlobalTagsAspect(flowUrn, flowGlobalTags, config, mcps);
@@ -225,6 +229,7 @@ private Pair<UrnArray, EdgeArray> processDownstreams(DatahubOpenlineageConfig co
     if (config.isMaterializeDataset()) {
       try {
         mcps.add(eventFormatter.convert(materializeDataset(dataset.getUrn())));
+        generateStatus(dataset.getUrn(), DATASET_ENTITY_TYPE, mcps);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -286,6 +291,7 @@ private Pair<UrnArray, EdgeArray> processUpstreams(DatahubOpenlineageConfig conf
     if (config.isMaterializeDataset()) {
       try {
         mcps.add(eventFormatter.convert(materializeDataset(dataset.getUrn())));
+        generateStatus(dataset.getUrn(), DATASET_ENTITY_TYPE, mcps);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -330,6 +336,11 @@ private void generateFlowGlobalTagsAspect(Urn flowUrn, GlobalTags flowGlobalTags
     }
   }
 
+  private void generateStatus(Urn entityUrn, String entityType, List<MetadataChangeProposal> mcps) {
+    Status statusInfo = new Status().setRemoved(false);
+    addAspectToMcps(entityUrn, entityType, statusInfo, mcps);
+  }
+
   private void addAspectToMcps(Urn entityUrn, String entityType, DataTemplate aspect, List<MetadataChangeProposal> mcps) {
     MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.create(
         b -> b.entityType(entityType).entityUrn(entityUrn).upsert().aspect(aspect));
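The new `generateStatus` helper wraps a `Status` aspect with `removed = false` (DataHub's soft-delete flag) in an upsert MCP, so every flow, job, and materialized dataset the converter touches is marked as present. As a standalone illustration, here is a minimal sketch of emitting the same aspect directly, assuming the `io.acryl:datahub-client` library (the same `MetadataChangeProposalWrapper` API the converter uses); the dataset URN and the default GMS endpoint are placeholders:

```java
import com.linkedin.common.Status;
import datahub.client.rest.RestEmitter;
import datahub.event.MetadataChangeProposalWrapper;

public class StatusAspectExample {
  public static void main(String[] args) throws Exception {
    // Same shape as generateStatus above: Status(removed = false) marks the
    // entity as not soft-deleted, so it stays visible in the DataHub UI.
    MetadataChangeProposalWrapper mcpw =
        MetadataChangeProposalWrapper.create(
            b -> b.entityType("dataset")
                .entityUrn("urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)") // placeholder urn
                .upsert()
                .aspect(new Status().setRemoved(false)));

    // createWithDefaults() targets http://localhost:8080; adjust for your GMS.
    RestEmitter emitter = RestEmitter.createWithDefaults();
    emitter.emit(mcpw, null).get(); // block until the write is acknowledged
    emitter.close();
  }
}
```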

metadata-integration/java/spark-lineage-beta/README.md

+17 −14 lines
@@ -15,7 +15,7 @@ Databricks, refer [Configuration Instructions for Databricks](#configuration-ins
 Versioning of the jar artifact will follow the semantic versioning of the
 main [DataHub repo](https://github.com/datahub-project/datahub) and release notes will be
 available [here](https://github.com/datahub-project/datahub/releases).
-Always check [the Maven central repository](https://search.maven.org/search?q=a:datahub-spark-lineage) for the latest
+Always check [the Maven central repository](https://search.maven.org/search?q=a:acryl-spark-lineage) for the latest
 released version.
 
 ### Configuration Instructions: spark-submit
@@ -24,15 +24,15 @@ When running jobs using spark-submit, the agent needs to be configured in the co
 
 ```text
 #Configuring DataHub spark agent jar
-spark.jars.packages io.acryl:acryl-spark-lineage:0.2.0
+spark.jars.packages io.acryl:acryl-spark-lineage:0.2.1
 spark.extraListeners datahub.spark.DatahubSparkListener
 spark.datahub.rest.server http://localhost:8080
 ```
 
 ## spark-submit command line
 
 ```sh
-spark-submit --packages io.acryl:acryl-spark-lineage:0.2.0 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
+spark-submit --packages io.acryl:acryl-spark-lineage:0.2.1 --conf "spark.extraListeners=datahub.spark.DatahubSparkListener" my_spark_job_to_run.py
 ```
 
 ### Configuration Instructions: Amazon EMR
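Before moving on to EMR: the spark-submit setup in the hunk above can also be driven programmatically. A minimal sketch using Spark's `SparkLauncher`, assuming `spark-launcher` is on the classpath; the job script path and server URL are placeholders:

```java
import org.apache.spark.launcher.SparkLauncher;

public class SubmitWithLineage {
  public static void main(String[] args) throws Exception {
    Process job = new SparkLauncher()
        .setAppResource("my_spark_job_to_run.py") // placeholder job script
        .setConf("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.1")
        .setConf("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .setConf("spark.datahub.rest.server", "http://localhost:8080") // placeholder GMS endpoint
        .launch();
    System.exit(job.waitFor()); // propagate the job's exit code
  }
}
```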
@@ -41,7 +41,7 @@ Set the following spark-defaults configuration properties as it
 stated [here](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-configure.html)
 
 ```text
-spark.jars.packages io.acryl:acryl-spark-lineage:0.0.2
+spark.jars.packages io.acryl:acryl-spark-lineage:0.2.1
 spark.extraListeners datahub.spark.DatahubSparkListener
 spark.datahub.rest.server https://your_datahub_host/gms
 #If you have authentication set up then you also need to specify the Datahub access token
@@ -79,7 +79,7 @@ appName("test-application")
 config("spark.master","spark://spark-master:7077")
 .
 
-config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.1.0")
+config("spark.jars.packages","io.acryl:acryl-spark-lineage:0.2.1")
 .
 
 config("spark.extraListeners","datahub.spark.DatahubSparkListener")
@@ -104,7 +104,7 @@ and [Init script](https://docs.databricks.com/clusters/configure.html#init-scrip
 information like tokens.
 
 - Download `datahub-spark-lineage` jar
-  from [the Maven central repository](https://s01.oss.sonatype.org/content/groups/public/io/acryl/acryl-spark-lineage/0.0.1/).
+  from [the Maven central repository](https://s01.oss.sonatype.org/content/groups/public/io/acryl/acryl-spark-lineage/).
 - Create `init.sh` with below content
 
 ```sh
@@ -178,7 +178,9 @@ information like tokens.
 | spark.datahub.flow_name | | | If it is set it will be used as the DataFlow name otherwise it uses spark app name as flow_name |
 | spark.datahub.partition_regexp_pattern | | | Strip partition part from the path if path end matches with the specified regexp. Example `year=.*/month=.*/day=.*` |
 | spark.datahub.tags | | | Comma separated list of tags to attach to the DataFlow |
-| spark.datahub.stage_metadata_coalescing | | | Normally it coalesce and send metadata at the onApplicationEnd event which is never called on Databricsk. You should enable this on Databricsk if you want coalesced run . |
+| spark.datahub.domains | | | Comma separated list of domain urns to attach to the DataFlow |
+| spark.datahub.stage_metadata_coalescing | | | Normally it coalesces and sends metadata at the onApplicationEnd event, which is never called on Databricks. You should enable this on Databricks if you want coalesced runs. |
+| spark.datahub.patch.enabled | | | Set this to true to send lineage as a patch, which appends rather than overwrites existing Dataset lineage edges. By default it is enabled. |
 
 ## What to Expect: The Metadata Model
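An aside on the three options documented above (`domains`, `stage_metadata_coalescing`, `patch.enabled`): they can all be set on a single session config. A minimal sketch in Java for Spark 3.x; the app name, master, server URL, and domain urn are placeholders:

```java
import org.apache.spark.sql.SparkSession;

public class LineageConfiguredSession {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("lineage-demo")                                      // placeholder app name
        .master("local[*]")                                           // local master for illustration
        .config("spark.jars.packages", "io.acryl:acryl-spark-lineage:0.2.1")
        .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
        .config("spark.datahub.rest.server", "http://localhost:8080") // placeholder GMS endpoint
        .config("spark.datahub.domains", "urn:li:domain:engineering") // hypothetical domain urn
        .config("spark.datahub.stage_metadata_coalescing", "true")    // coalesce runs on Databricks
        .config("spark.datahub.patch.enabled", "true")                // append lineage edges instead of overwriting
        .getOrCreate();

    spark.range(10).count(); // trivial job so the listener has something to report
    spark.stop();
  }
}
```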

@@ -207,7 +210,7 @@ For Spark on Databricks, pipeline start time is the cluster start time.
 
 ### Spark versions supported
 
-Supports Spark 3.x series and was tested with Spark 3.2.x and 3.3.x.
+Supports Spark 3.x series.
 
 ### Environments tested with
 
@@ -219,12 +222,6 @@ This initial release has been tested with the following environments:
 
 Testing with Databricks Standard and High-concurrency Cluster is not done yet.
 
-### Spark commands not yet supported
-
-- View related commands
-- Cache commands and implications on lineage
-- RDD jobs
-
 ### Configuring Hdfs based dataset URNs
 
 Spark emits lineage between datasets. It has its own logic for generating urns. Python sources emit metadata of
@@ -336,5 +333,11 @@ log4j.logger.datahub.spark=DEBUG
 log4j.logger.datahub.client.rest=DEBUG
 ```
 
+## How to build
+Use Java 8 to build the project. The project uses Gradle as the build tool; to build the jar, run the following command:
+
+```shell
+./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:spark-lineage-beta:shadowJar
+```
 ## Known limitations
 