Skip to content

Commit 8a3a135

Browse files
committed
fix config
1 parent e88c46b commit 8a3a135

File tree

3 files changed

+25
-17
lines changed

3 files changed

+25
-17
lines changed

metadata-ingestion/src/datahub/emitter/rest_emitter.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
from collections import defaultdict
88
from dataclasses import dataclass
9+
from enum import auto
910
from json.decoder import JSONDecodeError
1011
from typing import (
1112
TYPE_CHECKING,
@@ -19,6 +20,7 @@
1920
Union,
2021
)
2122

23+
import pydantic
2224
import requests
2325
from deprecated import deprecated
2426
from requests.adapters import HTTPAdapter, Retry
@@ -29,6 +31,7 @@
2931
from datahub.cli.cli_utils import ensure_has_system_metadata, fixup_gms_url, get_or_else
3032
from datahub.cli.env_utils import get_boolean_env_variable
3133
from datahub.configuration.common import (
34+
ConfigEnum,
3235
ConfigModel,
3336
ConfigurationError,
3437
OperationalError,
@@ -80,6 +83,17 @@
8083
)
8184

8285

86+
class RestSinkEndpoint(ConfigEnum):
87+
RESTLI = auto()
88+
OPENAPI = auto()
89+
90+
91+
DEFAULT_REST_SINK_ENDPOINT = pydantic.parse_obj_as(
92+
RestSinkEndpoint,
93+
os.getenv("DATAHUB_REST_SINK_DEFAULT_ENDPOINT", RestSinkEndpoint.RESTLI),
94+
)
95+
96+
8397
class RequestsSessionConfig(ConfigModel):
8498
timeout: Union[float, Tuple[float, float], None] = _DEFAULT_TIMEOUT_SEC
8599

@@ -203,7 +217,7 @@ def __init__(
203217
self._session = requests.Session()
204218

205219
logger.debug(
206-
f"Using {'OpenAPI' if openapi_ingestion else 'Restli'} for ingestion."
220+
f"Using {'OpenAPI' if self._openapi_ingestion else 'Restli'} for ingestion."
207221
)
208222

209223
headers = {

metadata-ingestion/src/datahub/ingestion/graph/client.py

+6-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@
3232
from datahub.emitter.aspect import TIMESERIES_ASPECT_MAP
3333
from datahub.emitter.mce_builder import DEFAULT_ENV, Aspect
3434
from datahub.emitter.mcp import MetadataChangeProposalWrapper
35-
from datahub.emitter.rest_emitter import DatahubRestEmitter
35+
from datahub.emitter.rest_emitter import (
36+
DEFAULT_REST_SINK_ENDPOINT,
37+
DatahubRestEmitter,
38+
RestSinkEndpoint,
39+
)
3640
from datahub.emitter.serialization_helper import post_json_transform
3741
from datahub.ingestion.graph.config import (
3842
DatahubClientConfig as DatahubClientConfig,
@@ -141,6 +145,7 @@ def __init__(self, config: DatahubClientConfig) -> None:
141145
ca_certificate_path=self.config.ca_certificate_path,
142146
client_certificate_path=self.config.client_certificate_path,
143147
disable_ssl_verification=self.config.disable_ssl_verification,
148+
openapi_ingestion=DEFAULT_REST_SINK_ENDPOINT == RestSinkEndpoint.OPENAPI,
144149
)
145150

146151
self.server_id = _MISSING_SERVER_ID

metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py

+4-15
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
from datahub.emitter.mcp_builder import mcps_from_mce
2121
from datahub.emitter.rest_emitter import (
2222
BATCH_INGEST_MAX_PAYLOAD_LENGTH,
23+
DEFAULT_REST_SINK_ENDPOINT,
2324
DataHubRestEmitter,
25+
RestSinkEndpoint,
2426
)
2527
from datahub.ingestion.api.common import RecordEnvelope, WorkUnit
2628
from datahub.ingestion.api.sink import (
@@ -49,11 +51,6 @@
4951
)
5052

5153

52-
class RestSinkEndpoint(ConfigEnum):
53-
RESTLI = auto()
54-
OPENAPI = auto()
55-
56-
5754
class RestSinkMode(ConfigEnum):
5855
SYNC = auto()
5956
ASYNC = auto()
@@ -69,15 +66,9 @@ class RestSinkMode(ConfigEnum):
6966
)
7067

7168

72-
_DEFAULT_REST_SINK_ENDPOINT = pydantic.parse_obj_as(
73-
RestSinkEndpoint,
74-
os.getenv("DATAHUB_REST_SINK_DEFAULT_ENDPOINT", RestSinkEndpoint.RESTLI),
75-
)
76-
77-
7869
class DatahubRestSinkConfig(DatahubClientConfig):
7970
mode: RestSinkMode = _DEFAULT_REST_SINK_MODE
80-
endpoint: RestSinkEndpoint = _DEFAULT_REST_SINK_ENDPOINT
71+
endpoint: RestSinkEndpoint = DEFAULT_REST_SINK_ENDPOINT
8172

8273
# These only apply in async modes.
8374
max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS
@@ -184,9 +175,7 @@ def _make_emitter(cls, config: DatahubRestSinkConfig) -> DataHubRestEmitter:
184175
ca_certificate_path=config.ca_certificate_path,
185176
client_certificate_path=config.client_certificate_path,
186177
disable_ssl_verification=config.disable_ssl_verification,
187-
openapi_ingestion=True
188-
if config.endpoint == RestSinkEndpoint.OPENAPI
189-
else False,
178+
openapi_ingestion=config.endpoint == RestSinkEndpoint.OPENAPI,
190179
)
191180

192181
@property

0 commit comments

Comments
 (0)