Skip to content

Commit aad4bee

Browse files
committed
feat(ingest): allow max_workers=1 with ASYNC_BATCH rest sink
1 parent 84e50d8 commit aad4bee

File tree

3 files changed: +7 -6 lines changed

metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py

+3-3
Original file line number | Diff line number | Diff line change
@@ -65,11 +65,11 @@ class DatahubRestSinkConfig(DatahubClientConfig):
6565
mode: RestSinkMode = _DEFAULT_REST_SINK_MODE
6666

6767
# These only apply in async modes.
68-
max_threads: int = _DEFAULT_REST_SINK_MAX_THREADS
69-
max_pending_requests: int = 2000
68+
max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS
69+
max_pending_requests: pydantic.PositiveInt = 2000
7070

7171
# Only applies in async batch mode.
72-
max_per_batch: int = 100
72+
max_per_batch: pydantic.PositiveInt = 100
7373

7474

7575
@dataclasses.dataclass

metadata-ingestion/src/datahub/utilities/partition_executor.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -268,7 +268,7 @@ def __init__(
268268
self.process_batch = process_batch
269269
self.min_process_interval = min_process_interval
270270
self.read_from_pending_interval = read_from_pending_interval
271-
assert self.max_workers > 1
271+
assert self.max_workers >= 1
272272

273273
self._state_lock = threading.Lock()
274274
self._executor = ThreadPoolExecutor(

metadata-ingestion/tests/unit/utilities/test_partition_executor.py

+3-2
Original file line number | Diff line number | Diff line change
@@ -80,7 +80,8 @@ def task(id: str) -> str:
8080
assert len(done_tasks) == 16
8181

8282

83-
def test_batch_partition_executor_sequential_key_execution():
83+
@pytest.mark.parametrize("max_workers", [1, 2, 10])
84+
def test_batch_partition_executor_sequential_key_execution(max_workers: int) -> None:
8485
executing_tasks = set()
8586
done_tasks = set()
8687
done_task_batches = set()
@@ -99,7 +100,7 @@ def process_batch(batch):
99100
done_task_batches.add(tuple(id for _, id in batch))
100101

101102
with BatchPartitionExecutor(
102-
max_workers=2,
103+
max_workers=max_workers,
103104
max_pending=10,
104105
max_per_batch=2,
105106
process_batch=process_batch,

0 commit comments

Comments
 (0)