Writing a scheduler using Django and CeleryBeat, CronTab

Today we’re diving into something super practical—how to build a medicine reminder app using Django, Celery-Beat, and Docker. If you’ve been dabbling with Django and Celery, you know how crucial task scheduling can be. Sending payment reminders, scheduling notifications just like Jira, where you get bombarded by “overdue tickets” and much much more. And if you’re like me, you’ve probably been stuck in the weeds of setting up periodic tasks at some point. But, let’s simplify it with Celery-Beat and make it… almost fun.

Scheduling things are usually done via Crontabs. And if you’ve ever tried managing crontabs directly on Linux, you know it’s an absolute pain. You have to set them up using cryptic syntax, something like 0 9 * * *, which basically means “run this at 9 AM every day.” But, oh boy, miss one asterisk or forget to restart your cron service and nothing works. 🙄

Setting up Celery and Celery-Beat as services with Systemd is another headache. You’ve got to configure celery.service and celery-beat.service files, ensuring they’re running smoothly with Redis as the broker. And let’s not forget, each time you make a small change, you have to restart services, check logs, and deal with errors like:

Failed to start celery.service: Unit celery.service not found.

With Celery-Beat, we avoid the whole mess of manually writing and managing crontabs. The scheduling is baked into Django itself, and Celery-Beat takes care of the rest. Plus, it’s integrated with your database, so you can easily query schedules and update them without editing cryptic files. No more SSHing into your server and praying you didn’t mess up a cron job! 🙌

I've failed a million times almost, trying to setup systemmd services, and I've told myself “I aint no sys admin, I am a pythonista, I like to just pip my features”

Well, hopefully this blog helps you set up an app for yourself, maybe its a calendar app, or a simple reminder app for anything.

We’re going to build a simple medicine reminder app. My grandma takes a lots of meds, and even though we take care of her, sometimes she forgets, or sometimes we do. ITs kinda messy. So with the help of this app, we are gonna send scheduled reminders.

But first, lets summon Docker.

We’re going to containerize everything as they save us from all the “it works on my machine” nonsense. Let’s get Celery Beat and Celery workers running in Docker.

Here is the link to the dockerfile btw. Which is also required

Here’s what our docker-compose.yml might look like:

version: '3'

services:
  redis:
    image: “redis:alpine”

  celery:
    build: .
    command: celery -A reminder_app worker -l info
    volumes:
      - .:/app
    depends_on:
      - redis

  celery-beat:
    build: .
    command: celery -A reminder_app beat -l info
    volumes:
      - .:/app
    depends_on:
      - redis

  django:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/app
    depends_on:
      - redis

This sets us up with Redis as the broker, and both Celery and Celery-Beat containers, which handle task execution and scheduling respectively. I want to address one thing here, since I was a dummy most of my life, I am assuming someone might ask, why this redis?

Simply put, Redis acts as a message broker. When you schedule a task, Celery uses Redis to store the task and ensure that it gets executed at the right time. Without Redis (or another broker like RabbitMQ), Celery wouldn’t know what to do with all these tasks. It’s basically the “middleman” that holds tasks in a queue until they’re picked up by the Celery worker to be executed. So yeah, no Redis, no task execution.

Medicine Reminder Model

We’re building a reminder app, so we need a way to schedule when the reminders should go off. Enter the Recurrence model, which handles the schedule of your reminders. We’ll use the django_celery_beat library to manage periodic tasks (schedules).

Here’s a simplified version of the Recurrence model:

import calendar
import zoneinfo
from django.db import models
from django_celery_beat.models import CrontabSchedule, IntervalSchedule

TIMEZONE = zoneinfo.ZoneInfo('Asia/Kolkata')

class Recurrence(models.Model):
    repeat_choices = [
        ('none', 'Does not repeat'),
        ('daily', 'Daily'),
        ('weekly', 'Weekly'),
        ('monthly', 'Monthly'),
        ('yearly', 'Yearly')
    ]
    repeat = models.CharField(max_length=10, choices=repeat_choices, default='none')
    repeat_every = models.PositiveIntegerField(blank=True, null=True)
    days_of_week = models.CharField(max_length=255, blank=True, null=True)
    day_of_month = models.PositiveIntegerField(blank=True, null=True)
    month_of_year = models.PositiveIntegerField(blank=True, null=True)
    minute = models.PositiveIntegerField(default=0)
    hour = models.PositiveIntegerField(default=9)  # default: 9AM
    
    def __str__(self):
        return f“Reminder set for {self.description}”

    @property
    def description(self):
        # Return a friendly string describing the schedule
        if self.repeat == 'daily':
            return “Daily at {}:{}”.format(self.hour, self.minute)
        elif self.repeat == 'weekly':
            days = self.days_of_week or 'every day'
            return “Weekly on {} at {}:{}”.format(days, self.hour, self.minute)
        return “Custom schedule”

This gives us the flexibility to schedule reminders daily, weekly, monthly, or even yearly. You can specify the time, day of the week, or day of the month when the reminder should be sent.

This model will include a get_schedule function, which is gonna return the appropriate schedule. This is the part that tells Celery-Beat how to handle the recurrence schedule based on the user’s input. Without it, Celery wouldn’t know what the schedule looks like.

def get_schedule(self):
    if self.repeat == 'daily':
        # Default to all days if not specified
        days = self.days_of_week or '0,1,2,3,4,5,6'
        return CrontabSchedule.objects.get_or_create(
            minute=self.minute,
            hour=self.hour,
            day_of_week=days,
            timezone=TIMEZONE
        )
    elif self.repeat == 'weekly':
        if not self.days_of_week:
            raise ValueError(“Days of week must be provided for weekly recurrence”)
        return CrontabSchedule.objects.get_or_create(
            minute=self.minute,
            hour=self.hour,
            day_of_week=self.days_of_week,
            timezone=TIMEZONE
        )
    elif self.repeat == 'monthly':
        day_of_month = self.day_of_month or 1
        return CrontabSchedule.objects.get_or_create(
            minute=self.minute,
            hour=self.hour,
            day_of_month=day_of_month,
            timezone=TIMEZONE
        )
    elif self.repeat == 'yearly':
        if not self.day_of_month or not self.month_of_year:
            raise ValueError(“Both day of month and month of year must be provided for yearly recurrence”)
        return CrontabSchedule.objects.get_or_create(
            minute=self.minute,
            hour=self.hour,
            day_of_month=self.day_of_month,
            month_of_year=self.month_of_year,
            timezone=TIMEZONE
        )
    elif self.repeat_type == 'interval':
        return IntervalSchedule.objects.get_or_create(
            every=self.interval,
            period=IntervalSchedule.HOURS,
        )

    return None

Tying It to the Medicine Model

We need to associate this recurrence schedule with the medicine model itself. Here’s a simplified Medicine model that uses our Recurrence model:

class Medicine(models.Model):
    name = models.CharField(max_length=255)
    dosage = models.CharField(max_length=255)
    recurrence = models.ForeignKey(Recurrence, on_delete=models.CASCADE, blank=True, null=True)

    def __str__(self):
        return f"{self.name} - {self.dosage}"

Once the Medicine is created and the schedule is set, Celery will handle sending out reminders. But for that, we need a task to run.

Sending Notifications with Celery

Now let’s create the task to send the notification. We’ll use Celery for this, so here’s how the tasks.py could look:

from celery import shared_task
from fcm_django.models import FCMDevice

@shared_task
def send_medicine_reminder(medicine_id):
    # Get the medicine instance
    medicine = Medicine.objects.get(id=medicine_id)
    
    # Send notification
    devices = FCMDevice.objects.all()
    for device in devices:
        device.send_message(
            title=“Medicine Reminder”,
            body=f“Don’t forget to take your {medicine.name} ({medicine.dosage})!”
        )

Here we’re using FCM (Firebase Cloud Messaging) to send notifications. Simple enough, right? Every time this task is called, it will remind the user to take their medicine based on the schedule we’ve defined. Setting up of the fcm token from the user device is not discussed in this blog. It can be an email reminder as well.

Scheduling the Task with Celery-Beat

Now the real magic happens. We tie the recurrence model to Celery-Beat, so it knows when to run the send_medicine_reminder task.

In the schedule_task method of the Medicine model, we schedule a periodic task based on the recurrence:

from django_celery_beat.models import PeriodicTask

def schedule_task(self):
    if not self.recurrence or self.recurrence.repeat == 'none':
        PeriodicTask.objects.filter(name=f'medicine-{self.id}').delete()
        return

    schedule, _ = self.recurrence.get_schedule()
    PeriodicTask.objects.update_or_create(
        name=f'medicine-{self.id}',
        defaults={
            'task': 'reminder.tasks.send_medicine_reminder',
            'crontab': schedule,
            'enabled': True,
            'args': json.dumps([self.id]),
        }
    )

When a Medicine or its recurrence is deleted, we need to ensure that the corresponding Celery tasks get cleaned up too. This is where the unschedule_task function comes into play.

The purpose of this function is to remove any periodic tasks related to a Medicine instance when it’s no longer needed (e.g., if the user stops taking the medicine or deletes the reminder). Here’s how the unschedule_task method looks:

@hook(AFTER_DELETE)
def unschedule_task(self):
    # Delete the associated Celery task
    PeriodicTask.objects.filter(name=f'medicine-{self.id}').delete()

Every time a Medicine instance is deleted, this function will look for any tasks scheduled under the name medicine-{self.id} and delete them from the periodic task table. Without this cleanup, you'd have zombie tasks running for reminders that no longer exist. Optimized!

This was just a sample app idea, I've taken the concepts from what I've learned over the years. This app does not exist yet, and I might make a open source version of it. Visit banerjeerishi.comfor more!

— Rishi Banerjee
Sep 2024