Skip to content

Commit 860fddc

Browse files
fix: failing upgrades without chain (#173)
1 parent 0eae5c5 commit 860fddc

File tree

9 files changed

+159
-52
lines changed

9 files changed

+159
-52
lines changed

src/core/models.py

+12-29
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,17 @@ def endpoints(self) -> str:
769769
or ""
770770
)
771771

772+
@property
773+
def uris(self) -> str:
774+
"""Connection address for Kafka to use to connect to ZooKeeper with."""
775+
if not self.relation:
776+
return ""
777+
778+
return (
779+
self.data_interface.fetch_relation_field(relation_id=self.relation.id, field="uris")
780+
or ""
781+
)
782+
772783
@property
773784
def database(self) -> str:
774785
"""Path allocated for Kafka on ZooKeeper."""
@@ -827,41 +838,15 @@ def zookeeper_connected(self) -> bool:
827838
@property
828839
def hosts(self) -> list[str]:
829840
"""Get the hosts from the databag."""
830-
return [host.split(":")[0] for host in self.endpoints.split(",")]
831-
832-
@property
833-
def uris(self):
834-
"""Comma separated connection string, containing endpoints + chroot."""
835-
return f"{self.endpoints.removesuffix('/')}/{self.database.removeprefix('/')}"
836-
837-
@property
838-
def port(self) -> int:
839-
"""Get the port in use from the databag.
840-
841-
We can extract from:
842-
- host1:port,host2:port
843-
- host1,host2:port
844-
"""
845-
try:
846-
port = next(
847-
iter([int(host.split(":")[1]) for host in reversed(self.endpoints.split(","))]),
848-
2181,
849-
)
850-
except IndexError:
851-
# compatibility with older zk versions
852-
port = 2181
853-
854-
return port
841+
return [host.split(":")[0] for host in self.uris.split(",")]
855842

856843
@property
857844
def zookeeper_version(self) -> str:
858845
"""Get running zookeeper version."""
859846
zk = ZooKeeperManager(
860847
hosts=self.hosts,
861-
client_port=self.port,
862848
username=self.username,
863849
password=self.password,
864-
use_ssl=self.tls,
865850
)
866851

867852
return zk.get_version()
@@ -880,10 +865,8 @@ def broker_active(self) -> bool:
880865
try:
881866
zk = ZooKeeperManager(
882867
hosts=self.hosts,
883-
client_port=self.port,
884868
username=self.username,
885869
password=self.password,
886-
use_ssl=self.tls,
887870
)
888871
brokers = zk.leader_znodes(path=path)
889872
except (

src/events/upgrade.py

+19-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
"""Manager for handling Kafka in-place upgrades."""
55

6+
import json
67
import logging
78
import subprocess
89
from typing import TYPE_CHECKING
@@ -22,7 +23,7 @@
2223
from pydantic import BaseModel
2324
from typing_extensions import override
2425

25-
from literals import Status
26+
from literals import TLS_RELATION, Status
2627

2728
if TYPE_CHECKING:
2829
from charm import KafkaCharm
@@ -197,3 +198,20 @@ def apply_backwards_compatibility_fixes(self, event) -> None:
197198

198199
# Rev.65 - Creation of external K8s services
199200
self.dependent.update_external_services()
201+
202+
# Rev.78 - TLS chain not yet set to peer relation data
203+
if (
204+
tls_relation := self.charm.model.get_relation(TLS_RELATION)
205+
) and not self.charm.state.unit_broker.chain:
206+
all_certificates = json.loads(
207+
tls_relation.data[tls_relation.app].get("certificates", "[]")
208+
)
209+
for certificate in all_certificates:
210+
if certificate["certificate"] == self.charm.state.unit_broker.certificate:
211+
logger.info("Saving new bundle...")
212+
self.charm.state.unit_broker.update(
213+
{"chain": json.dumps(certificate["chain"])}
214+
)
215+
216+
if not self.charm.state.unit_broker.chain:
217+
logger.error("Unable to find valid chain")

src/events/zookeeper.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ def _on_zookeeper_created(self, _) -> None:
5454

5555
def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
5656
"""Handler for `zookeeper_relation_created/joined/changed` events, ensuring internal users get created."""
57-
if not self.charm.state.zookeeper.endpoints:
57+
if not self.charm.state.zookeeper.uris and self.charm.workload.container_can_connect:
58+
5859
# Kafka keeps a meta.properties in every log.dir with a unique ClusterID
5960
# this ID is provided by ZK, and removing it on relation-changed allows
6061
# re-joining a ZK cluster undergoing backup restoration.

tests/integration/helpers.py

+7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@
4040
REL_NAME_ADMIN = "kafka-client-admin"
4141
TEST_DEFAULT_MESSAGES = 15
4242
STORAGE = "data"
43+
TLS_NAME = "self-signed-certificates"
44+
MANUAL_TLS_NAME = "manual-tls-certificates"
45+
CERTS_NAME = "tls-certificates-operator"
46+
TLS_REQUIRER = "tls-certificates-requirer"
47+
48+
MTLS_NAME = "mtls"
49+
DUMMY_NAME = "app"
4350

4451

4552
class KRaftUnitStatus(Enum):

tests/integration/test_balancer.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,18 @@ async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm):
9898
trust=True,
9999
)
100100

101-
await ops_test.model.wait_for_idle(
102-
apps=list({APP_NAME, ZK_NAME, self.balancer_app}),
103-
idle_period=30,
104-
timeout=1800,
105-
raise_on_error=False,
106-
)
101+
async with ops_test.fast_forward(fast_interval="60s"):
102+
await ops_test.model.wait_for_idle(
103+
apps=list({APP_NAME, ZK_NAME, self.balancer_app}),
104+
idle_period=30,
105+
timeout=1800,
106+
raise_on_error=False,
107+
)
108+
109+
# ensuring update-status fires
110+
async with ops_test.fast_forward(fast_interval="10s"):
111+
await asyncio.sleep(30)
112+
107113
assert ops_test.model.applications[APP_NAME].status == "blocked"
108114
assert ops_test.model.applications[ZK_NAME].status == "active"
109115
assert ops_test.model.applications[self.balancer_app].status == "blocked"

tests/integration/test_kraft.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -107,12 +107,18 @@ async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm):
107107
trust=True,
108108
)
109109

110-
await ops_test.model.wait_for_idle(
111-
apps=list({APP_NAME, self.controller_app}),
112-
idle_period=30,
113-
timeout=1800,
114-
raise_on_error=False,
115-
)
110+
async with ops_test.fast_forward(fast_interval="60s"):
111+
await ops_test.model.wait_for_idle(
112+
apps=list({APP_NAME, self.controller_app}),
113+
idle_period=30,
114+
timeout=1800,
115+
raise_on_error=False,
116+
)
117+
118+
# ensuring update-status fires
119+
async with ops_test.fast_forward(fast_interval="10s"):
120+
await asyncio.sleep(30)
121+
116122
if self.controller_app != APP_NAME:
117123
assert ops_test.model.applications[APP_NAME].status == "blocked"
118124
assert ops_test.model.applications[self.controller_app].status == "blocked"

tests/integration/test_tls.py

+6-8
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,14 @@
1818

1919
from .helpers import (
2020
APP_NAME,
21+
CERTS_NAME,
22+
DUMMY_NAME,
2123
KAFKA_CONTAINER,
24+
MANUAL_TLS_NAME,
25+
MTLS_NAME,
2226
REL_NAME_ADMIN,
27+
TLS_NAME,
28+
TLS_REQUIRER,
2329
ZK_NAME,
2430
check_tls,
2531
create_test_topic,
@@ -39,14 +45,6 @@
3945

4046
logger = logging.getLogger(__name__)
4147

42-
TLS_NAME = "self-signed-certificates"
43-
MANUAL_TLS_NAME = "manual-tls-certificates"
44-
CERTS_NAME = "tls-certificates-operator"
45-
TLS_REQUIRER = "tls-certificates-requirer"
46-
47-
MTLS_NAME = "mtls"
48-
DUMMY_NAME = "app"
49-
5048

5149
@pytest.mark.skip_if_deployed
5250
@pytest.mark.abort_on_fail

tests/integration/test_upgrade.py

+87-1
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@
88
import pytest
99
from pytest_operator.plugin import OpsTest
1010

11+
from literals import TLS_RELATION
12+
1113
from .helpers import (
1214
APP_NAME,
1315
DUMMY_NAME,
1416
KAFKA_CONTAINER,
1517
REL_NAME_ADMIN,
18+
TLS_NAME,
1619
ZK_NAME,
1720
check_logs,
1821
)
@@ -23,7 +26,10 @@
2326

2427

2528
@pytest.mark.abort_on_fail
26-
async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm, app_charm):
29+
async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm):
30+
"""Tests happy path upgrade with TLS."""
31+
tls_config = {"ca-common-name": "kafka"}
32+
2733
await asyncio.gather(
2834
ops_test.model.deploy(
2935
ZK_NAME,
@@ -32,6 +38,86 @@ async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm, app_charm):
3238
num_units=1,
3339
trust=True,
3440
),
41+
ops_test.model.deploy(
42+
APP_NAME,
43+
application_name=APP_NAME,
44+
num_units=1,
45+
channel=CHANNEL,
46+
trust=True,
47+
),
48+
ops_test.model.deploy(
49+
TLS_NAME, channel="edge", config=tls_config, revision=163, trust=True
50+
),
51+
)
52+
53+
await asyncio.gather(
54+
ops_test.model.add_relation(APP_NAME, ZK_NAME),
55+
ops_test.model.add_relation(ZK_NAME, TLS_NAME),
56+
ops_test.model.add_relation(f"{APP_NAME}:{TLS_RELATION}", TLS_NAME),
57+
)
58+
59+
async with ops_test.fast_forward(fast_interval="60s"):
60+
await ops_test.model.wait_for_idle(
61+
apps=[APP_NAME, ZK_NAME, TLS_NAME],
62+
idle_period=30,
63+
timeout=1800,
64+
status="active",
65+
raise_on_error=False,
66+
)
67+
68+
await ops_test.model.wait_for_idle(
69+
apps=[APP_NAME, ZK_NAME, TLS_NAME], status="active", idle_period=30
70+
)
71+
72+
await ops_test.model.applications[APP_NAME].add_units(count=2)
73+
await ops_test.model.wait_for_idle(
74+
apps=[APP_NAME], status="active", timeout=600, idle_period=30, wait_for_exact_units=3
75+
)
76+
77+
leader_unit = None
78+
for unit in ops_test.model.applications[APP_NAME].units:
79+
if await unit.is_leader_from_status():
80+
leader_unit = unit
81+
assert leader_unit
82+
83+
logger.info("Calling pre-upgrade-check...")
84+
action = await leader_unit.run_action("pre-upgrade-check")
85+
await action.wait()
86+
await ops_test.model.wait_for_idle(
87+
apps=[APP_NAME], timeout=1000, idle_period=15, status="active"
88+
)
89+
90+
logger.info("Upgrading Kafka...")
91+
await ops_test.model.applications[APP_NAME].refresh(
92+
path=kafka_charm,
93+
resources={"kafka-image": KAFKA_CONTAINER},
94+
)
95+
96+
async with ops_test.fast_forward(fast_interval="20s"):
97+
await asyncio.sleep(90)
98+
99+
await ops_test.model.wait_for_idle(
100+
apps=[APP_NAME], timeout=1000, idle_period=180, raise_on_error=False
101+
)
102+
103+
action = await leader_unit.run_action("resume-upgrade")
104+
await action.wait()
105+
await ops_test.model.wait_for_idle(
106+
apps=[APP_NAME], timeout=1000, idle_period=30, status="active"
107+
)
108+
109+
# cleanup existing 'current' Kafka, and remove TLS for next test
110+
await ops_test.model.remove_application(APP_NAME, block_until_done=True)
111+
await ops_test.model.remove_application(TLS_NAME, block_until_done=True)
112+
await ops_test.model.wait_for_idle(
113+
apps=[ZK_NAME], timeout=1800, idle_period=30, status="active"
114+
)
115+
116+
117+
@pytest.mark.abort_on_fail
118+
async def test_in_place_upgrade_consistency(ops_test: OpsTest, kafka_charm, app_charm):
119+
"""Tests non-TLS upgrade data consistency during upgrade."""
120+
await asyncio.gather(
35121
ops_test.model.deploy(
36122
APP_NAME,
37123
application_name=APP_NAME,

tests/unit/test_config.py

+2
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,7 @@ def test_zookeeper_config_succeeds_fails_config(ctx: Context, base_state: State)
393393
"chroot": "/kafka",
394394
"username": "moria",
395395
"endpoints": "1.1.1.1:2181,2.2.2.2:2181",
396+
"uris": "1.1.1.1:2181,2.2.2.2:2181/kafka",
396397
"tls": "disabled",
397398
},
398399
)
@@ -418,6 +419,7 @@ def test_zookeeper_config_succeeds_valid_config(ctx: Context, base_state: State)
418419
"username": "moria",
419420
"password": "mellon",
420421
"endpoints": "1.1.1.1:2181,2.2.2.2:2181",
422+
"uris": "1.1.1.1:2181,2.2.2.2:2181/kafka",
421423
"tls": "disabled",
422424
},
423425
)

0 commit comments

Comments
 (0)