1
- import requests
2
1
from fastapi import Depends , HTTPException , Security
3
2
from fastapi .security import HTTPAuthorizationCredentials , HTTPBearer
4
3
from jose import jwt
5
4
from starlette import status
6
5
from starlette .requests import Request
7
6
8
- from genotype_api .config import security_settings
7
+ from genotype_api .config import security_settings , keycloak_client
9
8
from genotype_api .database .models import User
10
9
from genotype_api .database .store import Store , get_store
11
10
from genotype_api .dto .user import CurrentUser
12
11
13
-
14
- def decode_id_token (token : str ):
15
- try :
16
- payload = jwt .decode (
17
- token ,
18
- key = requests .get (security_settings .jwks_uri ).json (),
19
- algorithms = [security_settings .algorithm ],
20
- audience = security_settings .client_id ,
21
- options = {
22
- "verify_at_hash" : False ,
23
- },
24
- )
25
- return payload
26
- except jwt .JWTError :
27
- return None
12
+ from genotype_api .exceptions import AuthenticationError
13
+ from genotype_api .services .authentication .service import AuthenticationService
28
14
29
15
30
16
class JWTBearer (HTTPBearer ):
31
- def __init__ (self , auto_error : bool = True ):
17
+ def __init__ (self , auth_service : AuthenticationService , auto_error : bool = True ):
32
18
super (JWTBearer , self ).__init__ (auto_error = auto_error )
19
+ self .auth_service = auth_service
33
20
34
21
async def __call__ (self , request : Request ):
35
22
credentials : HTTPAuthorizationCredentials = await super (JWTBearer , self ).__call__ (request )
@@ -48,19 +35,23 @@ async def __call__(self, request: Request):
48
35
49
36
def verify_jwt (self , jwtoken : str ) -> dict | None :
50
37
try :
51
- payload = decode_id_token (jwtoken )
38
+ payload : dict = self . auth_service . verify_token (jwtoken ). model_dump ( )
52
39
if payload and "email" in payload :
53
40
return {"email" : payload ["email" ]}
54
41
else :
55
42
return None
56
- except jwt . JWTError :
43
+ except AuthenticationError as error :
57
44
raise HTTPException (
58
45
status_code = status .HTTP_403_FORBIDDEN ,
59
- detail = "Invalid token or expired token. " ,
46
+ detail = f" { error } " ,
60
47
)
61
48
62
49
63
- jwt_scheme = JWTBearer ()
50
+ auth_service = AuthenticationService (
51
+ redirect_uri = security_settings .keycloak_redirect_uri ,
52
+ keycloak_client = keycloak_client ,
53
+ )
54
+ jwt_scheme = JWTBearer (auth_service = auth_service )
64
55
65
56
66
57
async def get_active_user (
0 commit comments