Skip to content

Commit 9247845

Browse files
authored
[DPE-6138] Update zookeeper client lib (#157)
1 parent 2262dee commit 9247845

File tree

6 files changed

+123
-52
lines changed

6 files changed

+123
-52
lines changed

lib/charms/zookeeper/v0/client.py

+65-20
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def update_cluster(new_members: List[str], event: EventBase) -> None:
7474

7575
# Increment this PATCH version before using `charmcraft publish-lib` or reset
7676
# to 0 if you are raising the major API version
77-
LIBPATCH = 6
77+
LIBPATCH = 8
7878

7979

8080
logger = logging.getLogger(__name__)
@@ -101,6 +101,12 @@ class QuorumLeaderNotFoundError(Exception):
101101
pass
102102

103103

104+
class NoUnitFoundError(Exception):
105+
"""Generic exception for when there are no running zk unit in the app."""
106+
107+
pass
108+
109+
104110
class ZooKeeperManager:
105111
"""Handler for performing ZK commands."""
106112

@@ -114,6 +120,7 @@ def __init__(
114120
keyfile_path: Optional[str] = "",
115121
keyfile_password: Optional[str] = "",
116122
certfile_path: Optional[str] = "",
123+
read_only: bool = True,
117124
):
118125
self.hosts = hosts
119126
self.username = username
@@ -123,12 +130,21 @@ def __init__(
123130
self.keyfile_path = keyfile_path
124131
self.keyfile_password = keyfile_password
125132
self.certfile_path = certfile_path
126-
self.leader = ""
133+
self.zk_host = ""
134+
self.read_only = read_only
127135

128-
try:
129-
self.leader = self.get_leader()
130-
except RetryError:
131-
raise QuorumLeaderNotFoundError("quorum leader not found")
136+
if not read_only:
137+
try:
138+
self.zk_host = self.get_leader()
139+
except RetryError:
140+
raise QuorumLeaderNotFoundError("quorum leader not found")
141+
142+
else:
143+
try:
144+
self.zk_host = self.get_any_unit()
145+
146+
except RetryError:
147+
raise NoUnitFoundError
132148

133149
@retry(
134150
wait=wait_fixed(3),
@@ -170,6 +186,35 @@ def get_leader(self) -> str:
170186

171187
return leader or ""
172188

189+
@retry(
190+
wait=wait_fixed(3),
191+
stop=stop_after_attempt(2),
192+
retry=retry_if_not_result(lambda result: True if result else False),
193+
)
194+
def get_any_unit(self) -> str:
195+
any_host = None
196+
for host in self.hosts:
197+
try:
198+
with ZooKeeperClient(
199+
host=host,
200+
client_port=self.client_port,
201+
username=self.username,
202+
password=self.password,
203+
use_ssl=self.use_ssl,
204+
keyfile_path=self.keyfile_path,
205+
keyfile_password=self.keyfile_password,
206+
certfile_path=self.certfile_path,
207+
) as zk:
208+
response = zk.srvr
209+
if response:
210+
any_host = host
211+
break
212+
except KazooTimeoutError: # in the case of having a dead unit in relation data
213+
logger.debug(f"TIMEOUT - {host}")
214+
continue
215+
216+
return any_host or ""
217+
173218
@property
174219
def server_members(self) -> Set[str]:
175220
"""The current members within the ZooKeeper quorum.
@@ -179,7 +224,7 @@ def server_members(self) -> Set[str]:
179224
e.g {"server.1=10.141.78.207:2888:3888:participant;0.0.0.0:2181"}
180225
"""
181226
with ZooKeeperClient(
182-
host=self.leader,
227+
host=self.zk_host,
183228
client_port=self.client_port,
184229
username=self.username,
185230
password=self.password,
@@ -200,7 +245,7 @@ def config_version(self) -> int:
200245
The zookeeper config version decoded from base16
201246
"""
202247
with ZooKeeperClient(
203-
host=self.leader,
248+
host=self.zk_host,
204249
client_port=self.client_port,
205250
username=self.username,
206251
password=self.password,
@@ -221,7 +266,7 @@ def members_syncing(self) -> bool:
221266
True if any members are syncing. Otherwise False.
222267
"""
223268
with ZooKeeperClient(
224-
host=self.leader,
269+
host=self.zk_host,
225270
client_port=self.client_port,
226271
username=self.username,
227272
password=self.password,
@@ -305,7 +350,7 @@ def add_members(self, members: Iterable[str]) -> None:
305350

306351
# specific connection to leader
307352
with ZooKeeperClient(
308-
host=self.leader,
353+
host=self.zk_host,
309354
client_port=self.client_port,
310355
username=self.username,
311356
password=self.password,
@@ -330,7 +375,7 @@ def remove_members(self, members: Iterable[str]) -> None:
330375
for member in members:
331376
member_id = re.findall(r"server.([0-9]+)", member)[0]
332377
with ZooKeeperClient(
333-
host=self.leader,
378+
host=self.zk_host,
334379
client_port=self.client_port,
335380
username=self.username,
336381
password=self.password,
@@ -356,7 +401,7 @@ def leader_znodes(self, path: str) -> Set[str]:
356401
Set of all nested child zNodes
357402
"""
358403
with ZooKeeperClient(
359-
host=self.leader,
404+
host=self.zk_host,
360405
client_port=self.client_port,
361406
username=self.username,
362407
password=self.password,
@@ -369,15 +414,15 @@ def leader_znodes(self, path: str) -> Set[str]:
369414

370415
return all_znode_children
371416

372-
def create_znode_leader(self, path: str, acls: List[ACL]) -> None:
417+
def create_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None:
373418
"""Creates a new zNode on the current quorum leader with given ACLs.
374419
375420
Args:
376421
path: the zNode path to set
377422
acls: the ACLs to be set on that path
378423
"""
379424
with ZooKeeperClient(
380-
host=self.leader,
425+
host=self.zk_host,
381426
client_port=self.client_port,
382427
username=self.username,
383428
password=self.password,
@@ -388,15 +433,15 @@ def create_znode_leader(self, path: str, acls: List[ACL]) -> None:
388433
) as zk:
389434
zk.create_znode(path=path, acls=acls)
390435

391-
def set_acls_znode_leader(self, path: str, acls: List[ACL]) -> None:
436+
def set_acls_znode_leader(self, path: str, acls: List[ACL] | None = None) -> None:
392437
"""Updates ACLs for an existing zNode on the current quorum leader.
393438
394439
Args:
395440
path: the zNode path to update
396441
acls: the new ACLs to be set on that path
397442
"""
398443
with ZooKeeperClient(
399-
host=self.leader,
444+
host=self.zk_host,
400445
client_port=self.client_port,
401446
username=self.username,
402447
password=self.password,
@@ -414,7 +459,7 @@ def delete_znode_leader(self, path: str) -> None:
414459
path: the zNode path to delete
415460
"""
416461
with ZooKeeperClient(
417-
host=self.leader,
462+
host=self.zk_host,
418463
client_port=self.client_port,
419464
username=self.username,
420465
password=self.password,
@@ -432,7 +477,7 @@ def get_version(self) -> str:
432477
String of ZooKeeper service version
433478
"""
434479
with ZooKeeperClient(
435-
host=self.leader,
480+
host=self.zk_host,
436481
client_port=self.client_port,
437482
username=self.username,
438483
password=self.password,
@@ -577,7 +622,7 @@ def delete_znode(self, path: str) -> None:
577622
return
578623
self.client.delete(path, recursive=True)
579624

580-
def create_znode(self, path: str, acls: List[ACL]) -> None:
625+
def create_znode(self, path: str, acls: List[ACL] | None = None) -> None:
581626
"""Create new znode.
582627
583628
Args:
@@ -599,7 +644,7 @@ def get_acls(self, path: str) -> List[ACL]:
599644

600645
return acl_list if acl_list else []
601646

602-
def set_acls(self, path: str, acls: List[ACL]) -> None:
647+
def set_acls(self, path: str, acls: List[ACL] | None = None) -> None:
603648
"""Sets acls for a desired znode path.
604649
605650
Args:

requirements.txt

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ charset-normalizer==3.4.0 ; python_version >= "3.10" and python_version < "4.0"
66
cryptography==43.0.3 ; python_version >= "3.10" and python_version < "4.0"
77
exceptiongroup==1.2.2 ; python_version >= "3.10" and python_version < "3.11"
88
h11==0.14.0 ; python_version >= "3.10" and python_version < "4.0"
9-
httpcore==1.0.6 ; python_version >= "3.10" and python_version < "4.0"
9+
httpcore==1.0.7 ; python_version >= "3.10" and python_version < "4.0"
1010
httpx==0.27.2 ; python_version >= "3.10" and python_version < "4.0"
1111
idna==3.10 ; python_version >= "3.10" and python_version < "4.0"
1212
jsonschema-specifications==2024.10.1 ; python_version >= "3.10" and python_version < "4.0"
@@ -17,11 +17,11 @@ lightkube==0.15.0 ; python_version >= "3.10" and python_version < "4.0"
1717
ops==2.17.0 ; python_version >= "3.10" and python_version < "4.0"
1818
pure-sasl==0.6.2 ; python_version >= "3.10" and python_version < "4.0"
1919
pycparser==2.22 ; python_version >= "3.10" and python_version < "4.0" and platform_python_implementation != "PyPy"
20-
pydantic==1.10.18 ; python_version >= "3.10" and python_version < "4.0"
20+
pydantic==1.10.19 ; python_version >= "3.10" and python_version < "4.0"
2121
pyyaml==6.0.2 ; python_version >= "3.10" and python_version < "4.0"
2222
referencing==0.35.1 ; python_version >= "3.10" and python_version < "4.0"
2323
requests==2.32.3 ; python_version >= "3.10" and python_version < "4.0"
24-
rpds-py==0.18.1 ; python_version >= "3.10" and python_version < "4.0"
24+
rpds-py==0.21.0 ; python_version >= "3.10" and python_version < "4.0"
2525
sniffio==1.3.1 ; python_version >= "3.10" and python_version < "4.0"
2626
tenacity==9.0.0 ; python_version >= "3.10" and python_version < "4.0"
2727
typing-extensions==4.12.2 ; python_version >= "3.10" and python_version < "4.0"

src/core/models.py

+49-22
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
DataPeerData,
1616
DataPeerUnitData,
1717
)
18-
from charms.zookeeper.v0.client import QuorumLeaderNotFoundError, ZooKeeperManager
18+
from charms.zookeeper.v0.client import (
19+
NoUnitFoundError,
20+
QuorumLeaderNotFoundError,
21+
ZooKeeperManager,
22+
)
1923
from kazoo.client import AuthFailedError, ConnectionLoss, NoNodeError
2024
from kazoo.exceptions import NoAuthError
2125
from lightkube.resources.core_v1 import Node, Pod
@@ -689,23 +693,6 @@ def chroot(self) -> str:
689693
or ""
690694
)
691695

692-
@property
693-
def uris(self) -> str:
694-
"""Comma separated connection string, containing endpoints + chroot."""
695-
if not self.relation:
696-
return ""
697-
698-
return ",".join(
699-
sorted( # sorting as they may be disordered
700-
(
701-
self.data_interface.fetch_relation_field(
702-
relation_id=self.relation.id, field="uris"
703-
)
704-
or ""
705-
).split(",")
706-
)
707-
)
708-
709696
@property
710697
def tls(self) -> bool:
711698
"""Check if TLS is enabled on ZooKeeper."""
@@ -737,11 +724,45 @@ def zookeeper_connected(self) -> bool:
737724

738725
return True
739726

727+
@property
728+
def hosts(self) -> list[str]:
729+
"""Get the hosts from the databag."""
730+
return [host.split(":")[0] for host in self.endpoints.split(",")]
731+
732+
@property
733+
def uris(self):
734+
"""Comma separated connection string, containing endpoints + chroot."""
735+
return f"{self.endpoints.removesuffix('/')}/{self.database.removeprefix('/')}"
736+
737+
@property
738+
def port(self) -> int:
739+
"""Get the port in use from the databag.
740+
741+
We can extract from:
742+
- host1:port,host2:port
743+
- host1,host2:port
744+
"""
745+
try:
746+
port = next(
747+
iter([int(host.split(":")[1]) for host in reversed(self.endpoints.split(","))]),
748+
2181,
749+
)
750+
except IndexError:
751+
# compatibility with older zk versions
752+
port = 2181
753+
754+
return port
755+
740756
@property
741757
def zookeeper_version(self) -> str:
742758
"""Get running zookeeper version."""
743-
hosts = self.endpoints.split(",")
744-
zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
759+
zk = ZooKeeperManager(
760+
hosts=self.hosts,
761+
client_port=self.port,
762+
username=self.username,
763+
password=self.password,
764+
use_ssl=self.tls,
765+
)
745766

746767
return zk.get_version()
747768

@@ -755,16 +776,22 @@ def zookeeper_version(self) -> str:
755776
def broker_active(self) -> bool:
756777
"""Checks if broker id is recognised as active by ZooKeeper."""
757778
broker_id = self.data_interface.local_unit.name.split("/")[1]
758-
hosts = self.endpoints.split(",")
759779
path = f"{self.database}/brokers/ids/"
760780

761-
zk = ZooKeeperManager(hosts=hosts, username=self.username, password=self.password)
762781
try:
782+
zk = ZooKeeperManager(
783+
hosts=self.hosts,
784+
client_port=self.port,
785+
username=self.username,
786+
password=self.password,
787+
use_ssl=self.tls,
788+
)
763789
brokers = zk.leader_znodes(path=path)
764790
except (
765791
NoNodeError,
766792
AuthFailedError,
767793
QuorumLeaderNotFoundError,
794+
NoUnitFoundError,
768795
ConnectionLoss,
769796
NoAuthError,
770797
) as e:

tests/integration/helpers.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import subprocess
99
from pathlib import Path
1010
from subprocess import PIPE, CalledProcessError, check_output
11-
from typing import Any, Dict, List, Optional, Set
11+
from typing import Any, List, Optional, Set
1212

1313
import yaml
1414
from charms.kafka.client import KafkaClient
@@ -485,7 +485,7 @@ def get_provider_data(
485485
return provider_relation_data | user_secret | tls_secret
486486

487487

488-
def get_active_brokers(config: Dict) -> Set[str]:
488+
def get_active_brokers(config: dict[str, str]) -> set[str]:
489489
"""Gets all brokers currently connected to ZooKeeper.
490490
491491
Args:
@@ -495,9 +495,9 @@ def get_active_brokers(config: Dict) -> Set[str]:
495495
Set of active broker ids
496496
"""
497497
chroot = config.get("database", config.get("chroot", ""))
498-
hosts = config.get("endpoints", "").split(",")
499498
username = config.get("username", "")
500499
password = config.get("password", "")
500+
hosts = [host.split(":")[0] for host in config.get("endpoints", "").split(",")]
501501

502502
zk = ZooKeeperManager(hosts=hosts, username=username, password=password)
503503
path = f"{chroot}/brokers/ids/"

0 commit comments

Comments
 (0)