Celery: The Complete Guide for 2026

Published February 12, 2026 · 22 min read

Celery is the standard tool for running background tasks in Python applications. When your web server needs to send emails, process uploaded files, call slow APIs, or run periodic reports, Celery lets you offload that work to separate worker processes so your users get instant responses. It handles task distribution, retries, scheduling, and result tracking across any number of machines.

This guide covers everything you need to go from zero to production: architecture, broker setup, task creation, error handling, Django and Flask integration, periodic scheduling with Beat, advanced workflows with Canvas, monitoring, and the pitfalls that trip up most teams.

1. What Is Celery

Celery is a distributed task queue that processes work asynchronously across one or more worker processes. Instead of executing a slow operation inside your request-response cycle, you hand it off to Celery. A worker picks it up, executes it, and optionally stores the result.

Common use cases include sending transactional emails, generating PDFs or reports, processing image and video uploads, syncing data with third-party APIs, running machine learning inference jobs, and executing periodic maintenance tasks like cache cleanup or database pruning.

Celery is written in Python, but it can communicate with any language through its message protocol. It supports multiple message brokers (Redis, RabbitMQ, Amazon SQS), multiple result backends (Redis, PostgreSQL, Django ORM, MongoDB), and scales horizontally by spinning up more workers.

2. Architecture: Workers, Brokers, and Result Backends

Celery has three core components:

The producer is your application code (a Django view, a Flask route, a script) that creates task messages and sends them to the queue. The broker is the message transport, such as Redis or RabbitMQ, that holds queued tasks until a worker is free. Workers are separate processes that pull tasks from the broker, execute them, and report results.

Optionally, a result backend stores task return values and state. If you need to check whether a task succeeded or retrieve its output, you configure a result backend (Redis, database, etc.). If you only fire-and-forget, you can skip it.

# The flow: Producer -> Broker -> Worker -> Result Backend
#
# [Django/Flask App] --send_task--> [Redis/RabbitMQ]
#                                        |
#                                   [Worker 1]  [Worker 2]  [Worker N]
#                                        |
#                                   [Result Backend (optional)]

3. Installation and Setup

Install Celery with the Redis bundle, which includes the redis-py client:

pip install "celery[redis]"

Make sure Redis is running. On Linux: sudo apt install redis-server && sudo systemctl start redis. On macOS: brew install redis && brew services start redis. Or use Docker: docker run -d -p 6379:6379 redis:7-alpine.

Create your Celery application in a file called celery_app.py:

# celery_app.py
from celery import Celery

app = Celery(
    "myproject",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

# Optional configuration
app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,          # hard kill after 5 min
    task_soft_time_limit=240,     # raise SoftTimeLimitExceeded after 4 min
    worker_prefetch_multiplier=1, # fair scheduling
)

4. Creating Your First Task

Define tasks by decorating functions with @app.task:

# tasks.py
from celery_app import app
import time

@app.task
def add(x, y):
    return x + y

@app.task(name="send_welcome_email")
def send_welcome_email(user_id):
    """Send a welcome email to a new user."""
    from myapp.models import User
    from myapp.email import send_email

    user = User.objects.get(id=user_id)
    send_email(
        to=user.email,
        subject="Welcome!",
        body=f"Hi {user.name}, thanks for signing up."
    )
    return {"status": "sent", "user_id": user_id}

@app.task(bind=True)
def process_upload(self, file_path):
    """Process an uploaded file. bind=True gives access to self (the task instance)."""
    self.update_state(state="PROCESSING", meta={"step": "reading file"})
    time.sleep(2)  # simulate work
    self.update_state(state="PROCESSING", meta={"step": "resizing"})
    time.sleep(3)
    return {"file": file_path, "result": "processed"}

Start a worker to consume tasks:

celery -A celery_app worker --loglevel=info

The -A flag points to the module containing your Celery app instance. The worker connects to the broker, discovers registered tasks, and waits for messages.

5. Calling Tasks

There are three main ways to dispatch tasks:

from tasks import add, send_welcome_email

# 1. delay() — shorthand, most common
result = add.delay(4, 6)

# 2. apply_async() — full control over options
result = add.apply_async(
    args=(4, 6),
    countdown=10,           # wait 10 seconds before executing
    expires=60,             # discard if not started within 60 seconds
    queue="high-priority",  # route to a specific queue
    retry=True,
    retry_policy={"max_retries": 3, "interval_start": 1}
)

# 3. Signatures — reusable task descriptors
from celery import signature
sig = add.s(4, 6)      # creates a signature
sig.delay()             # execute it later

# calling with eta (exact time); use a timezone-aware datetime
# (datetime.utcnow() is deprecated since Python 3.12)
from datetime import datetime, timedelta, timezone
add.apply_async(args=(4, 6), eta=datetime.now(timezone.utc) + timedelta(minutes=5))

# send_welcome_email in a view
def register_user(request):
    user = User.objects.create(name="Alice", email="alice@example.com")
    send_welcome_email.delay(user.id)  # non-blocking
    return JsonResponse({"status": "registered"})

Always pass serializable arguments (IDs, strings, numbers) to tasks, not ORM objects or file handles. The task arguments get serialized to JSON and sent through the broker.
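A quick way to internalize the rule: if json.dumps rejects a value, it cannot be a task argument under the default serializer. A small demonstration:

```python
import json
from datetime import datetime

# Task arguments must survive a JSON round-trip with the default serializer.
print(json.dumps({"user_id": 42, "email": "alice@example.com"}))  # fine

try:
    json.dumps({"created": datetime.now()})  # datetimes and ORM objects fail
except TypeError:
    print("not JSON-serializable: pass an ID or an ISO string instead")
```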

6. Task States and Results

Every task goes through a lifecycle of states. With a result backend configured, you can track them:

result = add.delay(4, 6)

# Check state
result.state    # PENDING -> STARTED -> SUCCESS (or FAILURE/RETRY)
result.ready()  # True when finished (success or failure)

# Get the return value (blocks until ready, with timeout)
value = result.get(timeout=10)   # returns 10

# Check for failure
if result.failed():
    print(result.traceback)

# Task states:
# PENDING  — task is waiting in the queue (or unknown task ID)
# STARTED  — worker picked it up (requires task_track_started=True)
# SUCCESS  — completed successfully, result stored
# FAILURE  — raised an exception
# RETRY    — task is being retried after failure
# REVOKED  — task was cancelled before execution

Avoid calling result.get() inside a web request handler. That blocks the request until the task finishes, defeating the purpose of async processing. Instead, return the task ID to the client and let them poll a status endpoint.

# Status polling pattern
# POST /api/reports/ -> returns {"task_id": "abc-123"}
# GET  /api/reports/abc-123/ -> returns {"state": "PROCESSING", "progress": 45}

from celery.result import AsyncResult

def check_task_status(request, task_id):
    result = AsyncResult(task_id)
    response = {"task_id": task_id, "state": result.state}
    if result.state == "SUCCESS":
        response["result"] = result.result
    elif result.state == "FAILURE":
        response["error"] = str(result.result)
    elif result.info:
        response["progress"] = result.info
    return JsonResponse(response)

7. Error Handling and Retries

Celery has built-in retry logic. The simplest approach is autoretry_for, which automatically retries when specific exceptions are raised:

import requests
from celery_app import app

@app.task(
    autoretry_for=(requests.ConnectionError, requests.Timeout),
    retry_backoff=True,       # exponential backoff: 1s, 2s, 4s, 8s...
    retry_backoff_max=600,    # cap at 10 minutes
    retry_jitter=True,        # add randomness to avoid thundering herd
    max_retries=5,
)
def call_external_api(endpoint, payload):
    response = requests.post(endpoint, json=payload, timeout=30)
    response.raise_for_status()
    return response.json()

For manual retry control, use self.retry() with bind=True:

@app.task(bind=True, max_retries=3)
def charge_payment(self, order_id, amount):
    try:
        result = payment_gateway.charge(order_id, amount)
        return {"order_id": order_id, "charged": amount}
    except payment_gateway.TemporaryError as exc:
        # Retry in 60 seconds
        raise self.retry(exc=exc, countdown=60)
    except payment_gateway.PermanentError as exc:
        # Don't retry — mark order as failed
        Order.objects.filter(id=order_id).update(status="payment_failed")
        raise  # task goes to FAILURE state

Handle the SoftTimeLimitExceeded exception to clean up when a task runs too long:

from celery.exceptions import SoftTimeLimitExceeded

@app.task(soft_time_limit=120)
def long_running_job(data_id):
    try:
        # process data...
        result = heavy_computation(data_id)
        return result
    except SoftTimeLimitExceeded:
        # clean up partial work
        cleanup_temp_files(data_id)
        raise  # re-raise so Celery marks it as failed

8. Celery with Django

Django integration requires three files. First, the Celery app configuration:

# myproject/celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery("myproject")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()  # finds tasks.py in each installed app

Import the app in your project's __init__.py:

# myproject/__init__.py
from .celery import app as celery_app
__all__ = ("celery_app",)

Add Celery settings to settings.py:

# myproject/settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = "UTC"
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 300
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

Define tasks inside your Django apps:

# orders/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_order_confirmation(order_id):
    """shared_task doesn't need the app instance, making it reusable."""
    from orders.models import Order
    order = Order.objects.get(id=order_id)
    send_mail(
        subject=f"Order #{order.id} Confirmed",
        message=f"Your order for ${order.total} has been confirmed.",
        from_email="noreply@example.com",
        recipient_list=[order.user.email],
    )
    return {"order_id": order_id, "email": order.user.email}

# Call from a view
# orders/views.py
from orders.tasks import send_order_confirmation

def create_order(request):
    order = Order.objects.create(...)
    send_order_confirmation.delay(order.id)
    return JsonResponse({"order_id": order.id})

Start the worker alongside your Django dev server:

celery -A myproject worker --loglevel=info

9. Celery with Flask

Flask integration requires sharing the app context with Celery tasks:

# app.py
from flask import Flask
from celery import Celery, Task

def create_celery(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery = Celery(app.name, task_cls=FlaskTask)
    celery.config_from_object(app.config, namespace="CELERY")
    celery.set_default()
    app.extensions["celery"] = celery
    return celery

app = Flask(__name__)
app.config.from_mapping(
    CELERY_BROKER_URL="redis://localhost:6379/0",
    CELERY_RESULT_BACKEND="redis://localhost:6379/1",
)
celery = create_celery(app)

# tasks.py
from celery import shared_task

@shared_task
def generate_report(report_id):
    from models import Report, db  # imported inside the task to avoid circular imports
    report = db.session.get(Report, report_id)
    # generate PDF...
    report.status = "completed"
    db.session.commit()
    return {"report_id": report_id}

# routes.py
@app.post("/api/reports")
def create_report():
    report = Report(status="pending")
    db.session.add(report)
    db.session.commit()
    generate_report.delay(report.id)
    return {"task": "queued", "report_id": report.id}

10. Celery Beat: Periodic Tasks and Scheduling

Celery Beat is a scheduler process that sends tasks to workers at defined intervals. Configure schedules in your Celery app:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Run every 10 minutes
    "cleanup-expired-sessions": {
        "task": "maintenance.tasks.cleanup_sessions",
        "schedule": 600.0,  # seconds (or use timedelta(minutes=10))
    },
    # Run daily at midnight UTC
    "generate-daily-report": {
        "task": "reports.tasks.daily_summary",
        "schedule": crontab(hour=0, minute=0),
    },
    # Run every Monday at 9am
    "weekly-digest": {
        "task": "emails.tasks.send_weekly_digest",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
        "args": (),
    },
    # Run on the first day of every month
    "monthly-billing": {
        "task": "billing.tasks.process_monthly_invoices",
        "schedule": crontab(hour=6, minute=0, day_of_month=1),
    },
}

# Start Beat alongside your workers:
# Terminal 1: celery -A myproject worker --loglevel=info
# Terminal 2: celery -A myproject beat --loglevel=info
#
# Or combined (development only):
# celery -A myproject worker --beat --loglevel=info

For Django, install django-celery-beat to manage schedules from the admin panel:

pip install django-celery-beat

# settings.py
INSTALLED_APPS = [..., "django_celery_beat"]
CELERY_BEAT_SCHEDULER = "django_celery_beat.schedulers:DatabaseScheduler"

# Run migrations: python manage.py migrate
# Now manage periodic tasks in Django Admin > Periodic Tasks

Only run one Beat instance per cluster. Multiple Beat processes will send duplicate tasks. In production, use a process manager like systemd or supervisord to ensure exactly one Beat process is running.

11. Task Routing and Priority Queues

By default, all tasks go to the celery queue. Route tasks to different queues to prioritize work and isolate workloads:

# Configuration
app.conf.task_routes = {
    "emails.tasks.*": {"queue": "emails"},
    "reports.tasks.*": {"queue": "reports"},
    "billing.tasks.*": {"queue": "critical"},
}

# Or route per-task
@app.task(queue="critical")
def charge_payment(order_id, amount):
    ...

# Start workers consuming specific queues:
# celery -A myproject worker -Q critical --concurrency=2
# celery -A myproject worker -Q emails --concurrency=4
# celery -A myproject worker -Q reports --concurrency=1
# celery -A myproject worker -Q celery,emails  # multiple queues

# Route at call time
send_welcome_email.apply_async(
    args=(user_id,),
    queue="emails",
    priority=9  # RabbitMQ: 0 = lowest, 9 = highest; Redis inverts this (0 runs first)
)

Use separate queues for CPU-intensive tasks (low concurrency workers), I/O-bound tasks (higher concurrency), and critical tasks (dedicated workers with monitoring). This prevents a flood of report-generation tasks from starving your payment processing queue.

12. Monitoring with Flower

Flower gives you a real-time web dashboard for your Celery cluster:

# Install and run
pip install flower
celery -A myproject flower --port=5555

# With basic auth
celery -A myproject flower --basic-auth=admin:secret --port=5555

# Prometheus metrics: Flower exposes a /metrics endpoint by default
# (the --broker_api flag is for the RabbitMQ management API, not Prometheus)

Flower shows worker status (online, offline, heartbeat), active and reserved tasks, task history with execution times, and success and failure rates; it also lets you shut down, restart, or resize worker pools from the UI.

For CLI monitoring without Flower:

# List active tasks
celery -A myproject inspect active

# Worker stats
celery -A myproject inspect stats

# Registered tasks
celery -A myproject inspect registered

# Revoke a task
celery -A myproject control revoke <task-id> --terminate

# Purge all pending tasks (careful!)
celery -A myproject purge

13. Celery Canvas: Chains, Groups, and Chords

Canvas primitives let you compose tasks into workflows:

from celery import chain, group, chord
from tasks import fetch_data, transform, save_result, notify, aggregate

# Chain — sequential pipeline, each result feeds the next
workflow = chain(
    fetch_data.s("https://api.example.com/data"),
    transform.s(),
    save_result.s()
)
workflow.delay()  # fetch -> transform(fetch_result) -> save(transform_result)

# Group — parallel execution
batch = group(
    fetch_data.s(url) for url in urls
)
result = batch.delay()
results = result.get()  # list of all results

# Chord — group + callback when all complete
workflow = chord(
    [transform.s(item) for item in items],
    aggregate.s()  # called with list of transform results
)
workflow.delay()

# Practical example: process user signup
from celery import chain

def on_user_signup(user_id):
    workflow = chain(
        create_profile.si(user_id),       # si() = immutable signature (ignores previous result)
        send_welcome_email.si(user_id),
        sync_to_crm.si(user_id),
    )
    workflow.delay()

Use .s() for signatures that receive the previous result as the first argument, and .si() for immutable signatures that ignore the previous result.

14. Best Practices and Common Pitfalls

Pass IDs, not objects. Serialize task arguments as simple types (integers, strings, lists). Never pass Django model instances or file objects; pass the primary key and re-fetch inside the task. The object may change between when you enqueue and when the worker executes.

Make tasks idempotent. Workers can execute a task more than once (broker redelivery, visibility timeout, manual retry). Design tasks so running them twice produces the same result. Use database constraints and check-before-write patterns.

Set time limits. Always configure task_time_limit and task_soft_time_limit. Without them, a hung task blocks a worker slot forever. Soft limits raise an exception you can catch; hard limits kill the worker process.

Use worker_prefetch_multiplier=1. The default (4) causes workers to grab multiple tasks at once. If tasks take varying amounts of time, this leads to unfair distribution. Set it to 1 for even load balancing.

Never call .get() inside a task. Calling result.get() inside another task can cause deadlocks, especially with the prefork pool. Use Canvas primitives (chains, chords) instead to compose dependent tasks.

Use acks_late=True for critical tasks. By default, Celery acknowledges a task as soon as the worker receives it. With acks_late=True, the task is acknowledged only after it completes successfully, so if the worker crashes mid-execution, the broker redelivers it.

@app.task(acks_late=True, reject_on_worker_lost=True)
def critical_billing_task(invoice_id):
    """If the worker crashes, the broker will redeliver this task."""
    process_invoice(invoice_id)

Monitor your dead letter queue. Tasks that fail after all retries go to the dead letter queue (or are silently discarded depending on config). Set up alerting so failed tasks don't go unnoticed.

Test tasks synchronously. In tests, set CELERY_TASK_ALWAYS_EAGER=True to execute tasks inline without a broker. This simplifies testing but does not test serialization or concurrency issues. For integration tests, use a real broker.

# pytest fixture for eager mode
@pytest.fixture(autouse=True)
def celery_eager(settings):
    settings.CELERY_TASK_ALWAYS_EAGER = True
    settings.CELERY_TASK_EAGER_PROPAGATES = True

def test_send_email(celery_eager):
    result = send_order_confirmation.delay(order_id=1)
    assert result.get() == {"order_id": 1, "email": "user@example.com"}

Frequently Asked Questions

What is Celery and when should I use it?

Celery is a distributed task queue for Python that runs time-consuming operations in background worker processes instead of blocking your web server. Use it when you need to send emails, process images or videos, generate reports, call third-party APIs, run scheduled jobs, or handle any work that takes more than a few hundred milliseconds. It supports multiple message brokers (Redis, RabbitMQ), result backends, and scales horizontally by adding more workers.

How does Celery compare to RQ and Dramatiq?

Celery is the most feature-rich option with Canvas workflows (chains, groups, chords), periodic scheduling via Beat, multiple broker support, and a large ecosystem. RQ (Redis Queue) is simpler and Redis-only, ideal for small projects needing basic background jobs. Dramatiq sits in between with better defaults (automatic retries, result expiry) and a simpler API, but fewer workflow primitives. Choose Celery for production systems needing scheduling, complex workflows, and multi-broker support. Choose RQ for simplicity. Choose Dramatiq for a middle ground.

How do I use Celery with Django?

Create a celery.py file in your Django project that initializes the Celery app with os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'yourproject.settings'), then call app.config_from_object('django.conf:settings', namespace='CELERY') and app.autodiscover_tasks(). Import the app in __init__.py. Add CELERY_BROKER_URL to settings. Define tasks in tasks.py files inside your Django apps using @shared_task. See the full Django section above for working code.

What is Celery Beat and how does periodic scheduling work?

Celery Beat is a scheduler that sends tasks to workers at defined intervals. Configure schedules using beat_schedule with crontab() for cron-like schedules or timedelta for fixed intervals. Run it with celery -A proj beat. Beat only sends tasks; workers execute them. For Django, use django-celery-beat to store schedules in the database and manage them from the admin panel. Always run exactly one Beat instance per cluster to avoid duplicate tasks. See the full Beat section for configuration examples.

How do I monitor Celery workers and tasks?

Flower is the standard monitoring tool. Install with pip install flower and run celery -A proj flower for a web dashboard on port 5555 showing real-time worker status, active tasks, timing, and success/failure rates. From the CLI, celery inspect active shows running tasks and celery inspect stats shows worker statistics. For production, integrate with Prometheus using celery-exporter. See the full Monitoring section for details.
