Last time, we noticed an issue: logs from Celery tasks simply don't appear in our application. The reason is that Celery tasks are not executed in the same Python thread as the application but in a completely separate process, called a worker process, which does not have the logging context that our application sets up.
In this one, we are going to fix it. First of all, let's do some cleanup and move the Celery tasks to a new file, tasks.py. This way, we reduce the responsibilities of the main.py file. Here is our tasks.py:
import time
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger("tasks")
@shared_task
def send_notification(device_token: str):
    logger.info("starting background task")
    time.sleep(10)  # simulates slow network call to firebase/sns
    try:
        a = 11 / 0  # critical part that may fail, and its analysis is important
    except Exception as e:
        logger.error(f"exception while division {e}")
You might have noticed two major changes:
- We are using get_task_logger instead of a regular logger, so that the logger picks up the logging configuration that Celery sets up for background tasks.
- Instead of @celery.task, we are using the @shared_task decorator. If we didn't, we would run into a circular dependency: main.py needs tasks.py for the definition of the send_notification function, and tasks.py would need the celery instance imported from main.py!
Here is a tip: instead of logger.error, we can use logger.exception to get traceback information and a much more detailed error analysis.
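To see the difference between the two calls, here is a minimal, standard-library-only sketch. The logger name "exception_demo" and the in-memory stream are assumptions made for this demo; they are not part of the application.

```python
import io
import logging

# Hypothetical logger name and in-memory stream, used only for this demo.
stream = io.StringIO()
demo_logger = logging.getLogger("exception_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False  # keep the demo output away from the root logger
demo_logger.addHandler(logging.StreamHandler(stream))


def risky_division() -> None:
    try:
        11 / 0
    except Exception as e:
        # error(): logs only the message we format ourselves
        demo_logger.error(f"exception while division {e}")
        # exception(): logs at ERROR level and appends the full traceback
        demo_logger.exception("exception while division")


risky_division()
output = stream.getvalue()
print(output)
```

The first record contains just the message, while the second one is followed by the traceback pointing at the failing line. Note that logger.exception is meant to be called from inside an except block.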
Time to move on to the main.py file:
import os
import logging
from fastapi import FastAPI
from celery import Celery
from celery.signals import after_setup_logger
from config import settings
from tasks import send_notification
# create the log directory if it does not exist yet
os.makedirs('logs', exist_ok=True)
logging.basicConfig(filename='logs/app.log', level=logging.INFO, format='%(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
app = FastAPI()
celery = Celery(
    __name__,
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)
@after_setup_logger.connect
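def setup_celery_logger(logger, *args, **kwargs):
    # attach a file handler to the "tasks" logger that tasks.py uses
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    task_logger = logging.getLogger("tasks")  # separate name, so the signal's logger argument is not shadowed
    fh = logging.FileHandler('logs/tasks.log')
    fh.setFormatter(formatter)
    task_logger.addHandler(fh)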
@app.get("/push/{device_token}")
async def notify(device_token: str):
    logger.info("sending notification in background")
    send_notification.delay(device_token)
    return {"message": "Notification sent"}
It looks like a large file, but there are only a few things to consider. Just concentrate on the setup_celery_logger() function. Celery provides a signal that is sent once its initial logging setup is done. We use this after_setup_logger signal to define a logging configuration for our background tasks. Once this is done, the logger in our tasks.py file writes to the file named tasks.log.
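The reason this works is that loggers are looked up by name: whoever asks for the logger called "tasks" gets the same object, handlers included. Here is a minimal sketch of that idea using only the standard library, without Celery. The helper attach_file_handler() and the temporary log path are hypothetical stand-ins for setup_celery_logger() and logs/tasks.log, and the explicit setLevel() call is an assumption of this demo (in a worker, Celery's own log level setting would normally decide that).

```python
import logging
import os
import tempfile

# Hypothetical stand-in for logs/tasks.log, written to a temp directory.
log_path = os.path.join(tempfile.mkdtemp(), "tasks.log")


def attach_file_handler(logger_name: str, path: str) -> logging.Handler:
    """Mimic setup_celery_logger(): add a FileHandler to a named logger."""
    formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    handler = logging.FileHandler(path)
    handler.setFormatter(formatter)
    named_logger = logging.getLogger(logger_name)
    named_logger.setLevel(logging.INFO)  # demo-only assumption, see lead-in
    named_logger.addHandler(handler)
    return handler


handler = attach_file_handler("tasks", log_path)

# Elsewhere (for example in tasks.py) the same name yields the same logger
# object, so the handler attached above is already in place.
task_logger = logging.getLogger("tasks")
task_logger.info("starting background task")

with open(log_path) as f:
    contents = f.read()
print(contents)
```

The printed line should end with "tasks - INFO - starting background task", which is the format we configured in the handler.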
Now we can start the uvicorn 🦄 and Celery servers. This time, you should ideally see both the application logs and the Celery logs in the logs directory.
Unfortunately, I am not following best practices here, in order to keep things simple. I really do believe that version 1 is better than version None. If you have the bandwidth, here are some suggestions/exercises:
- Try to use rotating file handlers with a backup count of 10.
- Use a dictConfig, file config, or pydantic config to hold the logging configuration.
- Try playing with setup_celery_logger() and get_task_logger() to explore why they are essential. What happens if we use logging.getLogger() instead? What happens if we pass __name__ to get_task_logger(), or try any other combination you can think of?
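As a possible starting point for the first two exercises, here is a sketch that combines a dictConfig with a rotating file handler. The temporary log directory, the handler name "tasks_file", and the 1 MB size limit are assumptions made for this example; only the backup count of 10 comes from the exercise itself.

```python
import logging
import logging.config
import os
import tempfile

# Hypothetical log directory; the article itself writes to logs/.
log_dir = tempfile.mkdtemp()

LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "default": {
            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        },
    },
    "handlers": {
        "tasks_file": {
            "class": "logging.handlers.RotatingFileHandler",
            "filename": os.path.join(log_dir, "tasks.log"),
            "maxBytes": 1_000_000,  # assumed size limit; pick what suits you
            "backupCount": 10,      # keep up to 10 rotated files
            "formatter": "default",
        },
    },
    "loggers": {
        "tasks": {"handlers": ["tasks_file"], "level": "INFO"},
    },
}

logging.config.dictConfig(LOGGING_CONFIG)
rotating_logger = logging.getLogger("tasks")
rotating_logger.info("logging through a rotating file handler")
```

Once tasks.log grows past maxBytes, it is renamed to tasks.log.1, older backups are shifted up, and anything beyond the tenth backup is dropped. Whether you call dictConfig at import time or from inside setup_celery_logger() is part of the exercise.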
