Working with Several Queues and Workers

A queue data structure stores tasks or messages in a first-in-first-out (FIFO) order. In a Celery application, a queue is used to store tasks waiting to be executed. When a worker is available, it retrieves and executes a task from the queue.
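The FIFO behaviour can be sketched with Python's `collections.deque` (a toy illustration of the concept, not Celery internals):

```python
from collections import deque

# A minimal FIFO queue: tasks are appended at one end and
# consumed from the other, in arrival order.
task_queue = deque()

# Producer side: enqueue task descriptions.
task_queue.append("send_whatsapp")
task_queue.append("send_email")
task_queue.append("generate_pdf_report")

# Worker side: pop tasks in first-in-first-out order.
first = task_queue.popleft()   # "send_whatsapp"
second = task_queue.popleft()  # "send_email"
```

Whichever task entered the queue first is the first one a worker picks up.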

Multiple queues can be useful in scenarios where you want to prioritize certain types of tasks or separate them based on their level of importance. For example, you may want to prioritize tasks that are critical to your application's functioning or separate tasks that involve sensitive data from tasks that don't.

For instance, we could rely on the default 'celery' queue for sending WhatsApp messages, sending emails, and generating PDF reports. A large PDF report will generally take a long time, and meanwhile our other, more urgent tasks starve. One way around this is to put WhatsApp messages in a dedicated high-priority named queue and keep PDF generation, along with other slow tasks, in the default queue.

We are not going to define any actual priority levels; instead, we can hack priority by assigning more workers to the high-priority queue. 😎
Here is our cleaned-up tasks.py file:

import time
from celery import shared_task


@shared_task(bind=True, queue="high_priority")
def send_notification(self, device_token: str):
    try:
        time.sleep(1)  # simulate sending a push notification
    except Exception as e:
        # retry up to 3 times, waiting 10 seconds between attempts
        raise self.retry(exc=e, countdown=10, max_retries=3)


@shared_task
def generate_transaction_report(user_id=1):
    time.sleep(5)  # simulate a slow report-generation job
    return True

Notice the queue="high_priority" argument in the send_notification decorator. Whenever we delay send_notification, it will be put in a queue named high_priority, and only the workers subscribed to that queue can consume it. We will make sure to create more workers/consumers for the high_priority queue.
We also need to create the API routes to trigger these tasks. Here is our cleaned-up main.py file.

from fastapi import FastAPI
from celery import Celery

from config import settings
from tasks import send_notification, generate_transaction_report


app = FastAPI()

celery = Celery(
    __name__,
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)


@app.get("/push/{device_token}")
async def notify(device_token: str):
    send_notification.delay(device_token)
    return {"message": "Notification sent"}


@app.get("/export/report")
async def export_report():
    generate_transaction_report.delay()
    return {"message": "Generating report"}

Since generate_transaction_report uses the default queue named "celery" and send_notification needs the "high_priority" queue, this time we are going to need 4 terminals!
Terminal 1: uvicorn main:app --reload
Terminal 2: watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker -Q celery,high_priority --loglevel=info -n default
Terminal 3: watchmedo auto-restart --directory=./ --pattern=*.py --recursive -- celery -A main.celery worker -Q high_priority --loglevel=info -n higher_priority
Terminal 4: celery -A main.celery flower --port=5555
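If you would rather not juggle extra terminals, a similar bias can be achieved with one worker per queue and a larger process pool (the -c/--concurrency flag) on the high-priority one. A sketch, assuming the same main.celery app and a running broker; tune the pool sizes to your workload:

```shell
# High-priority worker gets 4 processes; the default worker gets 1.
# Run each in its own terminal, or background them with &.
celery -A main.celery worker -Q high_priority -c 4 -n high_priority@%h --loglevel=info &
celery -A main.celery worker -Q celery -c 1 -n default@%h --loglevel=info &
```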

We are being deliberately biased here, giving more workers to the high_priority queue! Now we can visit the Flower dashboard and explore the consumers. If we fire off send_notification tasks from the interactive /docs page, we should see the tasks routed to the two workers consuming high_priority roughly equally.

© Copyright 2022-23 Team FastAPITutorial