Skip to content

Commit ebd3a50

Browse files
feat(ingestion-tracing): implement ingestion with tracing api (#12714)
1 parent f9d71d6 commit ebd3a50

File tree

9 files changed

+1084
-18
lines changed

9 files changed

+1084
-18
lines changed

docs/how/updating-datahub.md

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
3636
### Known Issues
3737

3838
- #12601: Jetty 12 introduces a stricter handling of url encoding. We are currently applying a workaround to prevent a regression, while technically breaking the official specifications.
39+
- #12714: API Tracing requires that the aspect being updated has been mutated at least once using this version of DataHub.
3940

4041
### Potential Downtime
4142

metadata-ingestion/src/datahub/configuration/common.py

+8
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,14 @@ class IgnorableError(MetaError):
198198
"""An error that can be ignored."""
199199

200200

201+
class TraceTimeoutError(OperationalError):
    """Raised when an API trace does not complete before the configured timeout elapses."""


class TraceValidationError(OperationalError):
    """Raised when an API trace reports that the expected write operation did not complete."""
207+
208+
201209
@runtime_checkable
202210
class ExceptionWithProps(Protocol):
203211
def get_telemetry_props(self) -> Dict[str, Any]: ...
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
import json
2+
import logging
3+
from dataclasses import dataclass
4+
from typing import Dict, List, Optional, Sequence, Union
5+
6+
from requests import Response
7+
8+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
9+
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
10+
MetadataChangeProposal,
11+
)
12+
13+
logger = logging.getLogger(__name__)
14+
15+
16+
@dataclass
class TraceData:
    """Container pairing a trace identifier with the entity aspects it covers."""

    # Identifier of the trace, taken from the response's trace header.
    trace_id: str
    # Maps each entity URN to the list of aspect names being traced for it.
    data: Dict[str, List[str]]

    def __post_init__(self) -> None:
        # Fail fast at construction time; order matters — an empty trace_id
        # is reported before a bad data type, matching the original contract.
        if not self.trace_id:
            raise ValueError("trace_id cannot be empty")
        if not isinstance(self.data, dict):
            raise TypeError("data must be a dictionary")
26+
27+
28+
def _extract_trace_id(
    response: Response, trace_header: str = "traceparent"
) -> Optional[str]:
    """
    Pull the trace ID out of an HTTP response's headers.

    Args:
        response: HTTP response object
        trace_header: Name of the trace header to use
    Returns:
        Trace ID if found and response is valid, None otherwise
    """
    status = response.status_code
    # Only trust headers from a successful (2xx) response.
    if status < 200 or status >= 300:
        logger.debug(f"Invalid status code: {status}")
        return None

    header_value = response.headers.get(trace_header)
    if header_value:
        return header_value

    logger.debug(f"Missing trace header: {trace_header}")
    return None
49+
50+
51+
def extract_trace_data(
    response: Response,
    aspects_to_trace: Optional[List[str]] = None,
    trace_header: str = "traceparent",
) -> Optional[TraceData]:
    """
    Extract trace data from a response object.

    Args:
        response: HTTP response object
        aspects_to_trace: Optional list of aspect names to extract. If None, extracts all aspects.
        trace_header: Name of the trace header to use (default: "traceparent")
    Returns:
        TraceData object if successful, None otherwise. A response body that
        cannot be decoded as JSON is logged and reported as None — it is NOT
        raised to the caller.
    """
    trace_id = _extract_trace_id(response, trace_header)
    if not trace_id:
        return None

    try:
        json_data = response.json()
        if not isinstance(json_data, list):
            logger.debug("JSON data is not a list")
            return None

        data: Dict[str, List[str]] = {}

        for item in json_data:
            # Defensive: a malformed payload may contain non-dict entries;
            # skip them instead of raising AttributeError on .get().
            if not isinstance(item, dict):
                logger.debug(f"Skipping non-dict item: {item}")
                continue

            urn = item.get("urn")
            if not urn:
                logger.debug(f"Skipping item without URN: {item}")
                continue

            if aspects_to_trace is None:
                # Trace every non-null aspect present on the item; "urn" is
                # the entity key, not an aspect, so it is excluded.
                aspect_names = [
                    k for k, v in item.items() if k != "urn" and v is not None
                ]
            else:
                aspect_names = [
                    field for field in aspects_to_trace if item.get(field) is not None
                ]

            data[urn] = aspect_names

        return TraceData(trace_id=trace_id, data=data)

    except json.JSONDecodeError as e:
        logger.error(f"Failed to decode JSON response: {e}")
        return None
101+
102+
103+
def extract_trace_data_from_mcps(
    response: Response,
    mcps: Sequence[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
    aspects_to_trace: Optional[List[str]] = None,
    trace_header: str = "traceparent",
) -> Optional[TraceData]:
    """
    Extract trace data from a response object and populate data from provided MCPs.

    Args:
        response: HTTP response object used only for trace_id extraction
        mcps: List of MCP URN and aspect data
        aspects_to_trace: Optional list of aspect names to extract. If None, extracts all aspects.
        trace_header: Name of the trace header to use (default: "traceparent")
    Returns:
        TraceData object if successful, None otherwise
    """
    trace_id = _extract_trace_id(response, trace_header)
    if not trace_id:
        return None

    data: Dict[str, List[str]] = {}
    try:
        for mcp in mcps:
            urn = getattr(mcp, "entityUrn", None)
            aspect = getattr(mcp, "aspectName", None)

            # Both pieces are required to record a traceable write.
            if not urn or not aspect:
                logger.debug(f"Skipping MCP with missing URN or aspect name: {mcp}")
                continue

            # Honor the caller's aspect filter when one was supplied.
            if aspects_to_trace is not None and aspect not in aspects_to_trace:
                continue

            data.setdefault(urn, []).append(aspect)

        return TraceData(trace_id=trace_id, data=data)
    except AttributeError as e:
        logger.error(f"Error processing MCPs: {e}")
        return None

0 commit comments

Comments
 (0)