10
10
from cryptography .hazmat .primitives .ciphers import Cipher , algorithms , modes
11
11
from cryptography .hazmat .primitives import padding
12
12
13
- from cmapi_server .constants import (
14
- MCS_DATA_PATH , MCS_SECRETS_FILENAME , MCS_SECRETS_FILE_PATH
15
- )
13
+ from cmapi_server .constants import MCS_DATA_PATH , MCS_SECRETS_FILENAME
16
14
from cmapi_server .exceptions import CEJError
17
15
18
16
@@ -26,16 +24,19 @@ class CEJPasswordHandler():
26
24
"""Handler for CrossEngineSupport password decryption."""
27
25
28
26
@classmethod
29
- def secretsfile_exists (cls ) -> bool :
30
- """Check the .secrets file in MCS_SECRETS_FILE_PATH.
27
+ def secretsfile_exists (cls , directory : str = MCS_DATA_PATH ) -> bool :
28
+ """Check the .secrets file in directory. Default MCS_SECRETS_FILE_PATH.
31
29
30
+ :param directory: path to the directory with .secrets file
31
+ :type directory: str, optional
32
32
:return: True if file exists and not empty.
33
33
:rtype: bool
34
34
"""
35
+ secrets_file_path = os .path .join (directory , MCS_SECRETS_FILENAME )
35
36
try :
36
37
if (
37
- os .path .isfile (MCS_SECRETS_FILE_PATH ) and
38
- os .path .getsize (MCS_SECRETS_FILE_PATH ) > 0
38
+ os .path .isfile (secrets_file_path ) and
39
+ os .path .getsize (secrets_file_path ) > 0
39
40
):
40
41
return True
41
42
except Exception :
@@ -49,40 +50,48 @@ def secretsfile_exists(cls) -> bool:
49
50
return False
50
51
51
52
@classmethod
52
- def get_secrets_json (cls ) -> dict :
53
+ def get_secrets_json (cls , directory : str = MCS_DATA_PATH ) -> dict :
53
54
"""Get json from .secrets file.
54
55
55
- :raises CEJError: on empty\corrupted\wrong format .secrets file
56
+ :param directory: path to the directory with .secrets file
57
+ :type directory: str, optional
56
58
:return: json from .secrets file
57
59
:rtype: dict
60
+ :raises CEJError: on empty\corrupted\wrong format .secrets file
58
61
"""
59
- if not cls .secretsfile_exists ():
60
- raise CEJError (f'{ MCS_SECRETS_FILE_PATH } file does not exist.' )
61
- with open (MCS_SECRETS_FILE_PATH ) as secrets_file :
62
+ secrets_file_path = os .path .join (directory , MCS_SECRETS_FILENAME )
63
+ if not cls .secretsfile_exists (directory = directory ):
64
+ raise CEJError (f'{ secrets_file_path } file does not exist.' )
65
+ with open (secrets_file_path ) as secrets_file :
62
66
try :
63
67
secrets_json = json .load (secrets_file )
64
68
except Exception :
65
69
logging .error (
66
70
'Something went wrong while loading json from '
67
- f'{ MCS_SECRETS_FILE_PATH } ' ,
71
+ f'{ secrets_file_path } ' ,
68
72
exc_info = True
69
73
)
70
74
raise CEJError (
71
- f'Looks like file { MCS_SECRETS_FILE_PATH } is corrupted or'
75
+ f'Looks like file { secrets_file_path } is corrupted or'
72
76
'has wrong format.'
73
77
) from None
74
78
return secrets_json
75
79
76
80
@classmethod
77
- def decrypt_password (cls , enc_data : str ) -> str :
81
+ def decrypt_password (
82
+ cls , enc_data : str , directory : str = MCS_DATA_PATH
83
+ ) -> str :
78
84
"""Decrypt CEJ password if needed.
79
85
86
+ :param directory: path to the directory with .secrets file
87
+ :type directory: str, optional
80
88
:param enc_data: encrypted initialization vector + password in hex str
81
89
:type enc_data: str
82
90
:return: decrypted CEJ password
83
91
:rtype: str
84
92
"""
85
- if not cls .secretsfile_exists ():
93
+ secrets_file_path = os .path .join (directory , MCS_SECRETS_FILENAME )
94
+ if not cls .secretsfile_exists (directory = directory ):
86
95
logging .warning ('Unencrypted CrossEngineSupport password used.' )
87
96
return enc_data
88
97
@@ -96,18 +105,18 @@ def decrypt_password(cls, enc_data: str) -> str:
96
105
'Non-hexadecimal number found in encrypted CEJ password.'
97
106
) from value_error
98
107
99
- secrets_json = cls .get_secrets_json ()
108
+ secrets_json = cls .get_secrets_json (directory = directory )
100
109
encryption_key_hex = secrets_json .get ('encryption_key' , None )
101
110
if not encryption_key_hex :
102
111
raise CEJError (
103
- f'Empty "encryption key" found in { MCS_SECRETS_FILE_PATH } '
112
+ f'Empty "encryption key" found in { secrets_file_path } '
104
113
)
105
114
try :
106
115
encryption_key = bytes .fromhex (encryption_key_hex )
107
116
except ValueError as value_error :
108
117
raise CEJError (
109
118
'Non-hexadecimal number found in encryption key from '
110
- f'{ MCS_SECRETS_FILE_PATH } file.'
119
+ f'{ secrets_file_path } file.'
111
120
) from value_error
112
121
cipher = Cipher (
113
122
algorithms .AES (encryption_key ),
@@ -125,21 +134,34 @@ def decrypt_password(cls, enc_data: str) -> str:
125
134
return passwd_bytes .decode ()
126
135
127
136
@classmethod
128
- def encrypt_password (cls , passwd : str ) -> str :
129
- iv = os .urandom (size = AES_IV_BIN_SIZE )
137
+ def encrypt_password (
138
+ cls , passwd : str , directory : str = MCS_DATA_PATH
139
+ ) -> str :
140
+ """Encrypt CEJ password.
141
+
142
+ :param directory: path to the directory with .secrets file
143
+ :type directory: str, optional
144
+ :param passwd: CEJ password
145
+ :type passwd: str
146
+ :return: encrypted CEJ password in uppercase hex format
147
+ :rtype: str
148
+ """
130
149
131
- secrets_json = cls .get_secrets_json ()
150
+ secrets_file_path = os .path .join (directory , MCS_SECRETS_FILENAME )
151
+ iv = os .urandom (AES_IV_BIN_SIZE )
152
+
153
+ secrets_json = cls .get_secrets_json (directory = directory )
132
154
encryption_key_hex = secrets_json .get ('encryption_key' )
133
155
if not encryption_key_hex :
134
156
raise CEJError (
135
- f'Empty "encryption key" found in { MCS_SECRETS_FILE_PATH } '
157
+ f'Empty "encryption key" found in { secrets_file_path } '
136
158
)
137
159
try :
138
160
encryption_key = bytes .fromhex (encryption_key_hex )
139
161
except ValueError as value_error :
140
162
raise CEJError (
141
163
'Non-hexadecimal number found in encryption key from '
142
- f'{ MCS_SECRETS_FILE_PATH } file.'
164
+ f'{ secrets_file_path } file.'
143
165
) from value_error
144
166
cipher = Cipher (
145
167
algorithms .AES (encryption_key ),
@@ -151,8 +173,8 @@ def encrypt_password(cls, passwd: str) -> str:
151
173
padded_data = padder .update (passwd .encode ()) + padder .finalize ()
152
174
153
175
encrypted_data = encryptor .update (padded_data ) + encryptor .finalize ()
154
-
155
- return iv + encrypted_data
176
+ encrypted_passwd_bytes = iv + encrypted_data
177
+ return encrypted_passwd_bytes . hex (). upper ()
156
178
157
179
@classmethod
158
180
def generate_secrets_data (cls ) -> dict :
@@ -161,8 +183,8 @@ def generate_secrets_data(cls) -> dict:
161
183
:return: secrets data
162
184
:rtype: dict
163
185
"""
164
- key_length = algorithms . AES256 . key_size // 8
165
- encryption_key = os .urandom (size = key_length )
186
+ key_length = 32 # AES256 key_size
187
+ encryption_key = os .urandom (key_length )
166
188
encryption_key_hex = binascii .hexlify (encryption_key ).decode ()
167
189
secrets_dict = {
168
190
'description' : 'Columnstore CrossEngineSupport password encryption/decryption key' ,
@@ -174,41 +196,48 @@ def generate_secrets_data(cls) -> dict:
174
196
175
197
@classmethod
176
198
def save_secrets (
177
- cls , secrets : dict , filepath : str = MCS_SECRETS_FILE_PATH ,
199
+ cls , secrets : dict , directory : str = MCS_DATA_PATH ,
178
200
owner : str = 'mysql'
179
201
) -> None :
180
202
"""Write secrets to .secrets file.
181
203
204
+ :param directory: path to the directory with .secrets file
205
+ :type directory: str, optional
182
206
:param secrets: secrets dict
183
207
:type secrets: dict
184
208
:param filepath: path to the .secrets file
185
209
:type filepath: str, optional
186
210
:param owner: owner of the file
187
211
:type owner: str, optional
188
212
"""
189
- if cls .secretsfile_exists ():
213
+ secrets_file_path = os .path .join (directory , MCS_SECRETS_FILENAME )
214
+ if cls .secretsfile_exists (directory = directory ):
190
215
copyfile (
191
- filepath ,
216
+ secrets_file_path ,
192
217
os .path .join (
193
- os . path . dirname ( filepath ) ,
194
- f'{ os . path . basename ( filepath ) } .cmapi.save'
218
+ directory ,
219
+ f'{ MCS_SECRETS_FILENAME } .cmapi.save'
195
220
)
196
221
)
197
222
198
223
try :
199
224
with open (
200
- MCS_SECRETS_FILE_PATH , 'w' , encoding = 'utf-8'
225
+ secrets_file_path , 'w' , encoding = 'utf-8'
201
226
) as secrets_file :
202
227
json .dump (secrets , secrets_file )
203
228
except Exception as exc :
204
229
raise CEJError (f'Write to .secrets file failed.' ) from exc
205
230
206
231
try :
207
- os .chmod (MCS_SECRETS_FILE_PATH , stat .S_IRUSR )
232
+ os .chmod (secrets_file_path , stat .S_IRUSR )
208
233
userinfo = pwd .getpwnam (owner )
209
- os .chown (MCS_SECRETS_FILE_PATH , userinfo .pw_uid , userinfo .pw_gid )
210
- logging .debug (f'Permissions of .secrets file set to { owner } :read.' )
211
- logging .debug (f'Ownership of .secrets file given to { owner } .' )
234
+ os .chown (secrets_file_path , userinfo .pw_uid , userinfo .pw_gid )
235
+ logging .debug (
236
+ f'Permissions of { secrets_file_path } file set to { owner } :read.'
237
+ )
238
+ logging .debug (
239
+ f'Ownership of { secrets_file_path } file given to { owner } .'
240
+ )
212
241
except Exception as exc :
213
242
raise CEJError (
214
243
f'Failed to set permissions or ownership for .secrets file.'
0 commit comments