
Using Celery With Django for Background Task Processing


Web applications usually start out simple but can become quite complex, and most of them quickly exceed the responsibility of only responding to HTTP requests.

When that happens, you need to distinguish between what has to happen instantly (usually within the HTTP request lifecycle) and what can happen eventually. Why? Because when your application is overloaded with traffic, keeping slow work out of the request path is what keeps responses fast.

Operations in a web application can be classified as critical or request-time operations and background tasks, the ones that happen outside request time. These map to the ones described above:

  • needs to happen instantly: request-time operations
  • needs to happen eventually: background tasks

Request-time operations can be done on a single request/response cycle without worrying that the operation will time out or that the user might have a bad experience. Common examples include CRUD (Create, Read, Update, Delete) database operations and user management (Login/Logout routines).

Background tasks are different as they are usually quite time-consuming and are prone to failure, mostly due to external dependencies. Some common scenarios among complex web applications include:

  • sending confirmation or activity emails
  • crawling and scraping information from various sources daily and storing it
  • performing data analysis
  • deleting unneeded resources
  • exporting documents/photos in various formats

Background tasks are the main focus of this tutorial. The most common programming pattern used for this scenario is the Producer Consumer Architecture.

In simple terms, this architecture can be described like this:

  • Producers create data or tasks.
  • Tasks are put into a queue that is referred to as the task queue.
  • Consumers are responsible for consuming the data or running the tasks.

Usually, the consumers retrieve tasks from the queue in a first-in-first-out (FIFO) fashion or according to their priorities. The consumers are also referred to as workers, and that is the term we will be using throughout, as it is consistent with the terminology used by the technologies discussed.
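To make the pattern concrete, here is a minimal sketch that uses only Python's standard library. It is not how Celery works internally: the in-process queue stands in for the broker, the threads stand in for workers, and the names worker, square, and run_demo are invented for this illustration.

```python
import queue
import threading


def square(n):
    """A stand-in for a slow background job."""
    return n * n


def worker(task_queue, results, lock):
    """Consume tasks until a None sentinel arrives."""
    while True:
        task = task_queue.get()
        if task is None:  # sentinel: no more work for this worker
            task_queue.task_done()
            break
        func, arg = task
        outcome = func(arg)
        with lock:
            results.append(outcome)
        task_queue.task_done()


def run_demo(numbers, worker_count=2):
    task_queue = queue.Queue()  # FIFO task queue
    results = []
    lock = threading.Lock()

    workers = [
        threading.Thread(target=worker, args=(task_queue, results, lock))
        for _ in range(worker_count)
    ]
    for thread in workers:
        thread.start()

    # The producer only enqueues tasks; it does not run them itself.
    for n in numbers:
        task_queue.put((square, n))
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker

    task_queue.join()  # wait until every queued item has been processed
    for thread in workers:
        thread.join()
    return sorted(results)


print(run_demo([1, 2, 3, 4]))
```

The producer returns as soon as the tasks are queued, and the order in which workers finish is not guaranteed, which is why the results are sorted before being returned.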

What kind of tasks can be processed in the background? Tasks that:

  • are not essential for the basic functionality of the web application
  • can't be run in the request/response cycle since they are slow (I/O intensive, etc.)
  • depend on external resources that might not be available or not behave as expected
  • might need to be retried at least once
  • have to be executed on a schedule

Celery is the de facto choice for doing background task processing in the Python/Django ecosystem. It has a simple and clear API, and it integrates beautifully with Django. It supports various technologies for the task queue and various paradigms for the workers.

In this tutorial, we're going to create a Django toy web application (dealing with real-world scenarios) that uses background task processing.

Setting Things Up

Assuming you are already familiar with Python package management and virtual environments, let's install Django:

$ pip install Django

I've decided to build yet another blogging application. The focus of the application will be on simplicity. A user can simply create an account and without too much fuss can create a post and publish it to the platform.

Set up the quick_publisher Django project:

$ django-admin startproject quick_publisher

Let's get the app started:

$ cd quick_publisher
$ ./manage.py startapp main

When starting a new Django project, I like to create a main application that contains, among other things, a custom user model. More often than not, I encounter limitations of the default Django User model. Having a custom User model gives us the benefit of flexibility.

# main/models.py

from django.db import models
from django.contrib.auth.models import AbstractBaseUser, PermissionsMixin, BaseUserManager


class UserAccountManager(BaseUserManager):
    def create_user(self, email, password=None, **extra_fields):
        if not email:
            raise ValueError('Users must have an email address')

        user = self.model(
            email=self.normalize_email(email), **extra_fields
        )

        user.set_password(password)
        user.save(using=self._db)
        return user

    def create_superuser(self, email, password=None, **extra_fields):
        user = self.create_user(
            email,
            password=password,
            **extra_fields
        )
        user.is_admin = True
        user.save(using=self._db)
        return user


class MyUser(AbstractBaseUser):
    email = models.EmailField(
        verbose_name='email address',
        max_length=255,
        unique=True,
    )
    first_name = models.CharField(verbose_name='first name', max_length=30, blank=True)
    last_name = models.CharField(verbose_name='last name', max_length=30, blank=True)
    is_active = models.BooleanField(default=True)
    is_admin = models.BooleanField(default=False)

    objects = UserAccountManager()

    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['first_name', 'last_name']

    def __str__(self):
        return self.email

    def has_perm(self, perm, obj=None):
        "Does the user have a specific permission?"
        # Simplest possible answer: Yes, always
        return True

    def has_module_perms(self, app_label):
        "Does the user have permissions to view the app `app_label`?"
        # Simplest possible answer: Yes, always
        return True

    @property
    def is_staff(self):
        "Is the user a member of staff?"
        # Simplest possible answer: All admins are staff
        return self.is_admin

Make sure to check out the Django documentation if you are not familiar with how custom user models work.

Now we need to tell Django to use this User model instead of the default one. Add this line to the quick_publisher/settings.py file:

AUTH_USER_MODEL = 'main.MyUser'

We also need to add the main application to the INSTALLED_APPS list in the quick_publisher/settings.py file.

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'main',
]

We can now create the migrations, apply them, and create a superuser to be able to log in to the Django admin panel:

$ ./manage.py makemigrations main
$ ./manage.py migrate
$ ./manage.py createsuperuser

Let's now create a separate Django application that's responsible for posts:

$ ./manage.py startapp publish

Let's define a simple Post model in publish/models.py:

from django.db import models
from django.utils import timezone
from django.contrib.auth import get_user_model


class Post(models.Model):
    # on_delete is a required argument since Django 2.0
    author = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')

    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

Hooking the Post model with the Django admin is done in the publish/admin.py file like this:

from django.contrib import admin
from .models import Post


@admin.register(Post)
class PostAdmin(admin.ModelAdmin):
    pass

Finally, let's hook the publish application with our project by adding it to the INSTALLED_APPS list.

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'main',
    'publish',
]

We can now run the server and head over to http://localhost:8000/admin/ and create our first posts so that we have something to play with:

$ ./manage.py runserver

I trust you've done your homework and you've created the posts.

Let's move on. The next obvious step is to create a way to view the published posts.

# publish/views.py

from django.http import Http404
from django.shortcuts import render
from .models import Post


def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Post does not exist")

    # The template lives in publish/templates/publish/post.html
    return render(request, 'publish/post.html', context={'post': post})

Let's associate our new view with a URL in quick_publisher/urls.py.

from django.contrib import admin
from django.urls import path, include
from publish.views import view_post


urlpatterns = [
    path('admin/', admin.site.urls),
    path('<slug:slug>', view_post, name='view_post'),
]

Finally, let's create the template that renders the post in: publish/templates/publish/post.html.

<!DOCTYPE html>
<html>
<head lang="en">
    <meta charset="UTF-8">
    <title></title>
</head>
<body>
    <h1>{{ post.title }}</h1>
    <p>{{ post.content }}</p>
    <p>Published by {{ post.author.first_name }} on {{ post.created }}</p>
</body>
</html>

We can now head to http://localhost:8000/the-slug-of-the-post-you-created in the browser. Note that there is no trailing slash, because the URL pattern doesn't include one.

[Screenshot: my first post]

It's not exactly a miracle of web design, but making good-looking posts is beyond the scope of this tutorial.

Sending Confirmation Emails

Here's the classic scenario:

  • You create an account on a platform.
  • You provide an email address to be uniquely identified on the platform.
  • The platform checks you are indeed the owner of the email address by sending an email with a confirmation link.
  • Until you perform the verification, you are not able to (fully) use the platform.

Let's add an is_verified flag and the verification_uuid to the User model:

# main/models.py
import uuid


class MyUser(AbstractBaseUser):
    email = models.EmailField(verbose_name='email address', max_length=255, unique=True)
    first_name = models.CharField(verbose_name='first name', max_length=30, blank=True)
    last_name = models.CharField(verbose_name='last name', max_length=30, blank=True)
    is_verified = models.BooleanField(verbose_name='verified', default=False)
    is_active = models.BooleanField(default=True)
    is_admin = models.BooleanField(default=False)
    verification_uuid = models.UUIDField(verbose_name='Unique Verification UUID', default=uuid.uuid4)

Let's use this occasion to add the User model to the admin:

# main/admin.py
from django.contrib import admin
from .models import MyUser


@admin.register(MyUser)
class UserAdmin(admin.ModelAdmin):
    pass

Let's make the changes reflect in the database:

$ ./manage.py makemigrations
$ ./manage.py migrate

We now need to write a piece of code that sends an email when a user instance is created. This is what Django signals are for, and this is a perfect occasion to touch this subject.

Signals are fired before/after certain events occur in the application. We can define callback functions that are triggered automatically when the signals are fired. To make a callback trigger, we must first connect it to a signal.
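The connect-then-fire mechanism can be sketched in plain Python. This is a toy model, not Django's implementation: the Signal class, save_user, and queue_welcome_email are invented for illustration, although the created flag mirrors the keyword argument that Django's post_save signal really passes to its receivers.

```python
class Signal:
    """A toy signal: a list of callbacks that are all called on send()."""

    def __init__(self):
        self._receivers = []

    def connect(self, receiver):
        self._receivers.append(receiver)

    def send(self, sender, **kwargs):
        return [receiver(sender=sender, **kwargs) for receiver in self._receivers]


# Hypothetical stand-in, named after Django's signal for familiarity.
post_save = Signal()

sent = []


def queue_welcome_email(sender, instance, created, **kwargs):
    # React only to newly created instances, not to every later save.
    if created:
        sent.append(instance["email"])


post_save.connect(queue_welcome_email)


def save_user(user, created):
    # Pretend the user was written to the database, then fire the signal.
    post_save.send(sender="User", instance=user, created=created)


save_user({"email": "a@example.com"}, created=True)   # callback records the address
save_user({"email": "a@example.com"}, created=False)  # callback ignores the update
print(sent)
```

The code that saves the user never mentions email at all. That decoupling is the main reason to use signals.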

We're going to create a callback that will be triggered after a User model has been created. We'll add this code after the User model definition in: main/models.py

from django.db.models import signals
from django.core.mail import send_mail
from django.urls import reverse


def user_post_save(sender, instance, created, **kwargs):
    # post_save fires on every save, so only email newly created, unverified users
    if created and not instance.is_verified:
        # Send verification email
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
            'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(instance.verification_uuid)}),
            'from@quickpublisher.dev',
            [instance.email],
            fail_silently=False,
        )


signals.post_save.connect(user_post_save, sender=MyUser)

Here we've defined a user_post_save function and connected it to the post_save signal (one that is triggered after a model has been saved) sent by the User model.

Django doesn't just send emails out on its own; it needs to be tied to an email service. For the sake of simplicity, you can add your Gmail credentials in quick_publisher/settings.py, or you can add your favourite email provider.

Here's what the Gmail configuration looks like:

EMAIL_USE_TLS = True
EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = '<YOUR_GMAIL_USERNAME>@gmail.com'
EMAIL_HOST_PASSWORD = '<YOUR_GMAIL_PASSWORD>'
EMAIL_PORT = 587

To test things out, go into the admin panel and create a new user with a valid email address you can quickly check. If all went well, you'll receive an email with a verification link.

The verification routine is not ready yet. Here's how to verify the account:

# main/views.py
from django.shortcuts import render, redirect
from django.http import Http404
from .models import MyUser


def home(request):
    return render(request, 'main/home.html')


def verify(request, uuid):
    try:
        user = MyUser.objects.get(verification_uuid=uuid, is_verified=False)
    except MyUser.DoesNotExist:
        raise Http404("User does not exist or is already verified")

    user.is_verified = True
    user.save()

    return redirect('home')

Hook the views up in quick_publisher/urls.py.

# quick_publisher/urls.py

from django.contrib import admin
from django.urls import path, include

from publish.views import view_post
from main.views import home, verify


urlpatterns = [
    path('admin/', admin.site.urls),
    path('', home, name='home'),
    path('<slug:slug>', view_post, name='view_post'),
    path('verify/<uuid>', verify, name='verify'),
]

Also, remember to create a home.html file under main/templates/main/home.html. It will be rendered by the home view.

Try to run the entire scenario all over again. If all is well, you'll receive an email with a valid verification URL.

[Screenshot: email verification]

If you follow the URL and then check in the admin, you can see how the account has been verified.

Sending Emails Asynchronously

Here's the problem with what we've done so far. You might have noticed that creating a user is a bit slow. That's because Django sends the verification email inside the request time.

This is how it works: we send the user data to the Django application. The application creates a User model and then creates a connection to Gmail (or another service you selected). Django waits for the response, and only then does it return a response to our browser.

Here is where Celery comes in. First, make sure it is installed:

$ pip install celery

We now need to create a Celery application in our Django application:

# quick_publisher/celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')

app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Celery is a task queue. It receives tasks from our Django application, and it will run them in the background. Celery needs to be paired with other services that act as brokers.

Brokers mediate the exchange of messages between the web application and Celery. In this tutorial, we'll be using Redis, which is easy to install and quick to get started with.

You can install Redis by following the instructions on the Redis Quick Start page. You'll need to install the Redis Python library, pip install redis, and the bundle necessary for using Redis and Celery: pip install celery[redis].

Start the Redis server in a separate console like this: $ redis-server

Let's add the Celery/Redis related configs into quick_publisher/settings.py:

# REDIS related settings
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
BROKER_URL = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 3600}
CELERY_RESULT_BACKEND = 'redis://' + REDIS_HOST + ':' + REDIS_PORT + '/0'

Before anything can be run in Celery, it must be declared as a task. Here's how to do this:

# main/tasks.py

import logging

from django.urls import reverse
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app


@app.task
def send_verification_email(user_id):
    UserModel = get_user_model()
    try:
        user = UserModel.objects.get(pk=user_id)
        send_mail(
            'Verify your QuickPublisher account',
            'Follow this link to verify your account: '
            'http://localhost:8000%s' % reverse('verify', kwargs={'uuid': str(user.verification_uuid)}),
            'from@quickpublisher.dev',
            [user.email],
            fail_silently=False,
        )
    except UserModel.DoesNotExist:
        logging.warning("Tried to send verification email to non-existing user '%s'", user_id)

What we've done here is this: we moved the sending verification email functionality into another file called tasks.py.

A few notes:

  • The name of the file is important. Celery goes through all the apps in INSTALLED_APPS and registers the tasks in tasks.py files.
  • Notice how we decorated the send_verification_email function with @app.task. This tells Celery this is a task that will be run in the task queue.
  • Notice how we expect as argument user_id rather than a User object. This is because we might have trouble serializing complex objects when sending the tasks to Celery. It's best to keep them simple.
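The serialization point in the last note can be seen with nothing but the json module (JSON is Celery's default serializer in recent versions). The message layout below is a simplified, hypothetical stand-in for what actually travels through the broker, and the User class is a plain Python object rather than a Django model.

```python
import json


class User:
    """A plain object standing in for a model instance."""

    def __init__(self, pk, email):
        self.pk = pk
        self.email = email


def to_message(task_name, *args):
    """Encode a task call as JSON, roughly as a broker message would carry it."""
    return json.dumps({"task": task_name, "args": list(args)})


user = User(pk=42, email="reader@example.com")

# Passing the primary key works: integers are valid JSON.
message = to_message("main.tasks.send_verification_email", user.pk)
print(message)

# Passing the object itself fails: json does not know how to encode it.
try:
    to_message("main.tasks.send_verification_email", user)
    object_serialized = True
except TypeError:
    object_serialized = False

print(object_serialized)
```

Passing the ID has a second benefit: the task loads the row when it actually runs, so it sees the current state of the user rather than a copy captured at the moment the task was queued.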

Going back to main/models.py, import the send_verification_email function and run it after a new user is created. The signal code turns into:

from django.db.models import signals
from main.tasks import send_verification_email


def user_post_save(sender, instance, created, **kwargs):
    # post_save fires on every save, so only queue the email for new, unverified users
    if created and not instance.is_verified:
        # Send verification email
        send_verification_email.delay(instance.pk)


signals.post_save.connect(user_post_save, sender=MyUser)

Notice how we call the .delay method on the task object. This sends the task off to Celery, and we don't wait for the result. If we called send_verification_email(instance.pk) directly instead, the function would run synchronously in the current process, bypassing the Celery workers, and the request would wait for it to finish, which is not what we want.
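The difference between handing work off and waiting for it can be illustrated with an analogy from the standard library. This is not Celery: a thread pool's submit plays the role of .delay, and the returned future is loosely comparable to the result handle that .delay gives back. The send_email function and the release event are invented for this sketch.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# The "mail server" only answers once this event is set.
release = threading.Event()


def send_email(address):
    release.wait(timeout=5)  # stand-in for a slow SMTP round trip
    return f"sent to {address}"


with ThreadPoolExecutor(max_workers=1) as pool:
    # Handing the work off returns at once, before the email is sent.
    future = pool.submit(send_email, "reader@example.com")
    finished_immediately = future.done()

    release.set()  # let the slow operation complete
    background_result = future.result(timeout=5)

# A direct call blocks the caller until the function returns.
inline_result = send_email("reader@example.com")

print(finished_immediately, background_result, inline_result)
```

In a web request, the first style lets the response go out while the email is still being sent. The second keeps the user waiting for the mail server.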

Before you start creating a new user, there's a catch. Celery is a service, and we need to start it. Open a new console, make sure you activate the appropriate virtualenv, and navigate to the project folder.

$ celery -A quick_publisher.celery worker --loglevel=debug --concurrency=4

This starts a Celery worker with four worker processes. You should see something like this:

[2022-09-23 14:56:17,565: INFO/MainProcess] celery@vaati-Yoga-9-14ITL5 ready.

Yes, now you can finally go and create another user. Notice how there's no delay, and make sure to watch the logs in the Celery console and see if the tasks are properly executed. This should look something like this:

[2022-09-23 14:58:38,165: INFO/MainProcess] Task main.tasks.send_verification_email[4f8f8455-3a61-48d2-b02f-ad6786b362e1] received
[2022-09-23 14:58:42,228: INFO/ForkPoolWorker-4] Task main.tasks.send_verification_email[4f8f8455-3a61-48d2-b02f-ad6786b362e1] succeeded in 4.0618907359967125s: None

Periodic Tasks With Celery

Here's another common scenario. Most mature web applications send their users lifecycle emails in order to keep them engaged. Some common examples of lifecycle emails:

  • monthly reports
  • activity notifications (likes, friendship requests, etc.)
  • reminders to accomplish certain actions ("Don't forget to activate your account")

Here's what we're going to do in our app. We're going to count how many times every post has been viewed and send a daily report to the author. Once every single day, we're going to go through all the users, fetch their posts, and send an email with a table containing the posts and view counts.

Let's change the Post model so that we can accommodate the view counts scenario.

from django.urls import reverse


class Post(models.Model):
    author = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    created = models.DateTimeField('Created Date', default=timezone.now)
    title = models.CharField('Title', max_length=200)
    content = models.TextField('Content')
    slug = models.SlugField('Slug')
    view_count = models.IntegerField("View Count", default=0)

    def get_absolute_url(self):
        # Assumes the post URL pattern is named 'view_post', as in the first urls.py listing
        return reverse('view_post', kwargs={'slug': self.slug})

    def __str__(self):
        return '"%s" by %s' % (self.title, self.author)

As always, when we change a model, we need to migrate the database:

$ ./manage.py makemigrations

$ ./manage.py migrate

Let's also modify the view_post Django view to count views:

def view_post(request, slug):
    try:
        post = Post.objects.get(slug=slug)
    except Post.DoesNotExist:
        raise Http404("Post does not exist")

    post.view_count += 1
    post.save()

    return render(request, 'publish/post.html', context={'post': post})

It would be useful to display the view_count in the template. Add <p>Viewed {{ post.view_count }} times</p> somewhere inside the publish/templates/publish/post.html file. View a post a few times now and see how the counter increases.

[Screenshot: post with view count]

Let's create a Celery task. Since it is about posts, I'm going to place it in publish/tasks.py:

from django.template import Template, Context
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
from quick_publisher.celery import app
from publish.models import Post


REPORT_TEMPLATE = """
Here's how you did till now:

{% for post in posts %}
        "{{ post.title }}": viewed {{ post.view_count }} times |

{% endfor %}
"""


@app.task
def send_view_count_report():
    for user in get_user_model().objects.all():
        posts = Post.objects.filter(author=user)
        if not posts:
            continue

        template = Template(REPORT_TEMPLATE)

        send_mail(
            'Your QuickPublisher Activity',
            template.render(context=Context({'posts': posts})),
            'from@quickpublisher.dev',
            [user.email],
            fail_silently=False,
        )

Every time you make changes to the Celery tasks, remember to restart the Celery process. Celery needs to discover and reload tasks. Before creating a periodic task, we should test this out in the Django shell to make sure everything works as intended:

$ ./manage.py shell

In [1]: from publish.tasks import send_view_count_report

In [2]: send_view_count_report.delay()

Hopefully, you received a nifty little report in your email.

Let's now create a periodic task. Open up quick_publisher/celery.py and register the periodic tasks:

# quick_publisher/celery.py

import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'quick_publisher.settings')

app = Celery('quick_publisher')
app.config_from_object('django.conf:settings')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

app.conf.beat_schedule = {
    'send-report-every-single-minute': {
        'task': 'publish.tasks.send_view_count_report',
        'schedule': crontab(),  # change to `crontab(minute=0, hour=0)` if you want it to run daily at midnight
    },
}

So far, we've created a schedule that will run the task publish.tasks.send_view_count_report every minute as indicated by the crontab() notation. You can also specify various Celery Crontab schedules.

Open up another console, activate the appropriate environment, and start the Celery Beat service.

$ celery -A quick_publisher beat

The Beat service's job is to push tasks to Celery according to the schedule; a worker still needs to be running to execute them. Take into account that this setup makes the send_view_count_report task run every minute. That's good for testing but not recommended for a real-world web application.

[Screenshot: update email from celery]

Making Tasks More Reliable

Tasks are often used to perform unreliable operations, operations that depend on external resources or that can easily fail due to various reasons. Here's a guideline for making them more reliable:

  • Make tasks idempotent. An idempotent task is one that can safely run more than once: running it several times leaves the system in the same state as running it once. This matters because a task that is stopped midway or retried may execute again, so it should either check what has already been done or apply its changes all at once.
  • Retry the tasks. If a task fails, it's a good idea to try it again until it succeeds, ideally up to a maximum number of attempts. You can do this in Celery with Celery Retry. Another thing to look at is the exponential backoff algorithm, which increases the wait between attempts and so limits unnecessary load on the server from retried tasks.
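Both guidelines can be sketched without Celery. The snippet below is a minimal illustration under stated assumptions, not Celery's retry machinery: retry_with_backoff, send_report, and the in-memory already_sent set (standing in for a flag stored in the database) are all invented for this example, and the sleep function is injected so the demo does not actually wait.

```python
import time


def retry_with_backoff(func, max_attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call func until it succeeds, doubling the wait after each failure.

    Returns the function's result together with the delays that were used.
    """
    delays = []
    for attempt in range(max_attempts):
        try:
            return func(), delays
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: let the error surface
            delay = base_delay * (2 ** attempt)  # 1, 2, 4, 8, ...
            delays.append(delay)
            sleep(delay)


already_sent = set()  # stand-in for a "report sent" flag in the database
outbox = []
attempts = {"count": 0}


def send_report(user_id):
    """Idempotent task: a second run for the same user changes nothing."""
    if user_id in already_sent:
        return "skipped"
    attempts["count"] += 1
    if attempts["count"] < 3:  # simulate a mail server that fails twice
        raise ConnectionError("mail server unavailable")
    outbox.append(user_id)
    already_sent.add(user_id)
    return "sent"


def no_wait(seconds):
    """Skip real sleeping so the demo finishes instantly."""


first, waits = retry_with_backoff(lambda: send_report(7), sleep=no_wait)
second, _ = retry_with_backoff(lambda: send_report(7), sleep=no_wait)
print(first, waits, second, outbox)
```

The first call succeeds on its third attempt after waiting 1 and then 2 seconds (skipped here), and the repeated call is a no-op, so the user receives exactly one report.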

Conclusions

I hope this has been an interesting tutorial for you and a good introduction to using Celery with Django.

Here are a few conclusions we can draw:

  • It's good practice to keep unreliable and time-consuming tasks outside the request time.
  • Long-running tasks should be executed in the background by worker processes (or other paradigms).
  • Background tasks can be used for various tasks that are not critical for the basic functioning of the application.
  • Celery can also handle periodic tasks using the celery beat service.
  • Tasks can be more reliable if made idempotent and retried (maybe using exponential backoff).

This post has been updated with contributions from Esther Vaati. Esther is a software developer and writer for Envato Tuts+.
