23 : Authorization/Permissions in Fastapi

 Authorization and authentication are 2 different topics. Authentication is related to login and authorization is related to permission. Even if a person is logged in he/she may not have the necessary permissions. Consider our job-board has 3 admins. Now, anyone who knows our endpoints may make a put request and change our post! Even worse if that person puts a delete request for each of our job post! 😭
So, we need to verify if the person making the request has the necessary permissions. In our case, we want only the job posters or the superuser to be able to modify or delete the post. Enough talk lets see code. Before that, do you remember dependencies? We had created a dependency get_db which allows us to supply database connection to each request. This time also, we are going to create a dependency to identify current_user. fapis > version1 > route_login.py
 

from datetime import timedelta

from core.config import settings
from core.hashing import Hasher
from core.security import create_access_token
from db.repository.login import get_user
from db.session import get_db
from fastapi import APIRouter
from fastapi import Depends
from fastapi import HTTPException
from fastapi import status
from fastapi.security import OAuth2PasswordRequestForm,OAuth2PasswordBearer
from jose import JWTError, jwt
from schemas.tokens import Token
from sqlalchemy.orm import Session


router = APIRouter()
...


@router.post("/token", response_model=Token)
def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)
):
    ...
    ...

 
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/login/token")  #new

#new function, It works as a dependency
def get_current_user_from_token(token: str = Depends(oauth2_scheme),db: Session=Depends(get_db)): 
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
    )
    try:
        payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
        username: str = payload.get("sub")
        print("username/email extracted is ",username)
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(username=username,db=db)
    if user is None:
        raise credentials_exception
    return user
  • We are decoding the token if it is present and we are validating the user. Once we have a user, we are simply returning it. Thus, we get access to the current user.
  • This we can use as a dependency to identify the current user.
  • We can also compare the user id of the current user with that of the one who is trying to delete a job and we can allow or disallow.

Lets integrate permissions with our post view to attach real user id instead of hardcoded user id. And we are also going to attach this dependency with our route for delete, So, that only the users who have created the post or is superuser then only can delete the job post.

from typing import List
...
from sqlalchemy.orm import Session
from db.models.users import User #new
from apis.version1.route_login import get_current_user_from_token  #new


router = APIRouter()


@router.post("/create-job/", response_model=ShowJob)
def create_job(job: JobCreate, db: Session = Depends(get_db),current_user:User = Depends(get_current_user_from_token)):  #new dependency here
    job = create_new_job(job=job, db=db, owner_id=current_user.id)
    return job



@router.delete("/delete/{id}")
def delete_job(id: int,db: Session = Depends(get_db),current_user: User = Depends(get_current_user_from_token)):
    job = retreive_job(id =id,db=db)
    if not job:
        return HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail=f"Job with {id} does not exist")
    print(job.owner_id,current_user.id,current_user.is_superuser)
    if job.owner_id == current_user.id or current_user.is_superuser:
        delete_job_by_id(id=id,db=db,owner_id=current_user.id)
        return {"msg":"Successfully deleted."}
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail=f"You are not permitted!!!!")

All done, lets give it a try.🙈

Final git commit : add support for authorization · nofoobar/JobBoard-Fastapi@12b168a (github.com)

Prev: 22 : Authentication … Next: 24 : Unit …