23
23
from lightkube .core .exceptions import ApiError as LightKubeApiError
24
24
from ops import Object , Relation
25
25
from ops .model import Unit
26
+ from tenacity import retry , retry_if_exception_cause_type , stop_after_attempt , wait_fixed
26
27
27
28
from core .models import (
28
29
BrokerCapacities ,
37
38
ADMIN_USER ,
38
39
BALANCER ,
39
40
BROKER ,
41
+ CONTROLLER ,
42
+ CONTROLLER_PORT ,
40
43
INTERNAL_USERS ,
44
+ KRAFT_NODE_ID_OFFSET ,
41
45
MIN_REPLICAS ,
42
46
OAUTH_REL_NAME ,
43
47
PEER ,
@@ -86,7 +90,7 @@ class PeerClusterData(ProviderData, RequirerData):
86
90
"""Broker provider data model."""
87
91
88
92
SECRET_LABEL_MAP = SECRET_LABEL_MAP
89
- SECRET_FIELDS = BALANCER .requested_secrets
93
+ SECRET_FIELDS = list ( set ( BALANCER .requested_secrets ) | set ( CONTROLLER . requested_secrets ))
90
94
91
95
92
96
class ClusterState (Object ):
@@ -138,45 +142,48 @@ def peer_cluster_relation(self) -> Relation | None:
138
142
@property
139
143
def peer_cluster_orchestrator (self ) -> PeerCluster :
140
144
"""The state for the related `peer-cluster-orchestrator` application that this charm is requiring from."""
141
- balancer_kwargs : dict [str , Any ] = (
142
- {
143
- "balancer_username" : self .cluster .balancer_username ,
144
- "balancer_password" : self .cluster .balancer_password ,
145
- "balancer_uris" : self .cluster .balancer_uris ,
146
- }
147
- if self .runs_balancer
148
- else {}
149
- )
145
+ extra_kwargs : dict [str , Any ] = {}
146
+
147
+ if self .runs_balancer :
148
+ extra_kwargs .update (
149
+ {
150
+ "balancer_username" : self .cluster .balancer_username ,
151
+ "balancer_password" : self .cluster .balancer_password ,
152
+ "balancer_uris" : self .cluster .balancer_uris ,
153
+ }
154
+ )
155
+
156
+ if self .runs_controller :
157
+ extra_kwargs .update (
158
+ {
159
+ "controller_quorum_uris" : self .cluster .controller_quorum_uris ,
160
+ }
161
+ )
162
+
150
163
return PeerCluster (
151
164
relation = self .peer_cluster_relation ,
152
165
data_interface = PeerClusterData (self .model , PEER_CLUSTER_RELATION ),
153
- ** balancer_kwargs ,
166
+ ** extra_kwargs ,
154
167
)
155
168
156
169
@property
157
170
def peer_cluster (self ) -> PeerCluster :
158
- """The state for the related `peer-cluster` application that this charm is providing to."""
159
- return PeerCluster (
160
- relation = self .peer_cluster_orchestrator_relation ,
161
- data_interface = PeerClusterOrchestratorData (
162
- self .model , PEER_CLUSTER_ORCHESTRATOR_RELATION
163
- ),
164
- )
165
-
166
- @property
167
- def balancer (self ) -> PeerCluster :
168
171
"""The state for the `peer-cluster-orchestrator` related balancer application."""
169
- balancer_kwargs : dict [str , Any ] = (
170
- {
171
- "balancer_username" : self .cluster .balancer_username ,
172
- "balancer_password" : self .cluster .balancer_password ,
173
- "balancer_uris" : self .cluster .balancer_uris ,
174
- }
175
- if self .runs_balancer
176
- else {}
177
- )
172
+ extra_kwargs : dict [str , Any ] = {}
178
173
179
- if self .runs_broker : # must be providing, initialise with necessary broker data
174
+ if self .runs_controller or self .runs_balancer :
175
+ extra_kwargs .update (
176
+ {
177
+ "balancer_username" : self .cluster .balancer_username ,
178
+ "balancer_password" : self .cluster .balancer_password ,
179
+ "balancer_uris" : self .cluster .balancer_uris ,
180
+ "controller_quorum_uris" : self .cluster .controller_quorum_uris ,
181
+ }
182
+ )
183
+
184
+ # FIXME: `cluster_manager` check instead of running broker
185
+ # must be providing, initialise with necessary broker data
186
+ if self .runs_broker :
180
187
return PeerCluster (
181
188
relation = self .peer_cluster_orchestrator_relation , # if same app, this will be None and OK
182
189
data_interface = PeerClusterOrchestratorData (
@@ -185,12 +192,13 @@ def balancer(self) -> PeerCluster:
185
192
broker_username = ADMIN_USER ,
186
193
broker_password = self .cluster .internal_user_credentials .get (ADMIN_USER , "" ),
187
194
broker_uris = self .bootstrap_server ,
195
+ cluster_uuid = self .cluster .cluster_uuid ,
188
196
racks = self .racks ,
189
197
broker_capacities = self .broker_capacities ,
190
198
zk_username = self .zookeeper .username ,
191
199
zk_password = self .zookeeper .password ,
192
200
zk_uris = self .zookeeper .uris ,
193
- ** balancer_kwargs , # in case of roles=broker,balancer on this app
201
+ ** extra_kwargs , # in case of roles=broker,[ balancer,controller] on this app
194
202
)
195
203
196
204
else : # must be roles=balancer only then, only load with necessary balancer data
@@ -346,7 +354,11 @@ def default_auth(self) -> AuthMap:
346
354
def enabled_auth (self ) -> list [AuthMap ]:
347
355
"""The currently enabled auth.protocols and their auth.mechanisms, based on related applications."""
348
356
enabled_auth = []
349
- if self .client_relations or self .runs_balancer or self .peer_cluster_orchestrator_relation :
357
+ if (
358
+ self .client_relations
359
+ or self .runs_balancer
360
+ or BALANCER .value in self .peer_cluster_orchestrator .roles
361
+ ):
350
362
enabled_auth .append (self .default_auth )
351
363
if self .oauth_relation :
352
364
enabled_auth .append (AuthMap (self .default_auth .protocol , "OAUTHBEARER" ))
@@ -356,6 +368,12 @@ def enabled_auth(self) -> list[AuthMap]:
356
368
return enabled_auth
357
369
358
370
@property
371
+ @retry (
372
+ wait = wait_fixed (5 ),
373
+ stop = stop_after_attempt (3 ),
374
+ retry = retry_if_exception_cause_type (LightKubeApiError ),
375
+ reraise = True ,
376
+ )
359
377
def bootstrap_servers_external (self ) -> str :
360
378
"""Comma-delimited string of `bootstrap-server` for external access."""
361
379
return "," .join (
@@ -394,6 +412,21 @@ def bootstrap_server(self) -> str:
394
412
)
395
413
)
396
414
415
+ @property
416
+ def controller_quorum_uris (self ) -> str :
417
+ """The current controller quorum uris when running KRaft mode."""
418
+ # FIXME: when running broker node.id will be unit-id + 100. If unit is only running
419
+ # the controller node.id == unit-id. This way we can keep a human readable mapping of ids.
420
+ if self .runs_controller :
421
+ node_offset = KRAFT_NODE_ID_OFFSET if self .runs_broker else 0
422
+ return "," .join (
423
+ [
424
+ f"{ broker .unit_id + node_offset } @{ broker .internal_address } :{ CONTROLLER_PORT } "
425
+ for broker in self .brokers
426
+ ]
427
+ )
428
+ return ""
429
+
397
430
@property
398
431
def log_dirs (self ) -> str :
399
432
"""Builds the necessary log.dirs based on mounted storage volumes.
@@ -446,7 +479,7 @@ def ready_to_start(self) -> Status: # noqa: C901
446
479
if not self .peer_relation :
447
480
return Status .NO_PEER_RELATION
448
481
449
- for status in [self ._broker_status , self ._balancer_status ]:
482
+ for status in [self ._broker_status , self ._balancer_status , self . _controller_status ]:
450
483
if status != Status .ACTIVE :
451
484
return status
452
485
@@ -461,29 +494,40 @@ def _balancer_status(self) -> Status:
461
494
if not self .peer_cluster_relation and not self .runs_broker :
462
495
return Status .NO_PEER_CLUSTER_RELATION
463
496
464
- if not self .balancer .broker_connected :
497
+ if not self .peer_cluster .broker_connected :
465
498
return Status .NO_BROKER_DATA
466
499
467
- if len (self .balancer .broker_capacities .get ("brokerCapacities" , [])) < MIN_REPLICAS :
500
+ if len (self .peer_cluster .broker_capacities .get ("brokerCapacities" , [])) < MIN_REPLICAS :
468
501
return Status .NOT_ENOUGH_BROKERS
469
502
470
503
return Status .ACTIVE
471
504
472
505
@property
473
- def _broker_status (self ) -> Status :
506
+ def _broker_status (self ) -> Status : # noqa: C901
474
507
"""Checks for role=broker specific readiness."""
475
508
if not self .runs_broker :
476
509
return Status .ACTIVE
477
510
478
- if not self .zookeeper :
479
- return Status .ZK_NOT_RELATED
511
+ # Neither ZooKeeper or KRaft are active
512
+ if self .kraft_mode is None :
513
+ return Status .MISSING_MODE
514
+
515
+ if self .kraft_mode :
516
+ if not self .peer_cluster .controller_quorum_uris : # FIXME: peer_cluster or cluster?
517
+ return Status .NO_QUORUM_URIS
518
+ if not self .cluster .cluster_uuid :
519
+ return Status .NO_CLUSTER_UUID
480
520
481
- if not self .zookeeper .zookeeper_connected :
482
- return Status .ZK_NO_DATA
521
+ if self .kraft_mode == False : # noqa: E712
522
+ if not self .zookeeper :
523
+ return Status .ZK_NOT_RELATED
483
524
484
- # TLS must be enabled for Kafka and ZK or disabled for both
485
- if self .cluster .tls_enabled ^ self .zookeeper .tls :
486
- return Status .ZK_TLS_MISMATCH
525
+ if not self .zookeeper .zookeeper_connected :
526
+ return Status .ZK_NO_DATA
527
+
528
+ # TLS must be enabled for Kafka and ZK or disabled for both
529
+ if self .cluster .tls_enabled ^ self .zookeeper .tls :
530
+ return Status .ZK_TLS_MISMATCH
487
531
488
532
if self .cluster .tls_enabled and not self .unit_broker .certificate :
489
533
return Status .NO_CERT
@@ -493,6 +537,37 @@ def _broker_status(self) -> Status:
493
537
494
538
return Status .ACTIVE
495
539
540
+ @property
541
+ def _controller_status (self ) -> Status :
542
+ """Checks for role=controller specific readiness."""
543
+ if not self .runs_controller :
544
+ return Status .ACTIVE
545
+
546
+ if not self .peer_cluster_relation and not self .runs_broker :
547
+ return Status .NO_PEER_CLUSTER_RELATION
548
+
549
+ if not self .peer_cluster .broker_connected_kraft_mode :
550
+ return Status .NO_BROKER_DATA
551
+
552
+ return Status .ACTIVE
553
+
554
+ @property
555
+ def kraft_mode (self ) -> bool | None :
556
+ """Is the deployment running in KRaft mode?
557
+
558
+ Returns:
559
+ True if Kraft mode, False if ZooKeeper, None when undefined.
560
+ """
561
+ # NOTE: self.roles when running colocated, peer_cluster.roles when multiapp
562
+ if CONTROLLER .value in (self .roles + self .peer_cluster .roles ):
563
+ return True
564
+ if self .zookeeper_relation :
565
+ return False
566
+
567
+ # FIXME raise instead of none. `not kraft_mode` is falsy
568
+ # NOTE: if previous checks are not met, we don't know yet how the charm is being deployed
569
+ return None
570
+
496
571
@property
497
572
def runs_balancer (self ) -> bool :
498
573
"""Is the charm enabling the balancer?"""
@@ -502,3 +577,8 @@ def runs_balancer(self) -> bool:
502
577
def runs_broker (self ) -> bool :
503
578
"""Is the charm enabling the broker(s)?"""
504
579
return BROKER .value in self .roles
580
+
581
+ @property
582
+ def runs_controller (self ) -> bool :
583
+ """Is the charm enabling the controller?"""
584
+ return CONTROLLER .value in self .roles
0 commit comments