Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: failing upgrades without chain #173

Merged
merged 8 commits into from
Mar 18, 2025
Merged
41 changes: 12 additions & 29 deletions src/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,17 @@ def endpoints(self) -> str:
or ""
)

@property
def uris(self) -> str:
"""Connection address for Kafka to use to connect to ZooKeeper with."""
if not self.relation:
return ""

return (
self.data_interface.fetch_relation_field(relation_id=self.relation.id, field="uris")
or ""
)

@property
def database(self) -> str:
"""Path allocated for Kafka on ZooKeeper."""
Expand Down Expand Up @@ -827,41 +838,15 @@ def zookeeper_connected(self) -> bool:
@property
def hosts(self) -> list[str]:
"""Get the hosts from the databag."""
return [host.split(":")[0] for host in self.endpoints.split(",")]

@property
def uris(self):
"""Comma separated connection string, containing endpoints + chroot."""
return f"{self.endpoints.removesuffix('/')}/{self.database.removeprefix('/')}"

@property
def port(self) -> int:
"""Get the port in use from the databag.

We can extract from:
- host1:port,host2:port
- host1,host2:port
"""
try:
port = next(
iter([int(host.split(":")[1]) for host in reversed(self.endpoints.split(","))]),
2181,
)
except IndexError:
# compatibility with older zk versions
port = 2181

return port
return [host.split(":")[0] for host in self.uris.split(",")]

@property
def zookeeper_version(self) -> str:
"""Get running zookeeper version."""
zk = ZooKeeperManager(
hosts=self.hosts,
client_port=self.port,
username=self.username,
password=self.password,
use_ssl=self.tls,
)

return zk.get_version()
Expand All @@ -880,10 +865,8 @@ def broker_active(self) -> bool:
try:
zk = ZooKeeperManager(
hosts=self.hosts,
client_port=self.port,
username=self.username,
password=self.password,
use_ssl=self.tls,
)
brokers = zk.leader_znodes(path=path)
except (
Expand Down
20 changes: 19 additions & 1 deletion src/events/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

"""Manager for handling Kafka in-place upgrades."""

import json
import logging
import subprocess
from typing import TYPE_CHECKING
Expand All @@ -22,7 +23,7 @@
from pydantic import BaseModel
from typing_extensions import override

from literals import Status
from literals import TLS_RELATION, Status

if TYPE_CHECKING:
from charm import KafkaCharm
Expand Down Expand Up @@ -197,3 +198,20 @@ def apply_backwards_compatibility_fixes(self, event) -> None:

# Rev.65 - Creation of external K8s services
self.dependent.update_external_services()

# Rev.78 - TLS chain not yet set to peer relation data
if (
tls_relation := self.charm.model.get_relation(TLS_RELATION)
) and not self.charm.state.unit_broker.chain:
all_certificates = json.loads(
tls_relation.data[tls_relation.app].get("certificates", "[]")
)
for certificate in all_certificates:
if certificate["certificate"] == self.charm.state.unit_broker.certificate:
logger.info("Saving new bundle...")
self.charm.state.unit_broker.update(
{"chain": json.dumps(certificate["chain"])}
)

if not self.charm.state.unit_broker.chain:
logger.error("Unable to find valid chain")
3 changes: 2 additions & 1 deletion src/events/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ def _on_zookeeper_created(self, _) -> None:

def _on_zookeeper_changed(self, event: RelationChangedEvent) -> None:
"""Handler for `zookeeper_relation_created/joined/changed` events, ensuring internal users get created."""
if not self.charm.state.zookeeper.endpoints:
if not self.charm.state.zookeeper.uris and self.charm.workload.container_can_connect:

# Kafka keeps a meta.properties in every log.dir with a unique ClusterID
# this ID is provided by ZK, and removing it on relation-changed allows
# re-joining a ZK cluster undergoing backup restoration.
Expand Down
7 changes: 7 additions & 0 deletions tests/integration/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@
REL_NAME_ADMIN = "kafka-client-admin"
TEST_DEFAULT_MESSAGES = 15
STORAGE = "data"
TLS_NAME = "self-signed-certificates"
MANUAL_TLS_NAME = "manual-tls-certificates"
CERTS_NAME = "tls-certificates-operator"
TLS_REQUIRER = "tls-certificates-requirer"

MTLS_NAME = "mtls"
DUMMY_NAME = "app"


class KRaftUnitStatus(Enum):
Expand Down
18 changes: 12 additions & 6 deletions tests/integration/test_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,18 @@ async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm):
trust=True,
)

await ops_test.model.wait_for_idle(
apps=list({APP_NAME, ZK_NAME, self.balancer_app}),
idle_period=30,
timeout=1800,
raise_on_error=False,
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=list({APP_NAME, ZK_NAME, self.balancer_app}),
idle_period=30,
timeout=1800,
raise_on_error=False,
)

# ensuring update-status fires
async with ops_test.fast_forward(fast_interval="10s"):
await asyncio.sleep(30)

assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[ZK_NAME].status == "active"
assert ops_test.model.applications[self.balancer_app].status == "blocked"
Expand Down
18 changes: 12 additions & 6 deletions tests/integration/test_kraft.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,18 @@ async def test_build_and_deploy(self, ops_test: OpsTest, kafka_charm):
trust=True,
)

await ops_test.model.wait_for_idle(
apps=list({APP_NAME, self.controller_app}),
idle_period=30,
timeout=1800,
raise_on_error=False,
)
async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=list({APP_NAME, self.controller_app}),
idle_period=30,
timeout=1800,
raise_on_error=False,
)

# ensuring update-status fires
async with ops_test.fast_forward(fast_interval="10s"):
await asyncio.sleep(30)

if self.controller_app != APP_NAME:
assert ops_test.model.applications[APP_NAME].status == "blocked"
assert ops_test.model.applications[self.controller_app].status == "blocked"
Expand Down
14 changes: 6 additions & 8 deletions tests/integration/test_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@

from .helpers import (
APP_NAME,
CERTS_NAME,
DUMMY_NAME,
KAFKA_CONTAINER,
MANUAL_TLS_NAME,
MTLS_NAME,
REL_NAME_ADMIN,
TLS_NAME,
TLS_REQUIRER,
ZK_NAME,
check_tls,
create_test_topic,
Expand All @@ -39,14 +45,6 @@

logger = logging.getLogger(__name__)

TLS_NAME = "self-signed-certificates"
MANUAL_TLS_NAME = "manual-tls-certificates"
CERTS_NAME = "tls-certificates-operator"
TLS_REQUIRER = "tls-certificates-requirer"

MTLS_NAME = "mtls"
DUMMY_NAME = "app"


@pytest.mark.skip_if_deployed
@pytest.mark.abort_on_fail
Expand Down
88 changes: 87 additions & 1 deletion tests/integration/test_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import pytest
from pytest_operator.plugin import OpsTest

from literals import TLS_RELATION

from .helpers import (
APP_NAME,
DUMMY_NAME,
KAFKA_CONTAINER,
REL_NAME_ADMIN,
TLS_NAME,
ZK_NAME,
check_logs,
)
Expand All @@ -23,7 +26,10 @@


@pytest.mark.abort_on_fail
async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm, app_charm):
async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm):
"""Tests happy path upgrade with TLS."""
tls_config = {"ca-common-name": "kafka"}

await asyncio.gather(
ops_test.model.deploy(
ZK_NAME,
Expand All @@ -32,6 +38,86 @@ async def test_in_place_upgrade(ops_test: OpsTest, kafka_charm, app_charm):
num_units=1,
trust=True,
),
ops_test.model.deploy(
APP_NAME,
application_name=APP_NAME,
num_units=1,
channel=CHANNEL,
trust=True,
),
ops_test.model.deploy(
TLS_NAME, channel="edge", config=tls_config, revision=163, trust=True
),
)

await asyncio.gather(
ops_test.model.add_relation(APP_NAME, ZK_NAME),
ops_test.model.add_relation(ZK_NAME, TLS_NAME),
ops_test.model.add_relation(f"{APP_NAME}:{TLS_RELATION}", TLS_NAME),
)

async with ops_test.fast_forward(fast_interval="60s"):
await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME, TLS_NAME],
idle_period=30,
timeout=1800,
status="active",
raise_on_error=False,
)

await ops_test.model.wait_for_idle(
apps=[APP_NAME, ZK_NAME, TLS_NAME], status="active", idle_period=30
)

await ops_test.model.applications[APP_NAME].add_units(count=2)
await ops_test.model.wait_for_idle(
apps=[APP_NAME], status="active", timeout=600, idle_period=30, wait_for_exact_units=3
)

leader_unit = None
for unit in ops_test.model.applications[APP_NAME].units:
if await unit.is_leader_from_status():
leader_unit = unit
assert leader_unit

logger.info("Calling pre-upgrade-check...")
action = await leader_unit.run_action("pre-upgrade-check")
await action.wait()
await ops_test.model.wait_for_idle(
apps=[APP_NAME], timeout=1000, idle_period=15, status="active"
)

logger.info("Upgrading Kafka...")
await ops_test.model.applications[APP_NAME].refresh(
path=kafka_charm,
resources={"kafka-image": KAFKA_CONTAINER},
)

async with ops_test.fast_forward(fast_interval="20s"):
await asyncio.sleep(90)

await ops_test.model.wait_for_idle(
apps=[APP_NAME], timeout=1000, idle_period=180, raise_on_error=False
)

action = await leader_unit.run_action("resume-upgrade")
await action.wait()
await ops_test.model.wait_for_idle(
apps=[APP_NAME], timeout=1000, idle_period=30, status="active"
)

# cleanup existing 'current' Kafka, and remove TLS for next test
await ops_test.model.remove_application(APP_NAME, block_until_done=True)
await ops_test.model.remove_application(TLS_NAME, block_until_done=True)
await ops_test.model.wait_for_idle(
apps=[ZK_NAME], timeout=1800, idle_period=30, status="active"
)


@pytest.mark.abort_on_fail
async def test_in_place_upgrade_consistency(ops_test: OpsTest, kafka_charm, app_charm):
"""Tests non-TLS upgrade data consistency during upgrade."""
await asyncio.gather(
ops_test.model.deploy(
APP_NAME,
application_name=APP_NAME,
Expand Down
2 changes: 2 additions & 0 deletions tests/unit/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ def test_zookeeper_config_succeeds_fails_config(ctx: Context, base_state: State)
"chroot": "/kafka",
"username": "moria",
"endpoints": "1.1.1.1:2181,2.2.2.2:2181",
"uris": "1.1.1.1:2181,2.2.2.2:2181/kafka",
"tls": "disabled",
},
)
Expand All @@ -418,6 +419,7 @@ def test_zookeeper_config_succeeds_valid_config(ctx: Context, base_state: State)
"username": "moria",
"password": "mellon",
"endpoints": "1.1.1.1:2181,2.2.2.2:2181",
"uris": "1.1.1.1:2181,2.2.2.2:2181/kafka",
"tls": "disabled",
},
)
Expand Down
Loading