March 26, 2026 · 10 min read

Background Jobs and Task Queues — BullMQ, Celery, and Sidekiq

When and why to use background jobs, message brokers, job patterns like delayed and scheduled jobs, retry strategies, and monitoring failed jobs. Code examples for BullMQ and Celery.

Tags: background jobs, queues, bullmq, celery, async

Your API endpoint should respond in under 200ms. But some operations take 5 seconds, 30 seconds, or several minutes -- sending emails, generating reports, processing images, syncing with third-party APIs. Doing these synchronously means the user stares at a spinner while your server holds a connection open.

Background jobs solve this by moving slow work off the request-response cycle. The API accepts the request, queues the work, responds immediately, and a separate worker processes the job asynchronously.

Architecture Overview

User Request → API Server → Queue (Redis/RabbitMQ) → Worker Process → Done
                  ↓
            Immediate Response
            "Job queued, ID: abc123"

Three components:

1. Producer: Your API server that creates jobs
2. Broker: Message queue that stores jobs (Redis, RabbitMQ, SQS)
3. Consumer/Worker: Separate process that pulls jobs and executes them

The broker decouples producers from consumers. If the worker crashes, jobs wait in the queue. If traffic spikes, you add more workers. If processing is slow, jobs queue up instead of timing out.
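The producer/broker/worker flow can be sketched with an in-memory stand-in for the broker. This is a hypothetical minimal model, not a production pattern: a real broker like Redis persists jobs across process restarts, which a `queue.Queue` does not.

```python
import queue
import threading

broker = queue.Queue()  # Broker stand-in: holds jobs until a worker is free
results = []

def worker():
    """Consumer: pulls jobs off the broker and executes them."""
    while True:
        job = broker.get()
        if job is None:  # Sentinel: shut the worker down
            break
        results.append(f"processed {job['name']} #{job['id']}")
        broker.task_done()

def enqueue(job_id, name):
    """Producer: the 'API server' enqueues work and returns immediately."""
    broker.put({"id": job_id, "name": name})
    return f"Job queued, ID: {job_id}"

t = threading.Thread(target=worker)
t.start()
print(enqueue("abc123", "welcome-email"))  # responds without waiting for the work
broker.put(None)
t.join()
```

The key property is visible even in this toy: `enqueue` returns before the job runs, and the queue absorbs the gap between producing and consuming.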

When to Use Background Jobs

| Scenario | Sync or Background? | Why |
|---|---|---|
| Send welcome email | Background | 2-5s SMTP call, user doesn't need to wait |
| Generate PDF report | Background | CPU-intensive, 10-30s |
| Process uploaded image | Background | Resizing, optimization, 5-15s |
| Sync data to Salesforce | Background | External API, unreliable latency |
| Calculate user permissions | Sync | Fast, needed for the response |
| Validate form input | Sync | Instant, user needs immediate feedback |
| Send webhook notifications | Background | External services may be slow/down |
| Aggregate analytics data | Background (scheduled) | Periodic batch processing |
| Charge a subscription | Background (scheduled) | Runs daily/monthly |

BullMQ (Node.js + Redis)

BullMQ is the standard for Node.js. It uses Redis as its broker and supports delayed jobs, rate limiting, retries, job priorities, and repeatable/scheduled jobs.

Setup

npm install bullmq ioredis

Creating a Queue and Adding Jobs

import { Queue } from "bullmq";
import IORedis from "ioredis";

const connection = new IORedis({
  host: "localhost",
  port: 6379,
  maxRetriesPerRequest: null,
});

const emailQueue = new Queue("emails", { connection });

// Add a job from your API route
app.post("/api/register", async (req, res) => {
  const user = await createUser(req.body);

  // Queue the welcome email -- returns immediately
  await emailQueue.add("welcome-email", {
    userId: user.id,
    email: user.email,
    name: user.name,
  });

  res.status(201).json({ user });
});

Processing Jobs (Worker)

import { Worker } from "bullmq";

const emailWorker = new Worker(
  "emails",
  async (job) => {
    console.log(`Processing job ${job.id}: ${job.name}`);

    switch (job.name) {
      case "welcome-email":
        await sendWelcomeEmail(job.data.email, job.data.name);
        break;
      case "password-reset":
        await sendPasswordResetEmail(job.data.email, job.data.token);
        break;
      default:
        throw new Error(`Unknown job type: ${job.name}`);
    }

    return { sent: true, timestamp: Date.now() };
  },
  {
    connection,
    concurrency: 5, // Process 5 jobs simultaneously
  }
);

emailWorker.on("completed", (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailWorker.on("failed", (job, err) => {
  console.error(`Job ${job?.id} failed:`, err.message);
});

Job Options: Delays, Retries, Priority

// Delayed job -- send email 1 hour after registration
await emailQueue.add("follow-up-email", { userId: user.id }, {
  delay: 60 * 60 * 1000, // 1 hour in ms
});

// Job with retries
await emailQueue.add("webhook-delivery", { url, payload }, {
  attempts: 5,
  backoff: {
    type: "exponential",
    delay: 1000, // 1s, 2s, 4s, 8s, 16s
  },
});

// Priority job (lower number = higher priority)
await emailQueue.add("password-reset", { email }, {
  priority: 1, // Process before regular emails
});

// Note: unlike legacy Bull, BullMQ has no built-in per-job timeout option.
// Enforce timeouts for long-running jobs inside the processor itself.

Scheduled/Repeatable Jobs

// Run every day at 3 AM
await emailQueue.add("daily-digest", {}, {
  repeat: {
    pattern: "0 3   *", // Cron syntax
  },
});

// Run every 5 minutes
await emailQueue.add("health-check", {}, {
repeat: {
every: 5 60 1000,
},
});

Job Progress and Events

// In the worker -- report progress
const reportWorker = new Worker("reports", async (job) => {
  const rows = await fetchData();
  const total = rows.length;

for (let i = 0; i < total; i++) {
await processRow(rows[i]);
await job.updateProgress(Math.round(((i + 1) / total) * 100));
}

return { processedRows: total };
}, { connection });

// In your API -- check progress
app.get("/api/jobs/:id/status", async (req, res) => {
const job = await emailQueue.getJob(req.params.id);
if (!job) return res.status(404).json({ error: "Job not found" });

const state = await job.getState();
res.json({
id: job.id,
state, // "waiting", "active", "completed", "failed"
progress: job.progress, // 0-100
result: job.returnvalue, // Available after completion
failedReason: job.failedReason,
attemptsMade: job.attemptsMade,
});
});

Celery (Python + Redis/RabbitMQ)

Celery is the go-to for Python. It supports Redis and RabbitMQ as brokers, has built-in scheduling (Celery Beat), and integrates with Django and Flask.

Setup

pip install celery redis

Configuration

# celery_app.py
from celery import Celery

app = Celery(
    "tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",  # For storing results
)

app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_acks_late=True,  # Acknowledge after completion, not receipt
    worker_prefetch_multiplier=1,  # Don't hoard tasks
)

Defining Tasks

# tasks.py
from celery_app import app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int, email: str):
    try:
        logger.info(f"Sending welcome email to {email}")
        smtp_send(
            to=email,
            subject="Welcome!",
            body=render_template("welcome.html", user_id=user_id),
        )
        return {"status": "sent", "email": email}
    except SMTPConnectionError as exc:
        logger.warning(f"SMTP connection failed, retrying: {exc}")
        raise self.retry(exc=exc)

@app.task(bind=True)
def generate_report(self, report_id: int):
    rows = fetch_report_data(report_id)
    total = len(rows)

    for i, row in enumerate(rows):
        process_row(row)
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": total},
        )

    save_report(report_id, rows)
    return {"status": "complete", "rows_processed": total}

Calling Tasks

# In your Flask/Django view
from tasks import send_welcome_email, generate_report

# Fire and forget
send_welcome_email.delay(user_id=42, email="user@example.com")

# With options
send_welcome_email.apply_async(
    args=[42, "user@example.com"],
    countdown=3600,  # Delay 1 hour
    expires=7200,    # Expire if not processed within 2 hours
)

# Get result (async)
result = generate_report.delay(report_id=99)
print(result.id)       # Task ID
print(result.status)   # PENDING, STARTED, PROGRESS, SUCCESS, FAILURE
print(result.result)   # Return value when done

# Chain tasks
from celery import chain
workflow = chain(
    fetch_data.s(source="api"),
    transform_data.s(),
    load_data.s(destination="warehouse"),
)
workflow.apply_async()

Celery Beat (Scheduled Tasks)

# celery_app.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    "daily-cleanup": {
        "task": "tasks.cleanup_expired_sessions",
        "schedule": crontab(hour=2, minute=0),  # 2 AM daily
    },
    "hourly-sync": {
        "task": "tasks.sync_external_data",
        "schedule": crontab(minute=0),  # Every hour
    },
    "every-5-minutes": {
        "task": "tasks.check_payment_status",
        "schedule": 300.0,  # Every 300 seconds
    },
}

Run with:

# Worker
celery -A celery_app worker --loglevel=info --concurrency=4

# Beat scheduler (separate process)
celery -A celery_app beat --loglevel=info

Retry Strategies

Retries are essential -- external services fail. But naive retries (immediate, unlimited) can overwhelm a failing service.

Exponential Backoff

Attempt 1: immediate
Attempt 2: wait 1s
Attempt 3: wait 2s
Attempt 4: wait 4s
Attempt 5: wait 8s
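The schedule above is just powers of two. A minimal, language-agnostic sketch in Python (the function name, base, and cap are illustrative) that computes the wait before a given attempt, with an optional jitter flag for the variant discussed below:

```python
import random

def backoff_delay(attempt, base=1.0, cap=30.0, jitter=False):
    """Seconds to wait before retry `attempt` (1-indexed).

    Attempt 1 is immediate; attempt 2 waits base*2^0 = 1s, attempt 3
    waits 2s, and so on, capped so delays don't grow without bound.
    With jitter=True, up to 1s of randomness is added so retrying jobs
    don't all hit the failing service at the same instant.
    """
    if attempt <= 1:
        return 0.0  # first attempt runs immediately
    delay = min(base * 2 ** (attempt - 2), cap)
    if jitter:
        delay += random.random()
    return delay

# Reproduces the schedule above for attempts 1-5
print([backoff_delay(n) for n in range(1, 6)])  # [0.0, 1.0, 2.0, 4.0, 8.0]
```

The cap matters in practice: without it, a job on its tenth attempt would wait minutes between tries, which usually helps nobody.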

Exponential Backoff with Jitter

Without jitter, all retrying jobs hit the service at the same time (thundering herd). Add randomness:

// BullMQ
{
  attempts: 5,
  backoff: {
    type: "custom",
  },
}

// Custom backoff in worker
const worker = new Worker("queue", processor, {
  connection,
  settings: {
    backoffStrategy: (attemptsMade) => {
      const base = 1000 * Math.pow(2, attemptsMade);
      const jitter = Math.random() * 1000;
      return Math.min(base + jitter, 30000); // Cap at 30s
    },
  },
});

Retry Decision Matrix

| Error Type | Retry? | Strategy |
|---|---|---|
| Network timeout | Yes | Exponential backoff |
| 500 from external API | Yes | Exponential backoff |
| 429 rate limited | Yes | Respect Retry-After header |
| 400 bad request | No | Fix the data, fail permanently |
| 401 unauthorized | No | Fix credentials, fail permanently |
| Validation error | No | Bad data won't become good data |
| Database deadlock | Yes | Short delay, limited retries |
| Out of memory | No | Needs investigation, not retries |
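The matrix boils down to a small predicate you can call from any worker. A hedged sketch in Python; the error labels are illustrative, and real code would map your client library's exceptions onto them:

```python
def should_retry(status=None, error=None):
    """Decide whether to re-queue a failed job.

    `status` is an HTTP status code from an external call (or None);
    `error` is a short error-kind label for non-HTTP failures.
    """
    retryable_errors = {"network_timeout", "db_deadlock"}
    permanent_errors = {"validation", "out_of_memory"}

    if error in retryable_errors:
        return True
    if error in permanent_errors:
        return False
    if status is None:
        return False
    if status == 429:
        return True   # rate limited: honor Retry-After before re-queueing
    if status >= 500:
        return True   # server-side failure: back off and retry
    return False      # other 4xx: bad data or credentials, fail fast
```

The important asymmetry: transient failures (network, 5xx, deadlocks) get retries; deterministic failures (4xx, validation) fail immediately, because retrying them just burns worker capacity.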

Dead Letter Queues

After all retries are exhausted, jobs go to a dead letter queue (DLQ). This prevents data loss -- failed jobs sit there until a human investigates.

// BullMQ -- listen for permanently failed jobs
emailWorker.on("failed", async (job, err) => {
  if (job && job.attemptsMade >= job.opts.attempts) {
    // All retries exhausted -- this is a dead letter
    await deadLetterQueue.add("failed-email", {
      originalJob: job.data,
      error: err.message,
      failedAt: new Date().toISOString(),
      attempts: job.attemptsMade,
    });

    alertOpsTeam(`Email job ${job.id} permanently failed: ${err.message}`);
  }
});

Monitoring and Observability

Running background jobs without monitoring is running blind. You need visibility into:

  • Queue depth: How many jobs are waiting? Growing queue = workers can't keep up.
  • Processing time: How long do jobs take? Degrading times = external service issues.
  • Failure rate: What percentage of jobs fail? Spikes need investigation.
  • Worker health: Are workers alive and processing?
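As a sketch, these signals can be computed from a list of job states. The thresholds here are illustrative; in practice you would pull the counts from your queue (e.g. BullMQ's getJobCounts or Flower's API) rather than build the list yourself:

```python
from collections import Counter

def queue_health(job_states, max_depth=100, max_failure_rate=0.05):
    """Summarize queue health from job state strings
    ("waiting", "active", "completed", "failed")."""
    counts = Counter(job_states)
    finished = counts["completed"] + counts["failed"]
    failure_rate = counts["failed"] / finished if finished else 0.0
    alerts = []
    if counts["waiting"] > max_depth:
        alerts.append("queue backlog growing")   # workers can't keep up
    if failure_rate > max_failure_rate:
        alerts.append("failure rate spike")      # needs investigation
    return {
        "depth": counts["waiting"],
        "failure_rate": round(failure_rate, 3),
        "alerts": alerts,
    }
```

Whatever tooling you use, the point is to alert on trends (growing depth, rising failure rate), not just on individual job failures.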

BullMQ Dashboard (Bull Board)

import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(reportQueue),
    new BullMQAdapter(webhookQueue),
  ],
  serverAdapter,
});

app.use("/admin/queues", authMiddleware, serverAdapter.getRouter());

Celery Monitoring (Flower)

pip install flower
celery -A celery_app flower --port=5555

Flower gives you a web dashboard with real-time task monitoring, worker status, and the ability to inspect and retry failed tasks.

BullMQ vs Celery vs Sidekiq

| Feature | BullMQ (Node) | Celery (Python) | Sidekiq (Ruby) |
|---|---|---|---|
| Language | JavaScript/TypeScript | Python | Ruby |
| Broker | Redis | Redis, RabbitMQ, SQS | Redis |
| Scheduling | Built-in repeatable | Celery Beat (separate process) | Sidekiq-Cron/Scheduler |
| Concurrency | Event loop + workers | Prefork/Gevent | Threads |
| Dashboard | Bull Board | Flower | Sidekiq Web UI |
| Rate limiting | Built-in | Via libraries | Sidekiq Enterprise |
| Priority queues | Yes | Yes | Yes |
| Job chaining | Flows API | Canvas (chain, chord, group) | Batch (Pro/Enterprise) |
| Maturity | 2020+ | 2009+ | 2012+ |

Pick the one that matches your stack. They all solve the same core problem well.

Common Pitfalls

1. Jobs that aren't idempotent. Workers can crash mid-job and reprocess it. If your job sends an email and crashes after sending but before acknowledging, the email gets sent twice. Design jobs so running them twice produces the same result as once.
// Bad: not idempotent
async function processPayment(orderId) {
  await chargeCard(orderId); // Charges again on retry!
}

// Good: idempotent
async function processPayment(orderId) {
  const order = await getOrder(orderId);
  if (order.paymentStatus === "charged") return; // Already done
  await chargeCard(orderId);
  await updateOrder(orderId, { paymentStatus: "charged" });
}

2. Serialization assumptions. Job data gets serialized to JSON (or msgpack). Dates become strings. Class instances become plain objects. Don't pass complex objects -- pass IDs and re-fetch from the database in the worker.

3. No timeout on jobs. A hung job blocks the worker forever. Always set a timeout.

4. Running workers and web servers in the same process. Workers should be separate processes. A CPU-intensive job shouldn't block your API from responding.
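The serialization pitfall is easy to demonstrate with a plain JSON round trip, which is exactly what happens to job data on its way through the broker:

```python
import json
from datetime import datetime, timezone

stamp = datetime(2026, 3, 26, tzinfo=timezone.utc)

# json.dumps({"created_at": stamp}) would raise TypeError -- datetimes
# aren't JSON-serializable, so a string representation gets sent instead:
serialized = json.dumps({"created_at": stamp.isoformat(), "user_id": 42})

# On the worker side the date comes back as a plain string, not a datetime
job_data = json.loads(serialized)
print(job_data["created_at"])  # 2026-03-26T00:00:00+00:00
print(type(job_data["created_at"]))  # <class 'str'>
```

This is why passing just the ID and re-fetching the record in the worker is safer: the database row is the source of truth, and it can't be silently flattened in transit.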

The CodeUp tutorials at codeup.dev cover related patterns like event-driven architecture and pub/sub messaging if you're building beyond simple job queues.
