From 9c6c27912805cc602de60680f9312b18829e3f5a Mon Sep 17 00:00:00 2001 From: John Yang Date: Wed, 19 Mar 2025 11:47:59 -0700 Subject: [PATCH] feat(taskworker): add routing override --- src/sentry/options/defaults.py | 8 ++++++++ src/sentry/taskworker/router.py | 4 ++++ tests/sentry/taskworker/scheduler/test_runner.py | 9 +++++++++ tests/sentry/taskworker/test_registry.py | 4 ++++ tests/sentry/taskworker/test_worker.py | 11 ++++++++++- 5 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/sentry/options/defaults.py b/src/sentry/options/defaults.py index a3cbaf45af1ede..17344ffbdd5aea 100644 --- a/src/sentry/options/defaults.py +++ b/src/sentry/options/defaults.py @@ -3118,3 +3118,11 @@ default=1, flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE, ) + +# Taskbroker flags + +register( + "taskworker.route.overrides", + default={}, + flags=FLAG_AUTOMATOR_MODIFIABLE, +) diff --git a/src/sentry/taskworker/router.py b/src/sentry/taskworker/router.py index 71b8d400853aa1..83f657dc5ea17e 100644 --- a/src/sentry/taskworker/router.py +++ b/src/sentry/taskworker/router.py @@ -2,6 +2,7 @@ from django.conf import settings +from sentry import options from sentry.conf.types.kafka_definition import Topic @@ -13,6 +14,9 @@ class DefaultRouter: """Simple router used for self-hosted and local development""" def route_namespace(self, name: str) -> Topic: + overrides = options.get("taskworker.route.overrides") + if name in overrides: + return Topic(overrides[name]) if name in settings.TASKWORKER_ROUTES: return Topic(settings.TASKWORKER_ROUTES[name]) return Topic.TASK_WORKER diff --git a/tests/sentry/taskworker/scheduler/test_runner.py b/tests/sentry/taskworker/scheduler/test_runner.py index bcb455d6c0b6e1..1e86438c1f3c81 100644 --- a/tests/sentry/taskworker/scheduler/test_runner.py +++ b/tests/sentry/taskworker/scheduler/test_runner.py @@ -34,6 +34,7 @@ def run_storage() -> RunStorage: return RunStorage(redis) +@pytest.mark.django_db def test_schedulerunner_add_invalid(taskregistry) -> None: run_storage = Mock(spec=RunStorage) schedule_set = ScheduleRunner(registry=taskregistry, run_storage=run_storage) @@ -66,6 +67,7 @@ def test_schedulerunner_add_invalid(taskregistry) -> None: assert "microseconds" in str(err) +@pytest.mark.django_db def test_schedulerunner_tick_no_tasks(taskregistry: TaskRegistry, run_storage: RunStorage) -> None: schedule_set = ScheduleRunner(registry=taskregistry, run_storage=run_storage) @@ -74,6 +76,7 @@ def test_schedulerunner_tick_no_tasks(taskregistry: TaskRegistry, run_storage: R assert sleep_time == 60 +@pytest.mark.django_db def test_schedulerunner_tick_one_task_time_remaining( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: @@ -99,6 +102,7 @@ def test_schedulerunner_tick_one_task_time_remaining( assert last_run == datetime(2025, 1, 24, 14, 23, 0, tzinfo=UTC) +@pytest.mark.django_db def test_schedulerunner_tick_one_task_spawned( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: @@ -128,6 +132,7 @@ def test_schedulerunner_tick_one_task_spawned( run_storage.set.assert_called_with("test:valid", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC)) +@pytest.mark.django_db def test_schedulerunner_tick_key_exists_no_spawn( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: @@ -160,6 +165,7 @@ def test_schedulerunner_tick_key_exists_no_spawn( assert mock_send.call_count == 1 +@pytest.mark.django_db def test_schedulerunner_tick_one_task_multiple_ticks( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: @@ -184,6 +190,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks( assert sleep_time == 120 +@pytest.mark.django_db def test_schedulerunner_tick_one_task_multiple_ticks_crontab( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: @@ -214,6 +221,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks_crontab( assert mock_send.call_count == 2 +@pytest.mark.django_db def test_schedulerunner_tick_multiple_tasks( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: @@ -255,6 +263,7 @@ def test_schedulerunner_tick_multiple_tasks( assert mock_send.call_count == 3 +@pytest.mark.django_db def test_schedulerunner_tick_fast_and_slow( taskregistry: TaskRegistry, run_storage: RunStorage ) -> None: diff --git a/tests/sentry/taskworker/test_registry.py b/tests/sentry/taskworker/test_registry.py index 9b3d7d9df20d99..da5650cb2db1af 100644 --- a/tests/sentry/taskworker/test_registry.py +++ b/tests/sentry/taskworker/test_registry.py @@ -210,6 +210,7 @@ def simple_task() -> None: assert proto_message == activation.SerializeToString() +@pytest.mark.django_db def test_registry_get() -> None: registry = TaskRegistry() ns = registry.create_namespace(name="tests") @@ -226,6 +227,7 @@ def test_registry_get() -> None: assert registry.contains("tests") +@pytest.mark.django_db def test_registry_get_task() -> None: registry = TaskRegistry() ns = registry.create_namespace(name="tests") @@ -244,6 +246,7 @@ def simple_task() -> None: registry.get_task(ns.name, "nope") +@pytest.mark.django_db def test_registry_create_namespace_simple() -> None: registry = TaskRegistry() ns = registry.create_namespace(name="tests") @@ -264,6 +267,7 @@ def test_registry_create_namespace_simple() -> None: assert ns.topic == Topic.TASK_WORKER +@pytest.mark.django_db def test_registry_create_namespace_route_setting() -> None: routes = { "profiling": "profiles", diff --git a/tests/sentry/taskworker/test_worker.py b/tests/sentry/taskworker/test_worker.py index cbbf38f19d1348..cd1e62fac23154 100644 --- a/tests/sentry/taskworker/test_worker.py +++ b/tests/sentry/taskworker/test_worker.py @@ -4,6 +4,7 @@ from unittest import mock import grpc +import pytest from sentry_protos.taskbroker.v1.taskbroker_pb2 import ( TASK_ACTIVATION_STATUS_COMPLETE, TASK_ACTIVATION_STATUS_FAILURE, @@ -11,7 +12,6 @@ TaskActivation, ) -import sentry.taskworker.tasks.examples as example_tasks from sentry.taskworker.worker import ProcessingResult, TaskWorker, child_worker from sentry.testutils.cases import TestCase @@ -56,8 +56,11 @@ ) +@pytest.mark.django_db class TestTaskWorker(TestCase): def test_tasks_exist(self) -> None: + import sentry.taskworker.tasks.examples as example_tasks + assert example_tasks.simple_task assert example_tasks.retry_task assert example_tasks.at_most_once_task @@ -180,6 +183,7 @@ def get_task_response(*args, **kwargs): assert mock_client.update_task.call_count == 3 +@pytest.mark.django_db def test_child_worker_complete() -> None: todo: queue.Queue[TaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() @@ -194,6 +198,7 @@ def test_child_worker_complete() -> None: assert result.status == TASK_ACTIVATION_STATUS_COMPLETE +@pytest.mark.django_db def test_child_worker_retry_task() -> None: todo: queue.Queue[TaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() @@ -208,6 +213,7 @@ def test_child_worker_retry_task() -> None: assert result.status == TASK_ACTIVATION_STATUS_RETRY +@pytest.mark.django_db def test_child_worker_failure_task() -> None: todo: queue.Queue[TaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() @@ -222,6 +228,7 @@ def test_child_worker_failure_task() -> None: assert result.status == TASK_ACTIVATION_STATUS_FAILURE +@pytest.mark.django_db def test_child_worker_shutdown() -> None: todo: queue.Queue[TaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() @@ -236,6 +243,7 @@ def test_child_worker_shutdown() -> None: assert processed.qsize() == 0 +@pytest.mark.django_db def test_child_worker_unknown_task() -> None: todo: queue.Queue[TaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue() @@ -254,6 +262,7 @@ def test_child_worker_unknown_task() -> None: assert result.status == TASK_ACTIVATION_STATUS_COMPLETE +@pytest.mark.django_db def test_child_worker_at_most_once() -> None: todo: queue.Queue[TaskActivation] = queue.Queue() processed: queue.Queue[ProcessingResult] = queue.Queue()