Skip to content

Commit 55f121b

Browse files
Authored Aug 13, 2024
[DPE-4952] Add add/remove broker capability to rebalance action (#122)
1 parent 6c9b705 commit 55f121b

File tree

8 files changed

+274
-31
lines changed

8 files changed

+274
-31
lines changed
 

‎actions.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,8 @@ rebalance:
5050
description: Only generate the partition rebalance proposals and estimated result, without executing
5151
type: boolean
5252
default: true
53+
brokerid:
54+
description: Broker ID newly added to the cluster or to be removed. The broker ID is the unit number, e.g. kafka/0 is broker 0.
55+
type: integer
56+
minimum: 0
57+
required: [mode]

‎src/events/balancer.py

+14-3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
BALANCER_WEBSERVER_USER,
2121
CONTAINER,
2222
GROUP,
23+
MODE_ADD,
24+
MODE_REMOVE,
2325
USER,
2426
Status,
2527
)
@@ -200,16 +202,25 @@ def rebalance(self, event: ActionEvent) -> None:
200202
not self.balancer_manager.cruise_control.ready,
201203
"CruiseControl balancer service has not yet collected enough data to provide a partition reallocation proposal",
202204
),
205+
(
206+
event.params.get("brokerid", None) is None
207+
and event.params["mode"] in (MODE_ADD, MODE_REMOVE),
208+
"'add' and 'remove' rebalance action require passing the 'brokerid' parameter",
209+
),
210+
(
211+
event.params["mode"] in (MODE_ADD, MODE_REMOVE)
212+
and event.params.get("brokerid")
213+
not in [broker.unit_id for broker in self.charm.state.brokers],
214+
"invalid brokerid",
215+
),
203216
]
204217

205218
for check, msg in failure_conditions:
206219
if check:
207220
event.fail(msg)
208221
return
209222

210-
response, user_task_id = self.balancer_manager.rebalance(
211-
mode=event.params["mode"], dryrun=event.params["dryrun"]
212-
)
223+
response, user_task_id = self.balancer_manager.rebalance(**event.params)
213224
logger.debug(f"rebalance - {vars(response)=}")
214225

215226
if response.status_code != 200:

‎src/literals.py

+5
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,11 @@ def __eq__(self, value: object, /) -> bool:
173173
]
174174

175175

176+
MODE_FULL = "full"
177+
MODE_ADD = "add"
178+
MODE_REMOVE = "remove"
179+
180+
176181
@dataclass
177182
class StatusLevel:
178183
"""Status object helper."""

‎src/managers/balancer.py

+13-8
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@
77
import json
88
import logging
99
import time
10-
from typing import TYPE_CHECKING, Any, Literal
10+
from typing import TYPE_CHECKING, Any
1111

1212
import requests
1313

1414
from core.models import JSON
15-
from literals import BALANCER, BALANCER_TOPICS, STORAGE
15+
from literals import BALANCER, BALANCER_TOPICS, MODE_FULL, STORAGE
1616

1717
if TYPE_CHECKING:
1818
from charm import KafkaCharm
@@ -22,8 +22,6 @@
2222

2323
logger = logging.getLogger(__name__)
2424

25-
RebalanceMode = Literal["full", "add", "remove"]
26-
2725

2826
class CruiseControlClient:
2927
"""Client wrapper for CruiseControl."""
@@ -60,10 +58,14 @@ def post(self, endpoint: str, dryrun: bool = False, **kwargs) -> requests.Respon
6058
dryrun: flag to decide whether to return only proposals (True), or execute (False)
6159
**kwargs: any REST API query parameters provided by that endpoint
6260
"""
61+
payload = {"dryrun": str(dryrun)}
62+
if brokerid := kwargs.get("brokerid", None) is not None:
63+
payload |= {"brokerid": brokerid}
64+
6365
r = requests.post(
6466
url=f"{self.address}/{endpoint}",
6567
auth=(self.username, self.password),
66-
params=kwargs | {"dryrun": str(dryrun)} | self.default_params,
68+
params=kwargs | payload | self.default_params,
6769
)
6870
logger.debug(f"POST {endpoint} - {vars(r)}")
6971

@@ -178,15 +180,18 @@ def create_internal_topics(self) -> None:
178180
)
179181
logger.info(f"Created topic {topic}")
180182

181-
def rebalance(self, mode: str, dryrun: bool = True) -> tuple[requests.Response, str]:
183+
def rebalance(
184+
self, mode: str, dryrun: bool = True, brokerid: int | None = None
185+
) -> tuple[requests.Response, str]:
182186
"""Triggers a full Kafka cluster partition rebalance.
183187
184188
Returns:
185189
Tuple of requests.Response and string of the CruiseControl User-Task-ID for the rebalance
186190
"""
191+
mode = f"{mode}_broker" if mode != MODE_FULL else mode
187192
rebalance_request = self.cruise_control.post(
188-
endpoint=mode, dryrun=dryrun
189-
) # FIXME: will need updating to include add/remove_broker when it works
193+
endpoint=mode, dryrun=dryrun, brokerid=brokerid
194+
)
190195

191196
return (rebalance_request, rebalance_request.headers.get("User-Task-ID", ""))
192197

‎tests/integration/test_balancer.py

+134-8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
from .helpers import (
1515
APP_NAME,
16+
KAFKA_CONTAINER,
1617
ZK_NAME,
1718
balancer_exporter_is_up,
1819
balancer_is_ready,
@@ -38,27 +39,23 @@ async def balancer_app(ops_test: OpsTest, request):
3839
class TestBalancer:
3940
@pytest.mark.abort_on_fail
4041
async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm, balancer_app):
41-
await ops_test.model.add_machine(series="jammy")
42-
machine_ids = await ops_test.model.get_machines()
4342

4443
await asyncio.gather(
4544
ops_test.model.deploy(
4645
kafka_charm,
4746
application_name=APP_NAME,
4847
num_units=1,
49-
series="jammy",
50-
to=machine_ids[0],
5148
config={"roles": "broker,balancer" if balancer_app == APP_NAME else "broker"},
49+
resources={"kafka-image": KAFKA_CONTAINER},
5250
),
5351
ops_test.model.deploy(
54-
ZK_NAME, channel="edge", application_name=ZK_NAME, num_units=1, series="jammy"
52+
ZK_NAME, channel="3/edge", application_name=ZK_NAME, num_units=3, series="jammy"
5553
),
5654
ops_test.model.deploy(
5755
"kafka-test-app",
5856
application_name=PRODUCER_APP,
5957
channel="edge",
6058
num_units=1,
61-
series="jammy",
6259
config={
6360
"topic_name": "HOT-TOPIC",
6461
"num_messages": 100000,
@@ -74,12 +71,15 @@ async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm, balancer_a
7471
kafka_charm,
7572
application_name=balancer_app,
7673
num_units=1,
77-
series="jammy",
7874
config={"roles": balancer_app},
75+
resources={"kafka-image": KAFKA_CONTAINER},
7976
)
8077

8178
await ops_test.model.wait_for_idle(
82-
apps=list({APP_NAME, ZK_NAME, balancer_app}), idle_period=30, timeout=3600
79+
apps=list({APP_NAME, ZK_NAME, balancer_app}),
80+
idle_period=30,
81+
timeout=3600,
82+
raise_on_error=False,
8383
)
8484
assert ops_test.model.applications[APP_NAME].status == "blocked"
8585
assert ops_test.model.applications[ZK_NAME].status == "active"
@@ -237,6 +237,132 @@ async def test_remove_unit_full_rebalance(self, ops_test: OpsTest, balancer_app)
237237
# verify that post-rebalance, surviving units increased replica counts
238238
assert int(value) < int(post_rebalance_replica_counts.get(key, 0))
239239

240+
@pytest.mark.abort_on_fail
241+
async def test_add_unit_targeted_rebalance(self, ops_test: OpsTest, balancer_app):
242+
await ops_test.model.applications[APP_NAME].add_units(
243+
count=1 # up to 4, new unit won't have any partitions
244+
)
245+
await ops_test.model.block_until(
246+
lambda: len(ops_test.model.applications[APP_NAME].units) == 4
247+
)
248+
await ops_test.model.wait_for_idle(
249+
apps=list({APP_NAME, ZK_NAME, PRODUCER_APP, balancer_app}),
250+
status="active",
251+
timeout=1800,
252+
idle_period=30,
253+
)
254+
async with ops_test.fast_forward(fast_interval="20s"):
255+
await asyncio.sleep(120) # ensure update-status adds broker-capacities if missed
256+
257+
assert balancer_is_ready(ops_test=ops_test, app_name=balancer_app)
258+
259+
await asyncio.sleep(30) # let the API breathe after so many requests
260+
261+
# verify CC can find the new broker_id 3, with no replica partitions allocated
262+
broker_replica_count = get_replica_count_by_broker_id(ops_test, balancer_app)
263+
new_broker_id = max(map(int, broker_replica_count.keys()))
264+
pre_rebalance_replica_counts = {
265+
key: value for key, value in broker_replica_count.items() if key != str(new_broker_id)
266+
}
267+
new_broker_replica_count = int(broker_replica_count.get(str(new_broker_id), 0))
268+
269+
assert not new_broker_replica_count
270+
271+
for unit in ops_test.model.applications[balancer_app].units:
272+
if await unit.is_leader_from_status():
273+
leader_unit = unit
274+
275+
rebalance_action_dry_run = await leader_unit.run_action(
276+
"rebalance", mode="add", brokerid=new_broker_id, dryrun=True, timeout=600, block=True
277+
)
278+
response = await rebalance_action_dry_run.wait()
279+
assert response.results
280+
281+
rebalance_action = await leader_unit.run_action(
282+
"rebalance",
283+
mode="add",
284+
brokerid=new_broker_id,
285+
dryrun=False,
286+
timeout=600,
287+
block=True,
288+
)
289+
response = await rebalance_action.wait()
290+
assert response.results
291+
292+
post_rebalance_replica_counts = get_replica_count_by_broker_id(ops_test, balancer_app)
293+
294+
# Partitions were only moved from existing brokers to the new one
295+
for existing_broker, previous_replica_count in pre_rebalance_replica_counts.items():
296+
assert previous_replica_count >= post_rebalance_replica_counts.get(
297+
str(existing_broker)
298+
)
299+
300+
# New broker has partition(s)
301+
assert int(
302+
get_replica_count_by_broker_id(ops_test, balancer_app).get(str(new_broker_id), 0)
303+
) # replicas were successfully moved
304+
305+
# Total sum of partitions is conserved
306+
assert sum(pre_rebalance_replica_counts.values()) == sum(
307+
post_rebalance_replica_counts.values()
308+
)
309+
310+
@pytest.mark.abort_on_fail
311+
async def test_balancer_prepare_unit_removal(self, ops_test: OpsTest, balancer_app):
312+
broker_replica_count = get_replica_count_by_broker_id(ops_test, balancer_app)
313+
new_broker_id = max(map(int, broker_replica_count.keys()))
314+
315+
# storing the current replica counts of 0, 1, 2 - they will persist
316+
pre_rebalance_replica_counts = {
317+
key: value
318+
for key, value in get_replica_count_by_broker_id(ops_test, balancer_app).items()
319+
if key != str(new_broker_id)
320+
}
321+
322+
for unit in ops_test.model.applications[balancer_app].units:
323+
if await unit.is_leader_from_status():
324+
leader_unit = unit
325+
326+
rebalance_action_dry_run = await leader_unit.run_action(
327+
"rebalance",
328+
mode="remove",
329+
brokerid=new_broker_id,
330+
dryrun=True,
331+
timeout=600,
332+
block=True,
333+
)
334+
response = await rebalance_action_dry_run.wait()
335+
assert response.results
336+
337+
rebalance_action = await leader_unit.run_action(
338+
"rebalance",
339+
mode="remove",
340+
brokerid=[new_broker_id],
341+
dryrun=False,
342+
timeout=600,
343+
block=True,
344+
)
345+
response = await rebalance_action.wait()
346+
assert response.results
347+
348+
post_rebalance_replica_counts = get_replica_count_by_broker_id(ops_test, balancer_app)
349+
350+
# Partitions were only moved from the removed broker to the other ones
351+
for existing_broker, previous_replica_count in pre_rebalance_replica_counts.items():
352+
assert previous_replica_count <= post_rebalance_replica_counts.get(
353+
str(existing_broker)
354+
)
355+
356+
# Replicas were successfully moved
357+
assert not int(
358+
get_replica_count_by_broker_id(ops_test, balancer_app).get(str(new_broker_id), 0)
359+
)
360+
361+
# Total sum of partitions is conserved
362+
assert sum(pre_rebalance_replica_counts.values()) == sum(
363+
post_rebalance_replica_counts.values()
364+
)
365+
240366
@pytest.mark.abort_on_fail
241367
async def test_tls(self, ops_test: OpsTest, balancer_app):
242368
# deploy and integrate tls

‎tests/integration/test_provider.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,8 @@ async def test_connection_updated_on_tls_enabled(ops_test: OpsTest, app_charm: P
243243

244244
# deploying tls
245245
tls_config = {"ca-common-name": "kafka"}
246-
await ops_test.model.deploy(TLS_NAME, channel="edge", config=tls_config)
246+
# FIXME (certs): Unpin the revision once the charm is fixed
247+
await ops_test.model.deploy(TLS_NAME, channel="edge", config=tls_config, revision=163)
247248
await ops_test.model.wait_for_idle(
248249
apps=[TLS_NAME], idle_period=30, timeout=1800, status="active"
249250
)

‎tests/integration/test_tls.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ async def test_deploy_tls(ops_test: OpsTest, app_charm):
4141
tls_config = {"ca-common-name": "kafka"}
4242

4343
await asyncio.gather(
44-
ops_test.model.deploy(TLS_NAME, channel="edge", config=tls_config),
44+
# FIXME (certs): Unpin the revision once the charm is fixed
45+
ops_test.model.deploy(TLS_NAME, channel="edge", config=tls_config, revision=163),
4546
ops_test.model.deploy(ZK_NAME, channel="3/edge", num_units=3),
4647
ops_test.model.deploy(
4748
kafka_charm,

‎tests/unit/test_balancer.py

+99-10
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
from charm import KafkaCharm
1414
from literals import BALANCER_TOPICS, CHARM_KEY, CONTAINER, SUBSTRATE
15+
from managers.balancer import CruiseControlClient
1516

1617
logger = logging.getLogger(__name__)
1718

@@ -54,7 +55,7 @@ def harness():
5455
return harness
5556

5657

57-
def test_client_get_args(client):
58+
def test_client_get_args(client: CruiseControlClient):
5859
with patch("managers.balancer.requests.get") as patched_get:
5960
client.get("silmaril")
6061

@@ -67,7 +68,7 @@ def test_client_get_args(client):
6768
assert kwargs["auth"] == ("Beren", "Luthien")
6869

6970

70-
def test_client_post_args(client):
71+
def test_client_post_args(client: CruiseControlClient):
7172
with patch("managers.balancer.requests.post") as patched_post:
7273
client.post("silmaril")
7374

@@ -82,25 +83,25 @@ def test_client_post_args(client):
8283
assert kwargs["auth"] == ("Beren", "Luthien")
8384

8485

85-
def test_client_get_task_status(client, user_tasks):
86+
def test_client_get_task_status(client: CruiseControlClient, user_tasks: dict):
8687
with patch("managers.balancer.requests.get", return_value=MockResponse(user_tasks)):
8788
assert (
8889
client.get_task_status(user_task_id="e4256bcb-93f7-4290-ab11-804a665bf011")
8990
== "Completed"
9091
)
9192

9293

93-
def test_client_monitoring(client, state):
94+
def test_client_monitoring(client: CruiseControlClient, state: dict):
9495
with patch("managers.balancer.requests.get", return_value=MockResponse(state)):
9596
assert client.monitoring
9697

9798

98-
def test_client_executing(client, state):
99+
def test_client_executing(client: CruiseControlClient, state: dict):
99100
with patch("managers.balancer.requests.get", return_value=MockResponse(state)):
100101
assert not client.executing
101102

102103

103-
def test_client_ready(client, state):
104+
def test_client_ready(client: CruiseControlClient, state: dict):
104105
with patch("managers.balancer.requests.get", return_value=MockResponse(state)):
105106
assert client.ready
106107

@@ -111,7 +112,7 @@ def test_client_ready(client, state):
111112
assert not client.ready
112113

113114

114-
def test_balancer_manager_create_internal_topics(harness):
115+
def test_balancer_manager_create_internal_topics(harness: Harness[KafkaCharm]):
115116
with (
116117
patch("core.models.PeerCluster.broker_uris", new_callable=PropertyMock, return_value=""),
117118
patch(
@@ -146,8 +147,14 @@ def test_balancer_manager_create_internal_topics(harness):
146147
@pytest.mark.parametrize("executing", [True, False])
147148
@pytest.mark.parametrize("ready", [True, False])
148149
@pytest.mark.parametrize("status", [200, 404])
149-
def test_balancer_manager_rebalance(
150-
harness, proposal, leader, monitoring, executing, ready, status
150+
def test_balancer_manager_rebalance_full(
151+
harness: Harness[KafkaCharm],
152+
proposal: dict,
153+
leader: bool,
154+
monitoring: bool,
155+
executing: bool,
156+
ready: bool,
157+
status: int,
151158
):
152159
mock_event = MagicMock()
153160
mock_event.params = {"mode": "full", "dryrun": True}
@@ -189,7 +196,89 @@ def test_balancer_manager_rebalance(
189196
assert mock_event._mock_children.get("set_results") # event.set_results was called
190197

191198

192-
def test_balancer_manager_clean_results(harness, proposal):
199+
@pytest.mark.parametrize("mode", ["add", "remove"])
200+
@pytest.mark.parametrize("brokerid", [None, 0])
201+
def test_rebalance_add_remove_broker_id_length(
202+
harness: Harness[KafkaCharm], proposal: dict, mode: str, brokerid: int | None
203+
):
204+
mock_event = MagicMock()
205+
payload = {"mode": mode, "dryrun": True}
206+
payload = payload | {"brokerid": brokerid} if brokerid is not None else payload
207+
mock_event.params = payload
208+
209+
with (
210+
harness.hooks_disabled(),
211+
patch(
212+
"managers.balancer.CruiseControlClient.monitoring",
213+
new_callable=PropertyMock,
214+
return_value=True,
215+
),
216+
patch(
217+
"managers.balancer.CruiseControlClient.executing",
218+
new_callable=PropertyMock,
219+
return_value=not True,
220+
),
221+
patch(
222+
"managers.balancer.CruiseControlClient.ready",
223+
new_callable=PropertyMock,
224+
return_value=True,
225+
),
226+
patch(
227+
"managers.balancer.BalancerManager.rebalance",
228+
new_callable=None,
229+
return_value=(MockResponse(content=proposal, status_code=200), "foo"),
230+
),
231+
patch(
232+
"managers.balancer.BalancerManager.wait_for_task",
233+
new_callable=None,
234+
) as patched_wait_for_task,
235+
):
236+
harness.set_leader(True)
237+
238+
# When
239+
harness.charm.balancer.rebalance(mock_event)
240+
241+
# Then
242+
if brokerid is None:
243+
assert mock_event._mock_children.get("fail") # event.fail was called
244+
else:
245+
assert patched_wait_for_task.call_count
246+
assert mock_event._mock_children.get("set_results") # event.set_results was called
247+
248+
249+
def test_rebalance_broker_id_not_found(harness: Harness[KafkaCharm]):
250+
mock_event = MagicMock()
251+
payload = {"mode": "add", "dryrun": True, "brokerid": 999}
252+
mock_event.params = payload
253+
254+
with (
255+
harness.hooks_disabled(),
256+
patch(
257+
"managers.balancer.CruiseControlClient.monitoring",
258+
new_callable=PropertyMock,
259+
return_value=True,
260+
),
261+
patch(
262+
"managers.balancer.CruiseControlClient.executing",
263+
new_callable=PropertyMock,
264+
return_value=not True,
265+
),
266+
patch(
267+
"managers.balancer.CruiseControlClient.ready",
268+
new_callable=PropertyMock,
269+
return_value=True,
270+
),
271+
):
272+
harness.set_leader(True)
273+
274+
# When
275+
harness.charm.balancer.rebalance(mock_event)
276+
277+
# Then
278+
assert mock_event._mock_children.get("fail") # event.fail was called
279+
280+
281+
def test_balancer_manager_clean_results(harness: Harness[KafkaCharm], proposal: dict):
193282
cleaned_results = harness.charm.balancer.balancer_manager.clean_results(value=proposal)
194283

195284
def _check_cleaned_results(value) -> bool:

0 commit comments

Comments
 (0)
Please sign in to comment.