Skip to content

Commit 4165faa

Browse files
committed
fix config
1 parent 097f84c commit 4165faa

File tree

3 files changed

+42
-27
lines changed

3 files changed

+42
-27
lines changed

metadata-ingestion/src/datahub/emitter/rest_emitter.py

+27-2
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
from collections import defaultdict
99
from dataclasses import dataclass
1010
from datetime import datetime, timedelta
11+
from enum import auto
1112
from json.decoder import JSONDecodeError
1213
from typing import (
1314
TYPE_CHECKING,
@@ -21,6 +22,7 @@
2122
Union,
2223
)
2324

25+
import pydantic
2426
import requests
2527
from deprecated import deprecated
2628
from requests.adapters import HTTPAdapter, Retry
@@ -31,6 +33,7 @@
3133
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url, get_or_else
3234
from datahub.cli.env_utils import get_boolean_env_variable
3335
from datahub.configuration.common import (
36+
ConfigEnum,
3437
ConfigModel,
3538
ConfigurationError,
3639
OperationalError,
@@ -94,6 +97,28 @@
9497
)
9598

9699

100+
class RestTraceMode(ConfigEnum):
101+
ENABLED = auto()
102+
DISABLED = auto()
103+
104+
105+
class RestSinkEndpoint(ConfigEnum):
106+
RESTLI = auto()
107+
OPENAPI = auto()
108+
109+
110+
DEFAULT_REST_SINK_ENDPOINT = pydantic.parse_obj_as(
111+
RestSinkEndpoint,
112+
os.getenv("DATAHUB_REST_SINK_DEFAULT_ENDPOINT", RestSinkEndpoint.RESTLI),
113+
)
114+
115+
116+
DEFAULT_REST_TRACE_MODE = pydantic.parse_obj_as(
117+
RestTraceMode,
118+
os.getenv("DATAHUB_REST_TRACE_MODE", RestTraceMode.DISABLED),
119+
)
120+
121+
97122
class RequestsSessionConfig(ConfigModel):
98123
timeout: Union[float, Tuple[float, float], None] = _DEFAULT_TIMEOUT_SEC
99124

@@ -220,10 +245,10 @@ def __init__(
220245
self._session = requests.Session()
221246

222247
logger.debug(
223-
f"Using {'OpenAPI' if openapi_ingestion else 'Restli'} for ingestion."
248+
f"Using {'OpenAPI' if self._openapi_ingestion else 'Restli'} for ingestion."
224249
)
225250

226-
if default_trace_mode:
251+
if self._default_trace_mode:
227252
logger.debug("Using API Tracing for ingestion.")
228253

229254
headers = {

metadata-ingestion/src/datahub/ingestion/graph/client.py

+9-1
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,13 @@
3232
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
3333
from datahub.emitter.mce_builder import DEFAULT_ENV, Aspect
3434
from datahub.emitter.mcp import MetadataChangeProposalWrapper
35-
from datahub.emitter.rest_emitter import DatahubRestEmitter
35+
from datahub.emitter.rest_emitter import (
36+
DEFAULT_REST_SINK_ENDPOINT,
37+
DEFAULT_REST_TRACE_MODE,
38+
DatahubRestEmitter,
39+
RestSinkEndpoint,
40+
RestTraceMode,
41+
)
3642
from datahub.emitter.serialization_helper import post_json_transform
3743
from datahub.ingestion.graph.config import (
3844
DatahubClientConfig as DatahubClientConfig,
@@ -141,6 +147,8 @@ def __init__(self, config: DatahubClientConfig) -> None:
141147
ca_certificate_path=self.config.ca_certificate_path,
142148
client_certificate_path=self.config.client_certificate_path,
143149
disable_ssl_verification=self.config.disable_ssl_verification,
150+
openapi_ingestion=DEFAULT_REST_SINK_ENDPOINT == RestSinkEndpoint.OPENAPI,
151+
default_trace_mode=DEFAULT_REST_TRACE_MODE == RestTraceMode.ENABLED,
144152
)
145153

146154
self.server_id = _MISSING_SERVER_ID

metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py

+6-24
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,11 @@
2020
from datahub.emitter.mcp_builder import mcps_from_mce
2121
from datahub.emitter.rest_emitter import (
2222
BATCH_INGEST_MAX_PAYLOAD_LENGTH,
23+
DEFAULT_REST_SINK_ENDPOINT,
24+
DEFAULT_REST_TRACE_MODE,
2325
DataHubRestEmitter,
26+
RestSinkEndpoint,
27+
RestTraceMode,
2428
)
2529
from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
2630
from datahub.ingestion.api.sink import (
@@ -49,16 +53,6 @@
4953
)
5054

5155

52-
class RestTraceMode(ConfigEnum):
53-
ENABLED = auto()
54-
DISABLED = auto()
55-
56-
57-
class RestSinkEndpoint(ConfigEnum):
58-
RESTLI = auto()
59-
OPENAPI = auto()
60-
61-
6256
class RestSinkMode(ConfigEnum):
6357
SYNC = auto()
6458
ASYNC = auto()
@@ -74,22 +68,10 @@ class RestSinkMode(ConfigEnum):
7468
)
7569

7670

77-
_DEFAULT_REST_SINK_ENDPOINT = pydantic.parse_obj_as(
78-
RestSinkEndpoint,
79-
os.getenv("DATAHUB_REST_SINK_DEFAULT_ENDPOINT", RestSinkEndpoint.RESTLI),
80-
)
81-
82-
83-
_DEFAULT_REST_TRACE_MODE = pydantic.parse_obj_as(
84-
RestTraceMode,
85-
os.getenv("DATAHUB_REST_TRACE_MODE", RestTraceMode.DISABLED),
86-
)
87-
88-
8971
class DatahubRestSinkConfig(DatahubClientConfig):
9072
mode: RestSinkMode = _DEFAULT_REST_SINK_MODE
91-
endpoint: RestSinkEndpoint = _DEFAULT_REST_SINK_ENDPOINT
92-
default_trace_mode: RestTraceMode = _DEFAULT_REST_TRACE_MODE
73+
endpoint: RestSinkEndpoint = DEFAULT_REST_SINK_ENDPOINT
74+
default_trace_mode: RestTraceMode = DEFAULT_REST_TRACE_MODE
9375

9476
# These only apply in async modes.
9577
max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS

0 commit comments

Comments
 (0)