First Steps with Celery

In the previous post, we discussed that Celery is one of the best available options when we need multi-process or multi-server task distribution. In this post, we are going to take our first baby🍼 steps with FastAPI and Celery.

Before proceeding, let's install the basic requirements. Create a requirements.txt file, add the requirements below, and install them with pip install -r requirements.txt:

uvicorn==0.21.1
fastapi==0.95.0
redis==4.5.4
celery==5.2.7
python-dotenv==1.0.0

Celery support on Windows is limited. Windows users can try pip install wheel==0.40.0 followed by pip install celery==3.1.25; it may work for you depending on your Windows and Python versions.
Celery and Redis are the essential pieces; we will use python-dotenv to keep the project's environment variables in a separate .env file. Let's create the project structure now.

📁 ./
├─📄 config.py
├─📄 main.py
├─📄 .env
├─📄 .gitignore
├─📄 requirements.txt
└─📁 env/


Now, we can put our secrets and environment configuration settings in the .env file. Let's add the two required settings: one for the broker URL and one for the result backend. Don't worry, we will explore these terminologies in a while.

# .env file
CELERY_BROKER_URL=redis://127.0.0.1:6379/0
CELERY_RESULT_BACKEND=redis://127.0.0.1:6379/0

Now, we can use the config file to accumulate the project settings in a single place. You might ask: why can't we put these settings directly in the config.py file? That's because we want to keep .env outside our version control system, which we can achieve by adding .env to the .gitignore file. Another benefit is that every developer can have their own version of the settings in their .env file. In a staging or production environment, the broker URL and result backend in .env could point to a server's IP instead of a loopback address.
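
A minimal .gitignore for this layout might look like this (env/ is the virtual environment folder from the tree above):

# .gitignore file
.env
env/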

Let's proceed to the config.py file now:

import os
from dotenv import load_dotenv

load_dotenv()


class Config:
    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "redis://127.0.0.1:6379/0")
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "redis://127.0.0.1:6379/0")


settings = Config()
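
To sanity-check that the values load, you can print one of the settings from the project root (a quick check, assuming the .env file above is in place):

python -c "from config import settings; print(settings.CELERY_BROKER_URL)"

This should print redis://127.0.0.1:6379/0.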

Before we proceed, we need to make sure that our redis-server is up and working fine. The simplest way is to use the redis image from Docker. Alternatively, you can also install Redis by following this guide.

docker run -p 6379:6379 --name redis_service -d redis

If you do a docker ps, you should see the redis container.

CONTAINER ID   IMAGE     STATUS         PORTS                  NAMES
80c3394ec097   redis     Up 8 seconds   6379->6379/tcp,        redis_service

You can send a ping to the Redis server inside the Docker container and make sure you get a PONG back:

docker exec -t redis_service redis-cli ping
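
If Redis is healthy, the reply will be:

PONG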

Time to proceed to our main file, where we will send our blocking task to Celery to be consumed asynchronously. First of all, we will create a Celery instance and configure it with the broker and result-backend URLs. I prefer building something first and figuring out the components ourselves as they become interesting. For this reason, I am not going deeper into the broker or result backend here; we will learn about them as we use them. Put the code below in the main.py file:

import time
from fastapi import FastAPI
from celery import Celery

from config import settings


app = FastAPI()

celery = Celery(
    __name__,
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)


@celery.task
def send_push_notification(device_token: str):
    time.sleep(10)  # simulates slow network call to firebase/sns
    with open("notification.log", mode="a") as notification_log:
        response = f"Successfully sent push notification to: {device_token}\n"
        notification_log.write(response)


@app.get("/push/{device_token}")
async def notify(device_token: str):
    send_push_notification.delay(device_token)
    return {"message": "Notification sent"}


Once this is ready, we must start the Celery worker process by typing celery -A main.celery worker --loglevel=info. You should see output like the below.

>> celery -A main.celery worker --loglevel=info
 
 -------------- celery@<hostname> v5.2.7 (dawn-chorus)
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . main.send_push_notification

There are some things to note here:

  • Note the broker URL and the result backend.
  • Note that the name of the default queue is celery itself! (See the sketch after this list.)
  • Note that Celery has recognized our task, main.send_push_notification.
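
If you ever want a different default queue name, Celery exposes a task_default_queue setting. A minimal sketch (the name notifications is just an example; the worker then has to consume that queue explicitly with -Q):

# in main.py, after creating the Celery instance
celery.conf.task_default_queue = "notifications"

# start the worker against that queue:
# celery -A main.celery worker -Q notifications --loglevel=info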

To receive web requests, we also need to start the uvicorn server. Type: uvicorn main:app --reload

Try sending multiple API requests using Postman or by visiting the docs. You should notice that Celery handles the tasks in parallel: the log entries appear together about 10 seconds later instead of one every 10 seconds. The response time should also be in milliseconds!
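
For example, from a terminal (assuming uvicorn's default port 8000; the device tokens are placeholders):

for i in 1 2 3; do curl http://127.0.0.1:8000/push/device-$i; done

All three responses come back almost instantly, and the three notification.log entries appear together roughly 10 seconds later.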

Note: Every time we make a change to our task function, we need to restart the Celery workers, since the worker imports the task code once at startup.
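
As a development convenience (not part of this tutorial's requirements), the watchdog package provides a watchmedo command that can restart the worker on file changes. A sketch, assuming pip install watchdog:

watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker --loglevel=info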

This is a very minimal demonstration of the power of Celery. We will explore it quite a lot more. Stay tuned.
