"""Manager for handling Kafka Kubernetes resources for a single Kafka pod."""

+import json
import logging
-from functools import cached_property
+import math
+import time
+from functools import cache

from lightkube.core.client import Client
from lightkube.core.exceptions import ApiError

from literals import SECURITY_PROTOCOL_PORTS, AuthMap, AuthMechanism

-logger = logging.getLogger(__name__)
-
# default logging from lightkube httpx requests is very noisy
-logging.getLogger("lightkube").disabled = True
-logging.getLogger("lightkube.core.client").disabled = True
-logging.getLogger("httpx").disabled = True
-logging.getLogger("httpcore").disabled = True
+logging.getLogger("lightkube").setLevel(logging.CRITICAL)
+logging.getLogger("httpx").setLevel(logging.CRITICAL)
+logging.getLogger("httpcore").setLevel(logging.CRITICAL)
+
+logger = logging.getLogger(__name__)


class K8sManager:
@@ -42,54 +44,57 @@ def __init__(
            "SSL": "ssl",
        }

-    @cached_property
+    def __eq__(self, other: object) -> bool:
+        """__eq__ dunder.
+
+        Needed to get a cache hit when the same method is called from different
+        instances of K8sManager, as `self` is passed to the cached methods.
+        """
+        return isinstance(other, K8sManager) and self.__dict__ == other.__dict__
+
+    def __hash__(self) -> int:
+        """__hash__ dunder.
+
+        K8sManager needs to be hashable so that `self` can be used as a key in the
+        'dict-like' cache.
+        """
+        return hash(json.dumps(self.__dict__, sort_keys=True))
+
+    @property
    def client(self) -> Client:
        """The Lightkube client."""
        return Client(  # pyright: ignore[reportArgumentType]
            field_manager=self.pod_name,
            namespace=self.namespace,
        )

+    @staticmethod
+    def get_ttl_hash(seconds: int = 60 * 2) -> int:
+        """Gets a time-bucketed hash for the cache, expiring after 2 minutes.
+
+        Once 2 minutes have passed, a new value is returned, ensuring a cache miss
+        and a fresh call to the K8s API.
+        """
+        return math.floor(time.time() / seconds)
+
    # --- GETTERS ---

    def get_pod(self, pod_name: str = "") -> Pod:
        """Gets the Pod via the K8s API."""
-        # Allows us to get pods from other peer units
-        pod_name = pod_name or self.pod_name
-
-        return self.client.get(
-            res=Pod,
-            name=self.pod_name,
-        )
+        return self._get_pod(pod_name, self.get_ttl_hash())

-    def get_node(self, pod: Pod) -> Node:
+    def get_node(self, pod_name: str) -> Node:
        """Gets the Node the Pod is running on via the K8s API."""
-        if not pod.spec or not pod.spec.nodeName:
-            raise Exception("Could not find podSpec or nodeName")
-
-        return self.client.get(
-            Node,
-            name=pod.spec.nodeName,
-        )
-
-    def get_node_ip(self, node: Node) -> str:
-        """Gets the IP Address of the Node via the K8s API."""
-        # all these redundant checks are because Lightkube's typing is awful
-        if not node.status or not node.status.addresses:
-            raise Exception(f"No status found for {node}")
+        return self._get_node(pod_name, self.get_ttl_hash())

-        for addresses in node.status.addresses:
-            if addresses.type in ["ExternalIP", "InternalIP", "Hostname"]:
-                return addresses.address
-
-        return ""
+    def get_node_ip(self, pod_name: str) -> str:
+        """Gets the IP Address of the Node of a given Pod via the K8s API."""
+        return self._get_node_ip(pod_name, self.get_ttl_hash())

    def get_service(self, service_name: str) -> Service | None:
        """Gets the Service via the K8s API."""
-        return self.client.get(
-            res=Service,
-            name=service_name,
-        )
+        return self._get_service(service_name, self.get_ttl_hash())
+
+    # SERVICE BUILDERS

    def get_node_port(
        self,
@@ -139,7 +144,7 @@ def get_bootstrap_nodeport(self, auth_map: AuthMap) -> int:

    def build_bootstrap_services(self) -> Service:
        """Builds a ClusterIP service for initial client connection."""
-        pod = self.get_pod(pod_name=self.pod_name)
+        pod = self.get_pod(self.pod_name)
        if not pod.metadata:
            raise Exception(f"Could not find metadata for {pod}")

@@ -231,3 +236,46 @@ def apply_service(self, service: Service) -> None:
            return
        else:
            raise
+
+    # PRIVATE METHODS
+
+    @cache
+    def _get_pod(self, pod_name: str = "", *_) -> Pod:
+        # Allows us to get pods from other peer units
+        pod_name = pod_name or self.pod_name
+
+        return self.client.get(
+            res=Pod,
+            name=pod_name,
+        )
+
+    @cache
+    def _get_node(self, pod_name: str, *_) -> Node:
+        pod = self.get_pod(pod_name)
+        if not pod.spec or not pod.spec.nodeName:
+            raise Exception("Could not find podSpec or nodeName")
+
+        return self.client.get(
+            Node,
+            name=pod.spec.nodeName,
+        )
+
+    @cache
+    def _get_node_ip(self, pod_name: str, *_) -> str:
+        # all these redundant checks are because Lightkube's typing is awful
+        node = self.get_node(pod_name)
+        if not node.status or not node.status.addresses:
+            raise Exception(f"No status found for {node}")
+
+        for addresses in node.status.addresses:
+            if addresses.type in ["ExternalIP", "InternalIP", "Hostname"]:
+                return addresses.address
+
+        return ""
+
+    @cache
+    def _get_service(self, service_name: str, *_) -> Service | None:
+        return self.client.get(
+            res=Service,
+            name=service_name,
+        )
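
The caching approach in this diff hinges on `get_ttl_hash()`: since `functools.cache` keys entries on the full argument tuple, passing a value that only changes every two minutes gives the cached K8s reads a built-in expiry. A minimal standalone sketch of that pattern (the `fetch_pod_phase` function and the 5-second window below are illustrative, not part of this charm):

```python
import math
import time
from functools import cache


def get_ttl_hash(seconds: int = 5) -> int:
    """Return the same value for `seconds`, then roll over to force a cache miss."""
    return math.floor(time.time() / seconds)


@cache
def fetch_pod_phase(pod_name: str, ttl_hash: int) -> str:
    """Pretend expensive lookup; `ttl_hash` is unused in the body and only widens the cache key."""
    print(f"hitting the API for {pod_name}")
    return "Running"


fetch_pod_phase("kafka-0", get_ttl_hash())  # prints: hitting the API for kafka-0
fetch_pod_phase("kafka-0", get_ttl_hash())  # same time bucket: served from cache, no print
time.sleep(6)
fetch_pod_phase("kafka-0", get_ttl_hash())  # new time bucket: hits the "API" again
```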
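The `__eq__`/`__hash__` pair serves the same machinery: `functools.cache` on an instance method includes `self` in the cache key, so without value-based equality two `K8sManager` objects describing the same pod would never share cached entries. A rough sketch of that behaviour, using a simplified tuple hash rather than the `json.dumps` approach taken in the diff:

```python
from functools import cache


class Manager:
    """Illustrative stand-in: value-based equality so equivalent instances share a cache."""

    def __init__(self, pod_name: str, namespace: str) -> None:
        self.pod_name = pod_name
        self.namespace = namespace

    def __eq__(self, other: object) -> bool:
        return isinstance(other, Manager) and self.__dict__ == other.__dict__

    def __hash__(self) -> int:
        # Simplified: hash the identifying fields directly.
        return hash((self.pod_name, self.namespace))

    @cache
    def expensive(self) -> str:
        # `self` is part of the cache key; equal instances hit the same entry.
        print("computing...")
        return "result"


a = Manager("kafka-0", "my-model")
b = Manager("kafka-0", "my-model")
a.expensive()  # prints "computing..."
b.expensive()  # cache hit: nothing printed, because a == b and hash(a) == hash(b)
```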