Issue
Problem
I currently have a JWT dependency named jwt, which makes sure a request passes the JWT authentication stage before it reaches the endpoint, like this:
sample_endpoint.py:
from fastapi import APIRouter, Depends, Request
from JWTBearer import JWTBearer
from jwt import jwks
# Router that collects the endpoints declared in this module.
router = APIRouter()

# JWT dependency instance, built from the imported JWKS.
jwt = JWTBearer(jwks)


# The dependency is attached to the route decorator, so it is executed for
# every request to this path; its return value is not handed to the handler.
@router.get(
    "/test_jwt",
    dependencies=[Depends(jwt)],
)
async def test_endpoint(request: Request):
    return True
Below is the JWT dependency, which authenticates users using JWT (source: https://medium.com/datadriveninvestor/jwt-authentication-with-fastapi-and-aws-cognito-1333f7f2729e):
JWTBearer.py
from typing import Dict, Optional, List
from fastapi import HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, jwk, JWTError
from jose.utils import base64url_decode
from pydantic import BaseModel
from starlette.requests import Request
from starlette.status import HTTP_403_FORBIDDEN
# One JSON Web Key, typed as a flat string-to-string mapping.
JWK = Dict[str, str]


class JWKS(BaseModel):
    # The list of keys that make up the key set.
    keys: List[JWK]
class JWTAuthorizationCredentials(BaseModel):
    # The full token string taken from the Authorization header.
    jwt_token: str
    # Token header, read without verification by JWTBearer.
    header: Dict[str, str]
    # Token claims, read without verification by JWTBearer.
    claims: Dict[str, str]
    # Text after the last "." of the token (still base64url-encoded).
    signature: str
    # Text before the last "." of the token; this is what the signature covers.
    message: str
class JWTBearer(HTTPBearer):
    """Bearer-token dependency that checks a JWT's signature against a JWKS.

    NOTE(review): only the signature is verified here. The header and claims
    are read with the ``get_unverified_*`` helpers and nothing in this class
    checks claims such as expiry.
    """

    def __init__(self, jwks: JWKS, auto_error: bool = True):
        """Index the key set by key id.

        Args:
            jwks: Key set whose entries each carry a ``kid``.
            auto_error: Forwarded unchanged to ``HTTPBearer``.
        """
        super().__init__(auto_error=auto_error)
        # Map key id -> key so the key named in a token header is a dict lookup.
        self.kid_to_jwk = {key["kid"]: key for key in jwks.keys}

    def verify_jwk_token(self, jwt_credentials: JWTAuthorizationCredentials) -> bool:
        """Return whether the token's signature matches its message.

        Raises:
            HTTPException: 403 when the header has no ``kid`` or the ``kid``
                is not part of the key set.
        """
        try:
            public_key = self.kid_to_jwk[jwt_credentials.header["kid"]]
        except KeyError:
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN, detail="JWK public key not found"
            )

        key = jwk.construct(public_key)
        decoded_signature = base64url_decode(jwt_credentials.signature.encode())
        return key.verify(jwt_credentials.message.encode(), decoded_signature)

    async def __call__(self, request: Request) -> Optional[JWTAuthorizationCredentials]:
        """Extract the bearer token from the request and verify its signature.

        Returns:
            The parsed credentials, or ``None`` when the parent class yields
            no credentials.

        Raises:
            HTTPException: 403 for a non-Bearer scheme, a malformed token or
                a signature that does not verify.
        """
        credentials: HTTPAuthorizationCredentials = await super().__call__(request)
        if not credentials:
            return None

        if credentials.scheme != "Bearer":
            raise HTTPException(
                status_code=HTTP_403_FORBIDDEN, detail="Wrong authentication method"
            )

        jwt_token = credentials.credentials
        try:
            # A token without any "." cannot be unpacked into two parts; that
            # ValueError used to escape as an unhandled server error.
            message, signature = jwt_token.rsplit(".", 1)
            jwt_credentials = JWTAuthorizationCredentials(
                jwt_token=jwt_token,
                header=jwt.get_unverified_header(jwt_token),
                claims=jwt.get_unverified_claims(jwt_token),
                signature=signature,
                message=message,
            )
        except (ValueError, JWTError):
            raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")

        if not self.verify_jwk_token(jwt_credentials):
            raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="JWK invalid")

        return jwt_credentials
jwt.py:
import os
import requests
from dotenv import load_dotenv
from fastapi import Depends, HTTPException
from starlette.status import HTTP_403_FORBIDDEN
from app.JWTBearer import JWKS, JWTBearer, JWTAuthorizationCredentials
load_dotenv()  # Automatically load environment variables from a '.env' file.

# Fetch the user pool's JWKS document once, when this module is imported.
# The timeout keeps a stalled connection from blocking import indefinitely,
# and raise_for_status() surfaces an HTTP error before the body is parsed.
_jwks_response = requests.get(
    f"https://cognito-idp.{os.environ.get('COGNITO_REGION')}.amazonaws.com/"
    f"{os.environ.get('COGNITO_POOL_ID')}/.well-known/jwks.json",
    timeout=10,
)
_jwks_response.raise_for_status()
jwks = JWKS.parse_obj(_jwks_response.json())

# Shared dependency instance used by the endpoints.
jwt = JWTBearer(jwks)
async def get_current_user(
    credentials: JWTAuthorizationCredentials = Depends(jwt)
) -> str:
    """Return the ``username`` claim of the authenticated caller.

    Args:
        credentials: Result of the module-level ``jwt`` dependency.

    Raises:
        HTTPException: 403 when the claims contain no ``username`` entry.
    """
    try:
        return credentials.claims["username"]
    except KeyError:
        # The exception has to be raised; merely constructing it would let the
        # function fall through and return None.
        raise HTTPException(status_code=HTTP_403_FORBIDDEN, detail="Username missing")
api_key_dependency.py (very simplified right now; it will be changed):
from fastapi import Security, FastAPI, HTTPException
from fastapi.security.api_key import APIKeyHeader
from starlette.status import HTTP_403_FORBIDDEN
async def get_api_key(
api_key_header: str = Security(api_key_header)
):
API_KEY = ... getting API KEY logic ...
if api_key_header == API_KEY:
return True
else:
raise HTTPException(
status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
)
Question
Depending on the situation, I would like to first check whether the request has an API key in the header and, if it's present, use that to authenticate. Otherwise, I would like to use the jwt dependency for authentication. I want to make sure that if either the API-key authentication or the JWT authentication passes, the user is authenticated. Would this be possible in FastAPI (i.e. having multiple dependencies, where authentication passes if any one of them passes)? Thank you!
Solution
Sorry, got lost with things to do
The endpoint has a single dependency, call it check, imported from the file check_auth.
ENDPOINT
from fastapi import APIRouter, Depends, Request
from check_auth import check
from JWTBearer import JWTBearer
from jwt import jwks
# Router that collects the endpoints declared in this module.
router = APIRouter()

# JWT dependency instance, built from the imported JWKS.
jwt = JWTBearer(jwks)


# The route's only dependency is `check`, attached to the route decorator.
@router.get(
    "/test_jwt",
    dependencies=[Depends(check)],
)
async def test_endpoint(request: Request):
    return True
The function check depends on two separate dependencies, one for the API key and one for the JWT. If at least one of them passes, authentication passes. Otherwise, we raise an exception, as shown below.
DEPENDENCY
def key_auth(api_key=Header(None)):
    """Authenticate through the API-key header.

    Returns None when the header is absent, so that `check` can fall back to
    the JWT result.
    """
    if not api_key:
        return None
    # ... verification logic goes here ...
    # Return a truthy value for a valid key and a falsy one otherwise; `check`
    # only tests the truthiness of this result.


def jwt_auth(authorization=Header(None)):
    """Authenticate through the Authorization header.

    Returns None when the header is absent, so that `check` can fall back to
    the API-key result.
    """
    if not authorization:
        return None
    # ... verification logic goes here ...
    # Return a truthy value for a valid token and a falsy one otherwise;
    # `check` only tests the truthiness of this result.


async def check(key_result=Depends(key_auth), jwt_result=Depends(jwt_auth)):
    """Pass when at least one of the two authentication methods succeeded.

    Raises:
        HTTPException: 403 when neither the API key nor the JWT was accepted.
    """
    # Imported locally because this snippet shows no import section.
    from fastapi import HTTPException
    from starlette.status import HTTP_403_FORBIDDEN

    if not (key_result or jwt_result):
        # An HTTPException becomes a 403 response; a bare Exception would
        # surface as an internal server error instead.
        raise HTTPException(
            status_code=HTTP_403_FORBIDDEN, detail="Could not validate credentials"
        )
Answered By - lsabi
0 comments:
Post a Comment
Note: Only a member of this blog may post a comment.