Skip to content

Commit e4ac263

Browse files
authored
[DPE-5226] - refactor: make 'broker' the central relation (#148)
1 parent e786476 commit e4ac263

File tree

8 files changed

+92
-88
lines changed

8 files changed

+92
-88
lines changed

src/core/cluster.py

+38-37
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,14 @@ class PeerClusterOrchestratorData(ProviderData, RequirerData):
7979
"""Broker provider data model."""
8080

8181
SECRET_LABEL_MAP = SECRET_LABEL_MAP
82-
SECRET_FIELDS = BALANCER.requested_secrets
82+
SECRET_FIELDS = BROKER.requested_secrets
8383

8484

8585
class PeerClusterData(ProviderData, RequirerData):
8686
"""Broker provider data model."""
8787

8888
SECRET_LABEL_MAP = SECRET_LABEL_MAP
89-
SECRET_FIELDS = BROKER.requested_secrets
89+
SECRET_FIELDS = BALANCER.requested_secrets
9090

9191

9292
class ClusterState(Object):
@@ -126,42 +126,43 @@ def client_relations(self) -> set[Relation]:
126126
return set(self.model.relations[REL_NAME])
127127

128128
@property
129-
def peer_cluster_orchestrator_relations(self) -> set[Relation]:
130-
"""The `peer-cluster-orchestrator` relations that this charm is providing."""
131-
return set(self.model.relations[PEER_CLUSTER_ORCHESTRATOR_RELATION])
129+
def peer_cluster_orchestrator_relation(self) -> Relation | None:
130+
"""The `peer-cluster-orchestrator` relation that this charm is providing."""
131+
return self.model.get_relation(PEER_CLUSTER_ORCHESTRATOR_RELATION)
132132

133133
@property
134134
def peer_cluster_relation(self) -> Relation | None:
135135
"""The `peer-cluster` relation that this charm is requiring."""
136136
return self.model.get_relation(PEER_CLUSTER_RELATION)
137137

138138
@property
139-
def peer_clusters(self) -> set[PeerCluster]:
140-
"""The state for all related `peer-cluster` applications that this charm is providing for."""
141-
peer_clusters = set()
142-
balancer_kwargs: dict[str, Any] = {
143-
"balancer_username": self.cluster.balancer_username,
144-
"balancer_password": self.cluster.balancer_password,
145-
"balancer_uris": self.cluster.balancer_uris,
146-
}
147-
for relation in self.peer_cluster_orchestrator_relations:
148-
if not relation.app or not self.runs_balancer:
149-
continue
150-
151-
peer_clusters.add(
152-
PeerCluster(
153-
relation=relation,
154-
data_interface=PeerClusterOrchestratorData(self.model, relation.name),
155-
**balancer_kwargs,
156-
)
157-
)
139+
def peer_cluster_orchestrator(self) -> PeerCluster:
140+
"""The state for the related `peer-cluster-orchestrator` application that this charm is requiring from."""
141+
balancer_kwargs: dict[str, Any] = (
142+
{
143+
"balancer_username": self.cluster.balancer_username,
144+
"balancer_password": self.cluster.balancer_password,
145+
"balancer_uris": self.cluster.balancer_uris,
146+
}
147+
if self.runs_balancer
148+
else {}
149+
)
150+
return PeerCluster(
151+
relation=self.peer_cluster_relation,
152+
data_interface=PeerClusterData(self.model, PEER_CLUSTER_RELATION),
153+
**balancer_kwargs,
154+
)
158155

159-
return peer_clusters
156+
@property
157+
def peer_cluster(self) -> PeerCluster:
158+
"""The state for the related `peer-cluster` application that this charm is providing to."""
159+
return PeerCluster(
160+
relation=self.peer_cluster_orchestrator_relation,
161+
data_interface=PeerClusterOrchestratorData(
162+
self.model, PEER_CLUSTER_ORCHESTRATOR_RELATION
163+
),
164+
)
160165

161-
# FIXME: will need renaming once we use Kraft as the orchestrator
162-
# uses the 'already there' BALANCER username now
163-
# will need to create one independently with Basic HTTP auth + multiple broker apps
164-
# right now, multiple<->multiple is very brittle
165166
@property
166167
def balancer(self) -> PeerCluster:
167168
"""The state for the `peer-cluster-orchestrator` related balancer application."""
@@ -175,10 +176,12 @@ def balancer(self) -> PeerCluster:
175176
else {}
176177
)
177178

178-
if self.runs_broker: # must be requiring, initialise with necessary broker data
179+
if self.runs_broker: # must be providing, initialise with necessary broker data
179180
return PeerCluster(
180-
relation=self.peer_cluster_relation, # if same app, this will be None and OK
181-
data_interface=PeerClusterData(self.model, PEER_CLUSTER_RELATION),
181+
relation=self.peer_cluster_orchestrator_relation, # if same app, this will be None and OK
182+
data_interface=PeerClusterOrchestratorData(
183+
self.model, PEER_CLUSTER_ORCHESTRATOR_RELATION
184+
),
182185
broker_username=ADMIN_USER,
183186
broker_password=self.cluster.internal_user_credentials.get(ADMIN_USER, ""),
184187
broker_uris=self.bootstrap_server,
@@ -191,9 +194,7 @@ def balancer(self) -> PeerCluster:
191194
)
192195

193196
else: # must be roles=balancer only then, only load with necessary balancer data
194-
return list(self.peer_clusters)[
195-
0
196-
] # for broker - balancer relation, currently limited to 1
197+
return self.peer_cluster_orchestrator
197198

198199
@property
199200
def oauth_relation(self) -> Relation | None:
@@ -345,7 +346,7 @@ def default_auth(self) -> AuthMap:
345346
def enabled_auth(self) -> list[AuthMap]:
346347
"""The currently enabled auth.protocols and their auth.mechanisms, based on related applications."""
347348
enabled_auth = []
348-
if self.client_relations or self.runs_balancer or self.peer_cluster_relation:
349+
if self.client_relations or self.runs_balancer or self.peer_cluster_orchestrator_relation:
349350
enabled_auth.append(self.default_auth)
350351
if self.oauth_relation:
351352
enabled_auth.append(AuthMap(self.default_auth.protocol, "OAUTHBEARER"))
@@ -457,7 +458,7 @@ def _balancer_status(self) -> Status:
457458
if not self.runs_balancer or not self.unit_broker.unit.is_leader():
458459
return Status.ACTIVE
459460

460-
if not self.peer_cluster_orchestrator_relations and not self.runs_broker:
461+
if not self.peer_cluster_relation and not self.runs_broker:
461462
return Status.NO_PEER_CLUSTER_RELATION
462463

463464
if not self.balancer.broker_connected:

src/core/models.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -393,17 +393,17 @@ def mtls_enabled(self) -> bool:
393393
return self.relation_data.get("mtls", "disabled") == "enabled"
394394

395395
@property
396-
def balancer_username(self) -> bool:
396+
def balancer_username(self) -> str:
397397
"""Persisted balancer username."""
398398
return self.relation_data.get("balancer-username", "")
399399

400400
@property
401-
def balancer_password(self) -> bool:
401+
def balancer_password(self) -> str:
402402
"""Persisted balancer password."""
403403
return self.relation_data.get("balancer-password", "")
404404

405405
@property
406-
def balancer_uris(self) -> bool:
406+
def balancer_uris(self) -> str:
407407
"""Persisted balancer uris."""
408408
return self.relation_data.get("balancer-uris", "")
409409

src/events/balancer.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -107,16 +107,15 @@ def _on_start(self, event: StartEvent | PebbleReadyEvent) -> None:
107107
return
108108

109109
if not self.charm.state.cluster.balancer_password:
110-
external_cluster = next(iter(self.charm.state.peer_clusters), None)
111110
payload = {
112111
"balancer-username": BALANCER_WEBSERVER_USER,
113112
"balancer-password": self.charm.workload.generate_password(),
114113
"balancer-uris": f"{self.charm.state.unit_broker.host}:{BALANCER_WEBSERVER_PORT}",
115114
}
116115
# Update relation data intra & extra cluster (if it exists)
117116
self.charm.state.cluster.update(payload)
118-
if external_cluster:
119-
external_cluster.update(payload)
117+
if self.charm.state.peer_cluster_orchestrator:
118+
self.charm.state.peer_cluster_orchestrator.update(payload)
120119

121120
self.config_manager.set_cruise_control_properties()
122121
self.config_manager.set_broker_capacities()

src/events/peer_cluster.py

+26-26
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def __init__(self, charm: "KafkaCharm") -> None:
6161

6262
# ensures data updates, eventually
6363
self.framework.observe(
64-
getattr(self.charm.on, "update_status"), self._on_peer_cluster_changed
64+
getattr(self.charm.on, "update_status"), self._on_peer_cluster_orchestrator_changed
6565
)
6666

6767
def _on_secret_changed_event(self, _: SecretChangedEvent) -> None:
@@ -94,8 +94,8 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None:
9494
"""Generic handler for peer-cluster `relation-changed` events."""
9595
if (
9696
not self.charm.unit.is_leader()
97-
or not self.charm.state.runs_broker
98-
or "balancer" not in self.charm.state.balancer.roles
97+
or not self.charm.state.runs_balancer # only balancer need to handle this event
98+
or not self.charm.state.balancer.roles # ensures secrets have set-up before writing
9999
):
100100
return
101101

@@ -104,40 +104,40 @@ def _on_peer_cluster_changed(self, event: RelationChangedEvent) -> None:
104104
# will no-op if relation does not exist
105105
self.charm.state.balancer.update(
106106
{
107-
"roles": self.charm.state.roles,
108-
"broker-username": self.charm.state.balancer.broker_username,
109-
"broker-password": self.charm.state.balancer.broker_password,
110-
"broker-uris": self.charm.state.balancer.broker_uris,
111-
"racks": str(self.charm.state.balancer.racks),
112-
"broker-capacities": json.dumps(self.charm.state.balancer.broker_capacities),
113-
"zk-uris": self.charm.state.balancer.zk_uris,
114-
"zk-username": self.charm.state.balancer.zk_username,
115-
"zk-password": self.charm.state.balancer.zk_password,
107+
"balancer-username": self.charm.state.balancer.balancer_username,
108+
"balancer-password": self.charm.state.balancer.balancer_password,
109+
"balancer-uris": self.charm.state.balancer.balancer_uris,
116110
}
117111
)
118112

119113
self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event
120114

121115
def _on_peer_cluster_orchestrator_changed(self, event: RelationChangedEvent) -> None:
122116
"""Generic handler for peer-cluster-orchestrator `relation-changed` events."""
123-
if not self.charm.unit.is_leader() or not self.charm.state.runs_balancer:
117+
if (
118+
not self.charm.unit.is_leader()
119+
or not self.charm.state.runs_broker # only broker needs handle this event
120+
or "balancer"
121+
not in self.charm.state.balancer.roles # ensures secret have set-up before writing, and only writing to balancers
122+
):
124123
return
125124

126125
self._default_relation_changed(event)
127126

128-
for peer_cluster in self.charm.state.peer_clusters:
129-
if "broker" not in peer_cluster.roles:
130-
# TODO: maybe a log here?
131-
continue
132-
133-
# will no-op if relation does not exist
134-
peer_cluster.update(
135-
{
136-
"balancer-username": self.charm.state.balancer.balancer_username,
137-
"balancer-password": self.charm.state.balancer.balancer_password,
138-
"balancer-uris": self.charm.state.balancer.balancer_uris,
139-
}
140-
)
127+
# will no-op if relation does not exist
128+
self.charm.state.balancer.update(
129+
{
130+
"roles": self.charm.state.roles,
131+
"broker-username": self.charm.state.balancer.broker_username,
132+
"broker-password": self.charm.state.balancer.broker_password,
133+
"broker-uris": self.charm.state.balancer.broker_uris,
134+
"racks": str(self.charm.state.balancer.racks),
135+
"broker-capacities": json.dumps(self.charm.state.balancer.broker_capacities),
136+
"zk-uris": self.charm.state.balancer.zk_uris,
137+
"zk-username": self.charm.state.balancer.zk_username,
138+
"zk-password": self.charm.state.balancer.zk_password,
139+
}
140+
)
141141

142142
self.charm.on.config_changed.emit() # ensure both broker+balancer get a changed event
143143

src/literals.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ def __eq__(self, value: object, /) -> bool:
128128
value="broker",
129129
service="kafka",
130130
paths=PATHS["kafka"],
131-
relation=PEER_CLUSTER_RELATION,
131+
relation=PEER_CLUSTER_ORCHESTRATOR_RELATION,
132132
requested_secrets=[
133133
"balancer-username",
134134
"balancer-password",
@@ -139,7 +139,7 @@ def __eq__(self, value: object, /) -> bool:
139139
value="balancer",
140140
service="balancer",
141141
paths=PATHS["cruise-control"],
142-
relation=PEER_CLUSTER_ORCHESTRATOR_RELATION,
142+
relation=PEER_CLUSTER_RELATION,
143143
requested_secrets=[
144144
"broker-username",
145145
"broker-password",

tests/integration/helpers.py

+8-8
Original file line numberDiff line numberDiff line change
@@ -600,18 +600,18 @@ def balancer_is_ready(ops_test: OpsTest, app_name: str) -> bool:
600600
pwd = get_secret_by_label(ops_test=ops_test, label=f"{PEER}.{app_name}.app", owner=app_name)[
601601
"balancer-password"
602602
]
603-
monitor_state = check_output(
604-
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {app_name}/leader sudo -i 'curl http://localhost:9090/kafkacruisecontrol/state?json=True'"
605-
f" -u {BALANCER_WEBSERVER_USER}:{pwd}",
606-
stderr=PIPE,
607-
shell=True,
608-
universal_newlines=True,
609-
)
610603

611604
try:
605+
monitor_state = check_output(
606+
f"JUJU_MODEL={ops_test.model_full_name} juju ssh {app_name}/leader sudo -i 'curl http://localhost:9090/kafkacruisecontrol/state?json=True'"
607+
f" -u {BALANCER_WEBSERVER_USER}:{pwd}",
608+
stderr=PIPE,
609+
shell=True,
610+
universal_newlines=True,
611+
)
612612
monitor_state_json = json.loads(monitor_state).get("MonitorState", {})
613613
executor_state_json = json.loads(monitor_state).get("ExecutorState", {})
614-
except json.JSONDecodeError as e:
614+
except (json.JSONDecodeError, CalledProcessError) as e:
615615
logger.error(e)
616616
return False
617617

tests/integration/test_balancer.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ async def test_relate_not_enough_brokers(self, ops_test: OpsTest):
114114
await ops_test.model.add_relation(PRODUCER_APP, APP_NAME)
115115
if self.balancer_app != APP_NAME:
116116
await ops_test.model.add_relation(
117-
f"{APP_NAME}:{PEER_CLUSTER_RELATION}",
118-
f"{BALANCER_APP}:{PEER_CLUSTER_ORCHESTRATOR_RELATION}",
117+
f"{APP_NAME}:{PEER_CLUSTER_ORCHESTRATOR_RELATION}",
118+
f"{BALANCER_APP}:{PEER_CLUSTER_RELATION}",
119119
)
120120

121121
await ops_test.model.wait_for_idle(

tests/integration/test_charm.py

+11-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import asyncio
66
import logging
7+
from subprocess import PIPE, check_output
78

89
import pytest
910
import requests
@@ -72,17 +73,20 @@ async def test_consistency_between_workload_and_metadata(ops_test: OpsTest):
7273

7374
@pytest.mark.abort_on_fail
7475
async def test_remove_zk_relation_relate(ops_test: OpsTest):
75-
remove_relation_cmd = f"remove-relation {APP_NAME} {ZK_NAME}"
76-
await ops_test.juju(*remove_relation_cmd.split(), check=True)
77-
await ops_test.model.wait_for_idle(
78-
apps=[APP_NAME, ZK_NAME], idle_period=60, timeout=3600, raise_on_error=False
76+
check_output(
77+
f"JUJU_MODEL={ops_test.model_full_name} juju remove-relation {APP_NAME} {ZK_NAME}",
78+
stderr=PIPE,
79+
shell=True,
80+
universal_newlines=True,
7981
)
8082

81-
assert ops_test.model.applications[APP_NAME].status == "blocked"
82-
assert ops_test.model.applications[ZK_NAME].status == "active"
83+
await ops_test.model.wait_for_idle(
84+
apps=[APP_NAME, ZK_NAME], idle_period=40, timeout=3600, raise_on_error=False
85+
)
8386

8487
await ops_test.model.add_relation(APP_NAME, ZK_NAME)
85-
async with ops_test.fast_forward(fast_interval="60s"):
88+
89+
async with ops_test.fast_forward(fast_interval="90s"):
8690
await ops_test.model.wait_for_idle(
8791
apps=[APP_NAME, ZK_NAME],
8892
status="active",

0 commit comments

Comments
 (0)