Skip to content

Commit d0b4f7a

Browse files
feat(cli): added cli option for ingestion source (#11980)
1 parent 6b8d21a commit d0b4f7a

File tree

3 files changed

+131
-2
lines changed

3 files changed

+131
-2
lines changed

docs/cli.md

+13
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,19 @@ datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml --dry-run
115115
datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n
116116
```
117117

118+
#### ingest --list-source-runs
119+
120+
The `--list-source-runs` option of the `ingest` command lists the previous runs, displaying their run ID, source name,
121+
start time, status, and source URN. This command allows you to filter results using the --urn option for URN-based
122+
filtering or the --source option to filter by source name (partial or complete matches are supported).
123+
124+
```shell
125+
# List all ingestion runs
126+
datahub ingest --list-source-runs
127+
# Filter runs by a source name containing "demo"
128+
datahub ingest --list-source-runs --source "demo"
129+
```
130+
118131
#### ingest --preview
119132

120133
The `--preview` option of the `ingest` command performs all of the ingestion steps, but limits the processing to only the first 10 workunits produced by the source.

docs/how/delete-metadata.md

+8-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
To follow this guide, you'll need the [DataHub CLI](../cli.md).
55
:::
66

7-
There are a two ways to delete metadata from DataHub:
7+
There are two ways to delete metadata from DataHub:
88

99
1. Delete metadata attached to entities by providing a specific urn or filters that identify a set of urns (delete CLI).
1010
2. Delete metadata created by a single ingestion run (rollback).
@@ -233,7 +233,13 @@ To view the ids of the most recent set of ingestion batches, execute
233233
datahub ingest list-runs
234234
```
235235

236-
That will print out a table of all the runs. Once you have an idea of which run you want to roll back, run
236+
That will print out a table of all the runs. To see run statuses or to filter runs by URN/source run
237+
238+
```shell
239+
datahub ingest list-source-runs
240+
```
241+
242+
Once you have an idea of which run you want to roll back, run
237243

238244
```shell
239245
datahub ingest show --run-id <run-id>

metadata-ingestion/src/datahub/cli/ingest_cli.py

+110
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
logger = logging.getLogger(__name__)
2929

30+
INGEST_SRC_TABLE_COLUMNS = ["runId", "source", "startTime", "status", "URN"]
3031
RUNS_TABLE_COLUMNS = ["runId", "rows", "created at"]
3132
RUN_TABLE_COLUMNS = ["urn", "aspect name", "created at"]
3233

@@ -437,6 +438,115 @@ def mcps(path: str) -> None:
437438
sys.exit(ret)
438439

439440

441+
@ingest.command()
442+
@click.argument("page_offset", type=int, default=0)
443+
@click.argument("page_size", type=int, default=100)
444+
@click.option("--urn", type=str, default=None, help="Filter by ingestion source URN.")
445+
@click.option(
446+
"--source", type=str, default=None, help="Filter by ingestion source name."
447+
)
448+
@upgrade.check_upgrade
449+
@telemetry.with_telemetry()
450+
def list_source_runs(page_offset: int, page_size: int, urn: str, source: str) -> None:
451+
"""List ingestion source runs with their details, optionally filtered by URN or source."""
452+
453+
query = """
454+
query listIngestionRuns($input: ListIngestionSourcesInput!) {
455+
listIngestionSources(input: $input) {
456+
ingestionSources {
457+
urn
458+
name
459+
executions {
460+
executionRequests {
461+
id
462+
result {
463+
startTimeMs
464+
status
465+
}
466+
}
467+
}
468+
}
469+
}
470+
}
471+
"""
472+
473+
# filter by urn and/or source using CONTAINS
474+
filters = []
475+
if urn:
476+
filters.append({"field": "urn", "values": [urn], "condition": "CONTAIN"})
477+
if source:
478+
filters.append({"field": "name", "values": [source], "condition": "CONTAIN"})
479+
480+
variables = {
481+
"input": {
482+
"start": page_offset,
483+
"count": page_size,
484+
"filters": filters,
485+
}
486+
}
487+
488+
client = get_default_graph()
489+
session = client._session
490+
gms_host = client.config.server
491+
492+
url = f"{gms_host}/api/graphql"
493+
try:
494+
response = session.post(url, json={"query": query, "variables": variables})
495+
response.raise_for_status()
496+
except Exception as e:
497+
click.echo(f"Error fetching data: {str(e)}")
498+
return
499+
500+
try:
501+
data = response.json()
502+
except ValueError:
503+
click.echo("Failed to parse JSON response from server.")
504+
return
505+
506+
if not data:
507+
click.echo("No response received from the server.")
508+
return
509+
510+
# when urn or source filter does not match, exit gracefully
511+
if (
512+
not isinstance(data.get("data"), dict)
513+
or "listIngestionSources" not in data["data"]
514+
):
515+
click.echo("No matching ingestion sources found. Please check your filters.")
516+
return
517+
518+
ingestion_sources = data["data"]["listIngestionSources"]["ingestionSources"]
519+
if not ingestion_sources:
520+
click.echo("No ingestion sources or executions found.")
521+
return
522+
523+
rows = []
524+
for ingestion_source in ingestion_sources:
525+
urn = ingestion_source.get("urn", "N/A")
526+
name = ingestion_source.get("name", "N/A")
527+
528+
executions = ingestion_source.get("executions", {}).get("executionRequests", [])
529+
for execution in executions:
530+
execution_id = execution.get("id", "N/A")
531+
start_time = execution.get("result", {}).get("startTimeMs", "N/A")
532+
start_time = (
533+
datetime.fromtimestamp(start_time / 1000).strftime("%Y-%m-%d %H:%M:%S")
534+
if start_time != "N/A"
535+
else "N/A"
536+
)
537+
status = execution.get("result", {}).get("status", "N/A")
538+
539+
rows.append([execution_id, name, start_time, status, urn])
540+
541+
click.echo(
542+
tabulate(
543+
rows,
544+
headers=INGEST_SRC_TABLE_COLUMNS,
545+
tablefmt="grid",
546+
)
547+
)
548+
549+
440550
@ingest.command()
441551
@click.argument("page_offset", type=int, default=0)
442552
@click.argument("page_size", type=int, default=100)

0 commit comments

Comments
 (0)