Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(taskworker): add routing override #87373

Merged
merged 3 commits into from
Mar 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/sentry/options/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -3118,3 +3118,11 @@
default=1,
flags=FLAG_PRIORITIZE_DISK | FLAG_AUTOMATOR_MODIFIABLE,
)

# Taskbroker flags

register(
"taskworker.route.overrides",
default={},
flags=FLAG_AUTOMATOR_MODIFIABLE,
)
4 changes: 4 additions & 0 deletions src/sentry/taskworker/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.conf import settings

from sentry import options
from sentry.conf.types.kafka_definition import Topic


Expand All @@ -13,6 +14,9 @@ class DefaultRouter:
"""Simple router used for self-hosted and local development"""

def route_namespace(self, name: str) -> Topic:
overrides = options.get("taskworker.route.overrides")
if name in overrides:
return Topic(overrides[name])
if name in settings.TASKWORKER_ROUTES:
return Topic(settings.TASKWORKER_ROUTES[name])
return Topic.TASK_WORKER
9 changes: 9 additions & 0 deletions tests/sentry/taskworker/scheduler/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def run_storage() -> RunStorage:
return RunStorage(redis)


@pytest.mark.django_db
def test_schedulerunner_add_invalid(taskregistry) -> None:
run_storage = Mock(spec=RunStorage)
schedule_set = ScheduleRunner(registry=taskregistry, run_storage=run_storage)
Expand Down Expand Up @@ -66,6 +67,7 @@ def test_schedulerunner_add_invalid(taskregistry) -> None:
assert "microseconds" in str(err)


@pytest.mark.django_db
def test_schedulerunner_tick_no_tasks(taskregistry: TaskRegistry, run_storage: RunStorage) -> None:
schedule_set = ScheduleRunner(registry=taskregistry, run_storage=run_storage)

Expand All @@ -74,6 +76,7 @@ def test_schedulerunner_tick_no_tasks(taskregistry: TaskRegistry, run_storage: R
assert sleep_time == 60


@pytest.mark.django_db
def test_schedulerunner_tick_one_task_time_remaining(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand All @@ -99,6 +102,7 @@ def test_schedulerunner_tick_one_task_time_remaining(
assert last_run == datetime(2025, 1, 24, 14, 23, 0, tzinfo=UTC)


@pytest.mark.django_db
def test_schedulerunner_tick_one_task_spawned(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand Down Expand Up @@ -128,6 +132,7 @@ def test_schedulerunner_tick_one_task_spawned(
run_storage.set.assert_called_with("test:valid", datetime(2025, 1, 24, 14, 30, 0, tzinfo=UTC))


@pytest.mark.django_db
def test_schedulerunner_tick_key_exists_no_spawn(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand Down Expand Up @@ -160,6 +165,7 @@ def test_schedulerunner_tick_key_exists_no_spawn(
assert mock_send.call_count == 1


@pytest.mark.django_db
def test_schedulerunner_tick_one_task_multiple_ticks(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand All @@ -184,6 +190,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks(
assert sleep_time == 120


@pytest.mark.django_db
def test_schedulerunner_tick_one_task_multiple_ticks_crontab(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand Down Expand Up @@ -214,6 +221,7 @@ def test_schedulerunner_tick_one_task_multiple_ticks_crontab(
assert mock_send.call_count == 2


@pytest.mark.django_db
def test_schedulerunner_tick_multiple_tasks(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand Down Expand Up @@ -255,6 +263,7 @@ def test_schedulerunner_tick_multiple_tasks(
assert mock_send.call_count == 3


@pytest.mark.django_db
def test_schedulerunner_tick_fast_and_slow(
taskregistry: TaskRegistry, run_storage: RunStorage
) -> None:
Expand Down
4 changes: 4 additions & 0 deletions tests/sentry/taskworker/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ def simple_task() -> None:
assert proto_message == activation.SerializeToString()


@pytest.mark.django_db
def test_registry_get() -> None:
registry = TaskRegistry()
ns = registry.create_namespace(name="tests")
Expand All @@ -226,6 +227,7 @@ def test_registry_get() -> None:
assert registry.contains("tests")


@pytest.mark.django_db
def test_registry_get_task() -> None:
registry = TaskRegistry()
ns = registry.create_namespace(name="tests")
Expand All @@ -244,6 +246,7 @@ def simple_task() -> None:
registry.get_task(ns.name, "nope")


@pytest.mark.django_db
def test_registry_create_namespace_simple() -> None:
registry = TaskRegistry()
ns = registry.create_namespace(name="tests")
Expand All @@ -264,6 +267,7 @@ def test_registry_create_namespace_simple() -> None:
assert ns.topic == Topic.TASK_WORKER


@pytest.mark.django_db
def test_registry_create_namespace_route_setting() -> None:
routes = {
"profiling": "profiles",
Expand Down
11 changes: 10 additions & 1 deletion tests/sentry/taskworker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@
from unittest import mock

import grpc
import pytest
from sentry_protos.taskbroker.v1.taskbroker_pb2 import (
TASK_ACTIVATION_STATUS_COMPLETE,
TASK_ACTIVATION_STATUS_FAILURE,
TASK_ACTIVATION_STATUS_RETRY,
TaskActivation,
)

import sentry.taskworker.tasks.examples as example_tasks
from sentry.taskworker.worker import ProcessingResult, TaskWorker, child_worker
from sentry.testutils.cases import TestCase

Expand Down Expand Up @@ -56,8 +56,11 @@
)


@pytest.mark.django_db
class TestTaskWorker(TestCase):
def test_tasks_exist(self) -> None:
import sentry.taskworker.tasks.examples as example_tasks

assert example_tasks.simple_task
assert example_tasks.retry_task
assert example_tasks.at_most_once_task
Expand Down Expand Up @@ -180,6 +183,7 @@ def get_task_response(*args, **kwargs):
assert mock_client.update_task.call_count == 3


@pytest.mark.django_db
def test_child_worker_complete() -> None:
todo: queue.Queue[TaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand All @@ -194,6 +198,7 @@ def test_child_worker_complete() -> None:
assert result.status == TASK_ACTIVATION_STATUS_COMPLETE


@pytest.mark.django_db
def test_child_worker_retry_task() -> None:
todo: queue.Queue[TaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand All @@ -208,6 +213,7 @@ def test_child_worker_retry_task() -> None:
assert result.status == TASK_ACTIVATION_STATUS_RETRY


@pytest.mark.django_db
def test_child_worker_failure_task() -> None:
todo: queue.Queue[TaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand All @@ -222,6 +228,7 @@ def test_child_worker_failure_task() -> None:
assert result.status == TASK_ACTIVATION_STATUS_FAILURE


@pytest.mark.django_db
def test_child_worker_shutdown() -> None:
todo: queue.Queue[TaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand All @@ -236,6 +243,7 @@ def test_child_worker_shutdown() -> None:
assert processed.qsize() == 0


@pytest.mark.django_db
def test_child_worker_unknown_task() -> None:
todo: queue.Queue[TaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand All @@ -254,6 +262,7 @@ def test_child_worker_unknown_task() -> None:
assert result.status == TASK_ACTIVATION_STATUS_COMPLETE


@pytest.mark.django_db
def test_child_worker_at_most_once() -> None:
todo: queue.Queue[TaskActivation] = queue.Queue()
processed: queue.Queue[ProcessingResult] = queue.Queue()
Expand Down
Loading