Background Jobs and Task Queues — BullMQ, Celery, and Sidekiq
When and why to use background jobs, message brokers, job patterns like delayed and scheduled jobs, retry strategies, and monitoring failed jobs. Code examples for BullMQ and Celery.
Your API endpoint should respond in under 200ms. But some operations take 5 seconds, 30 seconds, or several minutes -- sending emails, generating reports, processing images, syncing with third-party APIs. Doing these synchronously means the user stares at a spinner while your server holds a connection open.
Background jobs solve this by moving slow work off the request-response cycle. The API accepts the request, queues the work, responds immediately, and a separate worker processes the job asynchronously.
Architecture Overview
User Request → API Server → Queue (Redis/RabbitMQ) → Worker Process → Done
                   ↓
          Immediate Response
          "Job queued, ID: abc123"
Three components:
- Producer: Your API server that creates jobs
- Broker: Message queue that stores jobs (Redis, RabbitMQ, SQS)
- Consumer/Worker: Separate process that pulls jobs and executes them
The broker decouples producers from consumers. If the worker crashes, jobs wait in the queue. If traffic spikes, you add more workers. If processing is slow, jobs queue up instead of timing out.
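The producer/broker/consumer split can be sketched with Python's standard library, using `queue.Queue` as a stand-in broker and a thread as the worker. This is illustrative only: a real broker like Redis persists jobs across process crashes, which an in-memory queue cannot.

```python
import queue
import threading

broker = queue.Queue()   # stands in for Redis/RabbitMQ
results = []

def worker():
    # Consumer: pull jobs until the shutdown sentinel appears
    while True:
        job = broker.get()
        if job is None:
            break
        results.append(f"processed {job['id']}")

t = threading.Thread(target=worker)
t.start()

# Producer: the "API server" enqueues and returns immediately
for i in range(3):
    broker.put({"id": i})
broker.put(None)  # shutdown sentinel
t.join()
```

The producer never waits for a job to finish; it only waits for the enqueue, which is the property that keeps API response times flat.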
When to Use Background Jobs
| Scenario | Sync or Background? | Why |
|---|---|---|
| Send welcome email | Background | 2-5s SMTP call, user doesn't need to wait |
| Generate PDF report | Background | CPU-intensive, 10-30s |
| Process uploaded image | Background | Resizing, optimization, 5-15s |
| Sync data to Salesforce | Background | External API, unreliable latency |
| Calculate user permissions | Sync | Fast, needed for the response |
| Validate form input | Sync | Instant, user needs immediate feedback |
| Send webhook notifications | Background | External services may be slow/down |
| Aggregate analytics data | Background (scheduled) | Periodic batch processing |
| Charge a subscription | Background (scheduled) | Runs daily/monthly |
BullMQ (Node.js + Redis)
BullMQ is the standard for Node.js. It uses Redis as its broker and supports delayed jobs, rate limiting, retries, job priorities, and repeatable/scheduled jobs.
Setup
npm install bullmq ioredis
Creating a Queue and Adding Jobs
import { Queue } from "bullmq";
import IORedis from "ioredis";
const connection = new IORedis({
host: "localhost",
port: 6379,
maxRetriesPerRequest: null,
});
const emailQueue = new Queue("emails", { connection });
// Add a job from your API route
app.post("/api/register", async (req, res) => {
const user = await createUser(req.body);
// Queue the welcome email -- returns immediately
await emailQueue.add("welcome-email", {
userId: user.id,
email: user.email,
name: user.name,
});
res.status(201).json({ user });
});
Processing Jobs (Worker)
import { Worker } from "bullmq";
const emailWorker = new Worker(
"emails",
async (job) => {
console.log(`Processing job ${job.id}: ${job.name}`);
switch (job.name) {
case "welcome-email":
await sendWelcomeEmail(job.data.email, job.data.name);
break;
case "password-reset":
await sendPasswordResetEmail(job.data.email, job.data.token);
break;
default:
throw new Error(`Unknown job type: ${job.name}`);
}
return { sent: true, timestamp: Date.now() };
},
{
connection,
concurrency: 5, // Process 5 jobs simultaneously
}
);
emailWorker.on("completed", (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
emailWorker.on("failed", (job, err) => {
console.error(`Job ${job?.id} failed:`, err.message);
});
Job Options: Delays, Retries, Priority
// Delayed job -- send email 1 hour after registration
await emailQueue.add("follow-up-email", { userId: user.id }, {
delay: 60 * 60 * 1000, // 1 hour in ms
});
// Job with retries
await emailQueue.add("webhook-delivery", { url, payload }, {
attempts: 5,
backoff: {
type: "exponential",
delay: 1000, // retry delays: 1s, 2s, 4s, 8s
},
});
// Priority job (lower number = higher priority)
await emailQueue.add("password-reset", { email }, {
priority: 1, // Process before regular emails
});
// Timeouts: unlike Bull v3, BullMQ has no per-job timeout option.
// Enforce time limits inside the processor, e.g. by racing the work
// against a timer with Promise.race.
Scheduled/Repeatable Jobs
// Run every day at 3 AM
await emailQueue.add("daily-digest", {}, {
repeat: {
pattern: "0 3 * * *", // Cron syntax
},
});
// Run every 5 minutes
await emailQueue.add("health-check", {}, {
repeat: {
every: 5 * 60 * 1000, // 5 minutes in ms
},
});
Job Progress and Events
// In the worker -- report progress
const reportWorker = new Worker("reports", async (job) => {
const rows = await fetchData();
const total = rows.length;
for (let i = 0; i < total; i++) {
await processRow(rows[i]);
await job.updateProgress(Math.round(((i + 1) / total) * 100));
}
return { processedRows: total };
}, { connection });
// In your API -- check progress
app.get("/api/jobs/:id/status", async (req, res) => {
const job = await emailQueue.getJob(req.params.id);
if (!job) return res.status(404).json({ error: "Job not found" });
const state = await job.getState();
res.json({
id: job.id,
state, // "waiting", "active", "completed", "failed"
progress: job.progress, // 0-100
result: job.returnvalue, // Available after completion
failedReason: job.failedReason,
attemptsMade: job.attemptsMade,
});
});
Celery (Python + Redis/RabbitMQ)
Celery is the go-to for Python. It supports Redis and RabbitMQ as brokers, has built-in scheduling (Celery Beat), and integrates with Django and Flask.
Setup
pip install celery redis
Configuration
# celery_app.py
from celery import Celery
app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1", # For storing results
)
app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_acks_late=True, # Acknowledge after completion, not receipt
worker_prefetch_multiplier=1, # Don't hoard tasks
)
Defining Tasks
# tasks.py
from celery_app import app
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_welcome_email(self, user_id: int, email: str):
try:
logger.info(f"Sending welcome email to {email}")
smtp_send(
to=email,
subject="Welcome!",
body=render_template("welcome.html", user_id=user_id),
)
return {"status": "sent", "email": email}
except SMTPConnectionError as exc:
logger.warning(f"SMTP connection failed, retrying: {exc}")
raise self.retry(exc=exc)
@app.task(bind=True)
def generate_report(self, report_id: int):
rows = fetch_report_data(report_id)
total = len(rows)
for i, row in enumerate(rows):
process_row(row)
self.update_state(
state="PROGRESS",
meta={"current": i + 1, "total": total},
)
save_report(report_id, rows)
return {"status": "complete", "rows_processed": total}
Calling Tasks
# In your Flask/Django view
from tasks import send_welcome_email, generate_report
# Fire and forget
send_welcome_email.delay(user_id=42, email="user@example.com")
# With options
send_welcome_email.apply_async(
args=[42, "user@example.com"],
countdown=3600, # Delay 1 hour
expires=7200, # Expire if not processed within 2 hours
)
# Get result (async)
result = generate_report.delay(report_id=99)
print(result.id) # Task ID
print(result.status) # PENDING, STARTED, PROGRESS, SUCCESS, FAILURE
print(result.result) # Return value when done
# Chain tasks
from celery import chain
workflow = chain(
fetch_data.s(source="api"),
transform_data.s(),
load_data.s(destination="warehouse"),
)
workflow.apply_async()
Celery Beat (Scheduled Tasks)
# celery_app.py
from celery.schedules import crontab
app.conf.beat_schedule = {
"daily-cleanup": {
"task": "tasks.cleanup_expired_sessions",
"schedule": crontab(hour=2, minute=0), # 2 AM daily
},
"hourly-sync": {
"task": "tasks.sync_external_data",
"schedule": crontab(minute=0), # Every hour
},
"every-5-minutes": {
"task": "tasks.check_payment_status",
"schedule": 300.0, # Every 300 seconds
},
}
Run with:
# Worker
celery -A celery_app worker --loglevel=info --concurrency=4
# Beat scheduler (separate process)
celery -A celery_app beat --loglevel=info
Retry Strategies
Retries are essential -- external services fail. But naive retries (immediate, unlimited) can overwhelm a failing service.
Exponential Backoff
Attempt 1: immediate
Attempt 2: wait 1s
Attempt 3: wait 2s
Attempt 4: wait 4s
Attempt 5: wait 8s
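The schedule above reduces to a one-line formula. A sketch (`backoff_delay` is a hypothetical helper, not part of either library):

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Seconds to wait before the given attempt; attempt 1 runs immediately."""
    if attempt <= 1:
        return 0.0
    # Double the delay for each subsequent attempt, capped to avoid
    # waits growing without bound
    return min(base * 2 ** (attempt - 2), cap)

# [backoff_delay(n) for n in range(1, 6)] → [0.0, 1.0, 2.0, 4.0, 8.0]
```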
Exponential Backoff with Jitter
Without jitter, all retrying jobs hit the service at the same time (thundering herd). Add randomness:
// BullMQ
{
attempts: 5,
backoff: {
type: "custom",
},
}
// Custom backoff in worker
const worker = new Worker("queue", processor, {
connection,
settings: {
backoffStrategy: (attemptsMade) => {
const base = 1000 * Math.pow(2, attemptsMade);
const jitter = Math.random() * 1000;
return Math.min(base + jitter, 30000); // Cap at 30s
},
},
});
Retry Decision Matrix
| Error Type | Retry? | Strategy |
|---|---|---|
| Network timeout | Yes | Exponential backoff |
| 500 from external API | Yes | Exponential backoff |
| 429 rate limited | Yes | Respect Retry-After header |
| 400 bad request | No | Fix the data, fail permanently |
| 401 unauthorized | No | Fix credentials, fail permanently |
| Validation error | No | Bad data won't become good data |
| Database deadlock | Yes | Short delay, limited retries |
| Out of memory | No | Needs investigation, not retries |
Dead Letter Queues
After all retries are exhausted, jobs go to a dead letter queue (DLQ). This prevents data loss -- failed jobs sit there until a human investigates.
// BullMQ -- listen for permanently failed jobs
emailWorker.on("failed", async (job, err) => {
if (job && job.attemptsMade >= (job.opts.attempts ?? 1)) {
// All retries exhausted -- this is a dead letter
await deadLetterQueue.add("failed-email", {
originalJob: job.data,
error: err.message,
failedAt: new Date().toISOString(),
attempts: job.attemptsMade,
});
alertOpsTeam(`Email job ${job.id} permanently failed: ${err.message}`);
}
});
Monitoring and Observability
Running background jobs without monitoring is running blind. You need visibility into:
- Queue depth: How many jobs are waiting? Growing queue = workers can't keep up.
- Processing time: How long do jobs take? Degrading times = external service issues.
- Failure rate: What percentage of jobs fail? Spikes need investigation.
- Worker health: Are workers alive and processing?
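For queue depth specifically, Celery's Redis broker stores each queue as a plain Redis list (named after the queue, `"celery"` by default), so a lightweight check can go straight to Redis. A sketch assuming the redis-py client; `page_someone` is hypothetical:

```python
def queue_depth(redis_client, queue_name: str = "celery") -> int:
    """Count of waiting jobs: with the Redis broker, each Celery queue
    is a Redis list, so its length is the backlog."""
    return redis_client.llen(queue_name)

# Usage (assumes a running Redis and the redis-py package):
# import redis
# depth = queue_depth(redis.Redis())
# if depth > 1000:
#     page_someone("queue backlog growing")
```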
BullMQ Dashboard (Bull Board)
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ExpressAdapter } from "@bull-board/express";
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath("/admin/queues");
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(reportQueue),
new BullMQAdapter(webhookQueue),
],
serverAdapter,
});
app.use("/admin/queues", authMiddleware, serverAdapter.getRouter());
Celery Monitoring (Flower)
pip install flower
celery -A celery_app flower --port=5555
Flower gives you a web dashboard with real-time task monitoring, worker status, and the ability to inspect and retry failed tasks.
BullMQ vs Celery vs Sidekiq
| Feature | BullMQ (Node) | Celery (Python) | Sidekiq (Ruby) |
|---|---|---|---|
| Language | JavaScript/TypeScript | Python | Ruby |
| Broker | Redis | Redis, RabbitMQ, SQS | Redis |
| Scheduling | Built-in repeatable | Celery Beat (separate process) | Sidekiq-Cron/Scheduler |
| Concurrency | Event loop + workers | Prefork/Gevent | Threads |
| Dashboard | Bull Board | Flower | Sidekiq Web UI |
| Rate limiting | Built-in | Built-in (per-task rate_limit) | Sidekiq Enterprise |
| Priority queues | Yes | Yes | Yes |
| Job chaining | Flows API | Canvas (chain, chord, group) | Batch (Pro/Enterprise) |
| First released | ~2020 | 2009 | 2012 |
Common Pitfalls
1. Jobs that aren't idempotent. Workers can crash mid-job and reprocess it. If your job sends an email and crashes after sending but before acknowledging, the email gets sent twice. Design jobs so running them twice produces the same result as running them once.
// Bad: not idempotent
async function processPayment(orderId) {
await chargeCard(orderId); // Charges again on retry!
}
// Good: idempotent
async function processPayment(orderId) {
const order = await getOrder(orderId);
if (order.paymentStatus === "charged") return; // Already done
await chargeCard(orderId);
await updateOrder(orderId, { paymentStatus: "charged" });
}
2. Serialization assumptions. Job data gets serialized to JSON (or msgpack). Dates become strings. Class instances become plain objects. Don't pass complex objects -- pass IDs and re-fetch from the database in the worker.
3. No timeout on jobs. A hung job ties up a worker slot indefinitely. Always enforce a time limit: Celery supports per-task time_limit and soft_time_limit; in BullMQ, enforce it inside the processor (e.g. by racing a timer).
4. Running workers and web servers in the same process. Workers should be separate processes. A CPU-intensive job shouldn't block your API from responding.
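Pitfall 2 is easy to demonstrate: round-trip job data through JSON and watch the types flatten. A minimal sketch:

```python
import json
from datetime import datetime

# What the broker actually stores is JSON. Types that aren't JSON-native
# either fail to serialize or come back as something else.
job_data = {"user_id": 42, "created_at": datetime(2024, 1, 1).isoformat()}
restored = json.loads(json.dumps(job_data))

# The datetime survived only because it was converted to a string up
# front; the worker must parse it back -- or better, receive just the
# ID and re-fetch the record from the database.
assert isinstance(restored["created_at"], str)
```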
The CodeUp tutorials at codeup.dev cover related patterns like event-driven architecture and pub/sub messaging if you're building beyond simple job queues.