Skip to content

Commit d3af469

Browse files
authored
Merge branch 'main' into DPE-6431-break-main-charm
2 parents a6cf98b + 62b2c50 commit d3af469

File tree

4 files changed

+184
-23
lines changed

4 files changed

+184
-23
lines changed

lib/charms/kafka/v0/client.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def on_kafka_relation_created(self, event: RelationCreatedEvent):
7777
import time
7878
import sys
7979
from functools import cached_property
80-
from typing import Generator, List, Optional
80+
from typing import Generator, List, Optional, Any
8181

8282
from kafka import KafkaAdminClient, KafkaConsumer, KafkaProducer
8383
from kafka.admin import NewTopic
@@ -184,6 +184,10 @@ def _consumer_client(self) -> KafkaConsumer:
184184
consumer_timeout_ms=15000,
185185
)
186186

187+
def describe_cluster(self) -> Any:
    """Return metadata describing the connected Kafka cluster.

    Thin delegation to the underlying admin client's ``describe_cluster``.
    """
    cluster_metadata = self._admin_client.describe_cluster()
    return cluster_metadata
190+
187191
def create_topic(self, topic: NewTopic) -> None:
188192
"""Creates a new topic on the Kafka cluster.
189193

tests/integration/conftest.py

+48
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@
33
# See LICENSE file for licensing details.
44

55

6+
import pathlib
67
import subprocess
8+
from types import SimpleNamespace
79

810
import juju
911
import pytest
1012
import yaml
1113
from pytest_operator.plugin import OpsTest
14+
from tenacity import Retrying, stop_after_delay, wait_fixed
1215

1316
from .helpers import (
1417
K8S_DB_MODEL_NAME,
@@ -43,3 +46,48 @@ async def kafka_benchmark_charm(ops_test: OpsTest):
4346
"""Kafka charm used for integration testing."""
4447
charm = await ops_test.build_charm(".")
4548
return charm
49+
50+
51+
@pytest.fixture(scope="module")
async def microk8s(ops_test: OpsTest) -> SimpleNamespace:
    """Install MicroK8s and register it as a cloud on the current controller.

    Installs the ``microk8s`` and ``kubectl`` snaps, enables the ``dns`` and
    ``hostpath-storage`` addons, writes the MicroK8s kubeconfig to
    ``~/.kube/config``, waits until every pod reports ``Running``, and then
    runs ``juju add-k8s`` so tests can deploy to ``MICROK8S_CLOUD_NAME``.

    Returns:
        SimpleNamespace with ``cloud_name`` set to the registered cloud name.
    """
    controller = yaml.safe_load(subprocess.check_output(["juju", "show-controller"]))

    for controller_name in controller:
        try:
            subprocess.run(["sudo", "snap", "install", "--classic", "microk8s"], check=True)
            subprocess.run(["sudo", "snap", "install", "--classic", "kubectl"], check=True)
            subprocess.run(["sudo", "microk8s", "enable", "dns", "hostpath-storage"], check=True)

            # Configure kubectl: point ~/.kube/config at the MicroK8s cluster.
            kube_dir = pathlib.Path.home() / ".kube"
            kube_dir.mkdir(parents=True, exist_ok=True)
            kubeconfig = subprocess.check_output(["sudo", "microk8s", "config"])
            (kube_dir / "config").write_text(kubeconfig.decode())

            # Poll until no pod matches status.phase!=Running: kubectl then
            # prints nothing to stdout ("No resources found." goes to stderr,
            # which we discard), so any stdout means pods are still pending.
            for attempt in Retrying(stop=stop_after_delay(150), wait=wait_fixed(15)):
                with attempt:
                    pending = subprocess.check_output(
                        ["kubectl", "get", "po", "-A", "--field-selector=status.phase!=Running"],
                        stderr=subprocess.DEVNULL,
                    ).decode()
                    if pending:
                        raise Exception("some pods are not yet Running")

            # Register the MicroK8s cloud with this controller so the tests
            # can add k8s models on it.
            subprocess.run(
                ["juju", "add-k8s", MICROK8S_CLOUD_NAME, "--client", "--controller", controller_name],
                check=True,
            )

        except subprocess.CalledProcessError as e:
            # Installation/registration is a hard prerequisite: abort the run.
            pytest.exit(str(e))

    return SimpleNamespace(cloud_name=MICROK8S_CLOUD_NAME)

tests/integration/helpers.py

+44
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import subprocess
88
from types import SimpleNamespace
99

10+
import pytest
1011
from pytest_operator.plugin import OpsTest
1112
from tenacity import Retrying, stop_after_delay, wait_fixed
1213

@@ -20,9 +21,52 @@
2021
CONFIG_OPTS = {"workload_name": "test_mode", "parallel_processes": 1}
SERIES = "jammy"
KAFKA = "kafka"
KAFKA_K8S = "kafka-k8s"
APP_NAME = "kafka-benchmark"
KAFKA_CHANNEL = "3/edge"
DEFAULT_NUM_UNITS = 2
K8S_DB_MODEL_NAME = "database"
MICROK8S_CLOUD_NAME = "uk8s"


def _tls_cloud_params(clouds):
    """Build one pytest param per (use_tls, cloud) combination.

    Each param is id'd and group-marked "<use_tls>-<cloud>" so every test in
    the same TLS/cloud combination runs in the same pytest group.
    """
    return [
        pytest.param(
            use_tls,
            cloud,
            id=f"{use_tls}-{cloud}",
            marks=pytest.mark.group(f"{use_tls}-{cloud}"),
        )
        for use_tls in [True, False]
        for cloud in clouds
    ]


# TLS x cloud matrices shared by the integration tests.
DEPLOY_MARKS = _tls_cloud_params(["vm", MICROK8S_CLOUD_NAME])
K8S_MARKS = _tls_cloud_params([MICROK8S_CLOUD_NAME])
VM_MARKS = _tls_cloud_params(["vm"])
2670

2771

2872
KRAFT_CONFIG = {

tests/integration/test_charm.py

+87-22
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import logging
66

7+
import juju
78
import pytest
89
from pytest_operator.plugin import OpsTest
910
from tenacity import Retrying, stop_after_attempt, wait_fixed
@@ -12,11 +13,16 @@
1213
APP_NAME,
1314
CONFIG_OPTS,
1415
DEFAULT_NUM_UNITS,
16+
DEPLOY_MARKS,
17+
K8S_DB_MODEL_NAME,
18+
K8S_MARKS,
1519
KAFKA,
1620
KAFKA_CHANNEL,
21+
KAFKA_K8S,
1722
KRAFT_CONFIG,
1823
MODEL_CONFIG,
1924
SERIES,
25+
VM_MARKS,
2026
check_service,
2127
get_leader_unit_id,
2228
run_action,
@@ -25,23 +31,78 @@
2531
logger = logging.getLogger(__name__)
2632

2733

28-
TLS_MARK = {
29-
(use_tls): pytest.param(
30-
use_tls,
31-
id=str(use_tls),
32-
marks=[
33-
pytest.mark.group(str(use_tls)),
34-
],
34+
# Handle to the model hosting the Kafka database side. Set by whichever
# build-and-deploy test runs for the active group and read by later tests.
model_db = None


@pytest.mark.parametrize("use_tls,cloud", K8S_MARKS)
@pytest.mark.abort_on_fail
@pytest.mark.skip_if_deployed
async def test_build_and_deploy_k8s_only(
    ops_test: OpsTest, microk8s, kafka_benchmark_charm, use_tls, cloud
) -> None:
    """Build and deploy with and without TLS on k8s."""
    logging.info(f"Creating k8s model {K8S_DB_MODEL_NAME}")
    # Create a dedicated model on the microk8s cloud for the database side.
    controller = juju.controller.Controller()
    await controller.connect()
    await controller.add_model(K8S_DB_MODEL_NAME, cloud_name=microk8s.cloud_name)

    # Keep a module-global handle so follow-up tests can reach the db model.
    global model_db
    model_db = juju.model.Model()
    await model_db.connect(model_name=K8S_DB_MODEL_NAME)

    await ops_test.model.set_config(MODEL_CONFIG)
    # The benchmark charm is deployed into ops_test's (VM) model...
    await ops_test.model.deploy(
        kafka_benchmark_charm,
        num_units=DEFAULT_NUM_UNITS,
        series=SERIES,
        config=CONFIG_OPTS,
    )
    # ...while kafka-k8s goes into the k8s model, configured with
    # expose_external=nodeport so it is reachable from outside the cluster.
    await model_db.deploy(
        KAFKA_K8S,
        channel=KAFKA_CHANNEL,
        config=KRAFT_CONFIG | {"expose_external": "nodeport"},
        num_units=DEFAULT_NUM_UNITS,
        series=SERIES,
        trust=True,
    )
    # Cross-model relation: offer kafka-client from the db model, consume it
    # in the benchmark model, then relate it to the benchmark app.
    await model_db.create_offer(
        endpoint="kafka-client",
        offer_name="kafka-client",
        application_name=KAFKA_K8S,
    )
    await ops_test.model.consume(f"admin/{model_db.name}.kafka-client")
    await ops_test.model.integrate("kafka-client", f"{APP_NAME}:kafka")

    if use_tls:
        await ops_test.model.deploy(
            "self-signed-certificates",
            num_units=1,
            series=SERIES,
        )
        await ops_test.model.integrate(APP_NAME, "self-signed-certificates")

        # Share the certificates endpoint with the k8s model via a second
        # cross-model offer so both sides use the same CA.
        await ops_test.model.create_offer(
            endpoint="certificates",
            offer_name="certificates",
            application_name="self-signed-certificates",
        )
        await model_db.consume(f"admin/{ops_test.model.name}.certificates")
        await model_db.integrate("certificates", f"{KAFKA_K8S}:certificates")

    # The benchmark app is expected to settle in "waiting" at this stage
    # (presumably until a prepare action runs — confirm); kafka must go active.
    await ops_test.model.wait_for_idle(apps=[APP_NAME], status="waiting", timeout=2000)
    await model_db.wait_for_idle(apps=[KAFKA_K8S], status="active", timeout=2000)

    assert len(ops_test.model.applications[APP_NAME].units) == DEFAULT_NUM_UNITS
    # NOTE(review): these disconnects are skipped if the assert above fails;
    # consider wrapping in try/finally.
    await controller.disconnect()
    await model_db.disconnect()
98+
99+
100+
@pytest.mark.parametrize("use_tls,cloud", VM_MARKS)
42101
@pytest.mark.abort_on_fail
43102
@pytest.mark.skip_if_deployed
44-
async def test_deploy(ops_test: OpsTest, kafka_benchmark_charm, use_tls) -> None:
103+
async def test_build_and_deploy_vm_only(
104+
ops_test: OpsTest, kafka_benchmark_charm, use_tls, cloud
105+
) -> None:
45106
"""Build and deploy with and without TLS."""
46107
await ops_test.model.set_config(MODEL_CONFIG)
47108

@@ -73,10 +134,14 @@ async def test_deploy(ops_test: OpsTest, kafka_benchmark_charm, use_tls) -> None
73134

74135
assert len(ops_test.model.applications[APP_NAME].units) == DEFAULT_NUM_UNITS
75136

137+
# set the model to the global model_db
138+
global model_db
139+
model_db = ops_test.model
140+
76141

77-
@pytest.mark.parametrize("use_tls", USE_TLS)
142+
@pytest.mark.parametrize("use_tls,cloud", DEPLOY_MARKS)
78143
@pytest.mark.abort_on_fail
79-
async def test_prepare(ops_test: OpsTest, use_tls) -> None:
144+
async def test_prepare(ops_test: OpsTest, use_tls, cloud) -> None:
80145
"""Test prepare action."""
81146
leader_id = await get_leader_unit_id(ops_test)
82147

@@ -93,9 +158,9 @@ async def test_prepare(ops_test: OpsTest, use_tls) -> None:
93158
)
94159

95160

96-
@pytest.mark.parametrize("use_tls", USE_TLS)
161+
@pytest.mark.parametrize("use_tls,cloud", DEPLOY_MARKS)
97162
@pytest.mark.abort_on_fail
98-
async def test_run(ops_test: OpsTest, use_tls) -> None:
163+
async def test_run(ops_test: OpsTest, use_tls, cloud) -> None:
99164
"""Test run action."""
100165
leader_id = await get_leader_unit_id(ops_test)
101166

@@ -111,9 +176,9 @@ async def test_run(ops_test: OpsTest, use_tls) -> None:
111176
assert check_service("dpe_benchmark", unit_id=leader_id)
112177

113178

114-
@pytest.mark.parametrize("use_tls", USE_TLS)
179+
@pytest.mark.parametrize("use_tls,cloud", DEPLOY_MARKS)
115180
@pytest.mark.abort_on_fail
116-
async def test_stop(ops_test: OpsTest, use_tls) -> None:
181+
async def test_stop(ops_test: OpsTest, use_tls, cloud) -> None:
117182
"""Test stop action."""
118183
leader_id = await get_leader_unit_id(ops_test)
119184

@@ -129,9 +194,9 @@ async def test_stop(ops_test: OpsTest, use_tls) -> None:
129194
assert not check_service("dpe_benchmark", unit_id=leader_id)
130195

131196

132-
@pytest.mark.parametrize("use_tls", USE_TLS)
197+
@pytest.mark.parametrize("use_tls,cloud", DEPLOY_MARKS)
133198
@pytest.mark.abort_on_fail
134-
async def test_restart(ops_test: OpsTest, use_tls) -> None:
199+
async def test_restart(ops_test: OpsTest, use_tls, cloud) -> None:
135200
"""Test stop and restart the benchmark."""
136201
leader_id = await get_leader_unit_id(ops_test)
137202

@@ -147,9 +212,9 @@ async def test_restart(ops_test: OpsTest, use_tls) -> None:
147212
assert check_service("dpe_benchmark", unit_id=leader_id)
148213

149214

150-
@pytest.mark.parametrize("use_tls", USE_TLS)
215+
@pytest.mark.parametrize("use_tls,cloud", DEPLOY_MARKS)
151216
@pytest.mark.abort_on_fail
152-
async def test_clean(ops_test: OpsTest, use_tls) -> None:
217+
async def test_clean(ops_test: OpsTest, use_tls, cloud) -> None:
153218
"""Test cleanup action."""
154219
leader_id = await get_leader_unit_id(ops_test)
155220

0 commit comments

Comments
 (0)