Skip to content

Commit 2e32255

Browse files
authored
[DPE-5218] Enable compatibility with ZK restore feature (#137)
1 parent 703c003 commit 2e32255

File tree

2 files changed

+33
-1
lines changed

2 files changed

+33
-1
lines changed

src/charm.py

+20
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
from charms.prometheus_k8s.v0.prometheus_scrape import MetricsEndpointProvider
1313
from charms.rolling_ops.v0.rollingops import RollingOpsManager
1414
from ops import (
15+
ActiveStatus,
16+
CollectStatusEvent,
1517
EventBase,
1618
StatusBase,
1719
)
@@ -60,6 +62,7 @@ def __init__(self, *args):
6062
self.restart = RollingOpsManager(self, relation="restart", callback=self._restart_broker)
6163

6264
self.framework.observe(getattr(self.on, "config_changed"), self._on_roles_changed)
65+
self.framework.observe(self.on.collect_app_status, self._on_collect_status)
6366

6467
# peer-cluster events are shared between all roles, so necessary to init here to avoid instantiating multiple times
6568
self.peer_cluster = PeerClusterEventsHandler(self)
@@ -127,6 +130,23 @@ def _set_status(self, key: Status) -> None:
127130
getattr(logger, log_level.lower())(status.message)
128131
self.unit.status = status
129132

133+
def _on_collect_status(self, event: CollectStatusEvent):
134+
ready_to_start = self.state.ready_to_start.value.status
135+
event.add_status(ready_to_start)
136+
137+
if not isinstance(ready_to_start, ActiveStatus):
138+
return
139+
140+
if not self.state.runs_broker:
141+
# early return, the next checks only concern the broker
142+
return
143+
144+
if not self.broker.workload.active():
145+
event.add_status(Status.BROKER_NOT_RUNNING.value.status)
146+
147+
if not self.state.zookeeper.broker_active():
148+
event.add_status(Status.ZK_NOT_CONNECTED.value.status)
149+
130150

131151
if __name__ == "__main__":
132152
main(KafkaCharm)

src/events/zookeeper.py

+13-1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ def _on_zookeeper_created(self, _) -> None:
5454

5555
def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
5656
"""Handler for `zookeeper_relation_created/joined/changed` events, ensuring internal users get created."""
57+
if not self.charm.state.zookeeper.endpoints:
58+
# Kafka keeps a meta.properties in every log.dir with a unique ClusterID
59+
# this ID is provided by ZK, and removing it on relation-changed allows
60+
# re-joining a ZK cluster undergoing backup restoration.
61+
self.charm.workload.exec(
62+
[
63+
"bash",
64+
"-c",
65+
f"""find {self.charm.workload.paths.data_path} -type f -name meta.properties -delete || true""",
66+
]
67+
)
68+
5769
if not self.charm.state.zookeeper.zookeeper_connected:
5870
self.charm._set_status(Status.ZK_NO_DATA)
5971
return
@@ -71,7 +83,7 @@ def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
7183
event.defer()
7284
return
7385

74-
if not self.charm.state.cluster.internal_user_credentials and self.model.unit.is_leader():
86+
if self.model.unit.is_leader():
7587
# loading the minimum config needed to authenticate to zookeeper
7688
self.dependent.config_manager.set_zk_jaas_config()
7789
self.dependent.config_manager.set_server_properties()

0 commit comments

Comments
 (0)