Updated for nself v0.4.8
BullMQ is a powerful job queue system for Node.js applications, and nself provides first-class support for building and deploying BullMQ workers through the bullmq-js and bullmq-ts custom service templates. With automatic Redis configuration, built-in monitoring, and scalable worker templates, you can handle background processing, scheduled jobs, and distributed workflows with ease.
BullMQ workers in nself provide:
Add BullMQ workers using the custom service (CS_N) format in your .env file:
# Custom Service Format: CS_N=name:template:port:route
# Available BullMQ templates: bullmq-js, bullmq-ts
# Email worker with TypeScript
CS_1=email-worker:bullmq-ts:3001
# Image processor worker
CS_2=image-processor:bullmq-ts:3002
# Data sync worker
CS_3=data-sync:bullmq-ts:3003
# Ensure Redis is enabled
REDIS_ENABLED=true
# Generate BullMQ workers from templates
nself build
# Start all services including workers
nself start
This creates the following structure:
services/
├── email-worker/
│ ├── src/
│ │ ├── processors/
│ │ │ └── email.processor.ts
│ │ ├── jobs/
│ │ │ └── send-email.job.ts
│ │ └── index.ts
│ ├── package.json
│ ├── tsconfig.json
│ └── Dockerfile
├── image-processor/
│ ├── src/
│ ├── package.json
│ └── Dockerfile
└── data-sync/
├── src/
├── package.json
    └── Dockerfile
// services/email-worker/src/processors/email.processor.js
import { Worker } from 'bullmq';
import nodemailer from 'nodemailer';

// Reusable SMTP transport configured from the environment.
// FIX: the nodemailer API is createTransport — the original called the
// non-existent createTransporter, which throws at startup.
const transporter = nodemailer.createTransport({
  host: process.env.SMTP_HOST,
  port: Number(process.env.SMTP_PORT), // env vars are strings; coerce to number
  secure: process.env.SMTP_SECURE === 'true',
  auth: {
    user: process.env.SMTP_USER,
    pass: process.env.SMTP_PASS,
  },
});

// Processes jobs from 'email-queue'; each job's data carries the message fields.
const emailWorker = new Worker('email-queue', async (job) => {
  const { to, subject, html, attachments } = job.data;
  try {
    const info = await transporter.sendMail({
      from: process.env.SMTP_FROM,
      to,
      subject,
      html,
      attachments,
    });
    console.log('Email sent:', info.messageId);
    return { messageId: info.messageId, status: 'sent' };
  } catch (error) {
    // Rethrow so BullMQ marks the job as failed (and retries per job options).
    console.error('Email failed:', error);
    throw error;
  }
}, {
  connection: {
    host: process.env.REDIS_HOST,
    port: Number(process.env.REDIS_PORT),
  },
  concurrency: 5, // up to 5 emails in flight per worker process
});

// FIX: 'result' was accidentally inside the template literal, so the handler
// logged the literal text ", result" instead of the job's return value.
emailWorker.on('completed', (job, result) => {
  console.log(`Email job ${job.id} completed:`, result);
});

emailWorker.on('failed', (job, err) => {
  // job can be undefined for some failure modes — guard the id access.
  console.error(`Email job ${job?.id} failed:`, err);
});

export default emailWorker;
// services/image-processor/src/processors/image.processor.js
import { Worker } from 'bullmq';
import sharp from 'sharp';
import AWS from 'aws-sdk';

// S3-compatible client pointed at MinIO (path-style addressing is required
// for MinIO endpoints).
const s3 = new AWS.S3({
  endpoint: process.env.MINIO_ENDPOINT,
  accessKeyId: process.env.MINIO_ACCESS_KEY,
  secretAccessKey: process.env.MINIO_SECRET_KEY,
  s3ForcePathStyle: true,
});

// Downloads an image, renders each requested size as JPEG, and uploads the
// results to object storage. Returns the list of generated variants.
const imageWorker = new Worker('image-queue', async (job) => {
  const { imageUrl, sizes, bucket } = job.data;

  // Download the original image.
  const response = await fetch(imageUrl);
  if (!response.ok) {
    throw new Error(`Failed to download ${imageUrl}: HTTP ${response.status}`);
  }
  // FIX: the WHATWG fetch Response has no .buffer() (that was node-fetch v2);
  // use arrayBuffer() and wrap it in a Node Buffer for sharp.
  const imageBuffer = Buffer.from(await response.arrayBuffer());

  const results = [];
  // Process each requested size sequentially to bound memory use.
  for (const { width, height, suffix } of sizes) {
    const processedBuffer = await sharp(imageBuffer)
      .resize(width, height, { fit: 'cover' })
      .jpeg({ quality: 85 })
      .toBuffer();

    // Upload to storage under a timestamped key.
    const key = `images/${Date.now()}-${suffix}.jpg`;
    await s3.upload({
      Bucket: bucket,
      Key: key,
      Body: processedBuffer,
      ContentType: 'image/jpeg',
    }).promise();

    results.push({
      size: `${width}x${height}`,
      url: `/storage/${bucket}/${key}`,
    });
  }
  return { processedImages: results };
}, {
  connection: {
    host: process.env.REDIS_HOST,
    port: Number(process.env.REDIS_PORT), // env vars are strings; coerce
  },
  concurrency: 3, // image work is CPU/memory heavy — keep concurrency low
});

export default imageWorker;
// In your NestJS service or API
import { Queue } from 'bullmq';
const emailQueue = new Queue('email-queue', {
connection: {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
},
});
// Add immediate job
await emailQueue.add('send-welcome-email', {
to: 'user@example.com',
subject: 'Welcome to our platform!',
html: '<h1>Welcome!</h1><p>Thanks for signing up.</p>',
});
// Add delayed job
await emailQueue.add('send-reminder', {
to: 'user@example.com',
subject: 'Don\'t forget to complete your profile',
html: '<p>Complete your profile to get started.</p>',
}, {
delay: 24 * 60 * 60 * 1000, // 24 hours
});
// Add recurring job
await emailQueue.add('daily-digest', {
to: 'user@example.com',
subject: 'Your daily digest',
html: '<p>Here\'s what happened today...</p>',
}, {
repeat: { cron: '0 9 * * *' }, // Every day at 9 AM
});// High priority job
await emailQueue.add('urgent-notification', data, {
priority: 1, // Higher number = higher priority
});
// Job with retry configuration
await imageQueue.add('process-image', data, {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000,
},
});
// Job with timeout
await dataQueue.add('sync-data', data, {
timeout: 30000, // 30 seconds
});nself automatically configures Bull Dashboard for monitoring:
http://localhost:3001/bull
// services/shared/health.js
import { Queue } from 'bullmq';

/**
 * Lightweight health probe for a single BullMQ queue.
 * check() resolves with queue-depth stats when Redis is reachable, or an
 * 'unhealthy' payload carrying the error message when it is not.
 */
export class HealthChecker {
  constructor(queueName) {
    this.queue = new Queue(queueName, {
      connection: {
        host: process.env.REDIS_HOST,
        port: Number(process.env.REDIS_PORT),
      },
    });
  }

  async check() {
    try {
      // PERF FIX: the original fetched the full waiting/active/failed job
      // lists (payloads included) just to read .length. getJobCounts()
      // returns the same numbers in a single cheap Redis round-trip.
      const counts = await this.queue.getJobCounts('waiting', 'active', 'failed');
      return {
        status: 'healthy',
        stats: {
          waiting: counts.waiting,
          active: counts.active,
          failed: counts.failed,
        },
      };
    } catch (error) {
      // Never throw from a health check — report unhealthy instead.
      return {
        status: 'unhealthy',
        error: error.message,
      };
    }
  }
}
// Chain jobs together
// Worker that processes user data and then chains a notification email job.
const processDataWorker = new Worker('process-data', async (job) => {
  const { userId, data } = job.data;

  // Process the data
  const processedData = await processUserData(data);

  // Chain to next job: notify the user once processing completes.
  await emailQueue.add('send-processing-complete', {
    to: await getUserEmail(userId),
    subject: 'Data processing complete',
    html: `<p>Your data has been processed successfully.</p>`,
  });

  return processedData;
}, {
  // FIX: BullMQ workers require an explicit Redis connection — the original
  // omitted the options object entirely, which makes the Worker constructor
  // throw. This also matches the other worker examples on this page.
  connection: {
    host: process.env.REDIS_HOST,
    port: Number(process.env.REDIS_PORT),
  },
});
// Add multiple jobs at once
// Build one newsletter job per user, then enqueue them all in a single
// round-trip with addBulk.
const newsletterJobs = [];
for (const user of users) {
  newsletterJobs.push({
    name: 'send-newsletter',
    data: {
      to: user.email,
      subject: 'Weekly Newsletter',
      html: generateNewsletterHTML(user),
    },
  });
}
await emailQueue.addBulk(newsletterJobs);
import { FlowProducer } from 'bullmq';
const flowProducer = new FlowProducer({
connection: {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
},
});
// Create job flow
await flowProducer.add({
name: 'process-order',
queueName: 'order-processing',
data: { orderId: 123 },
children: [
{
name: 'validate-payment',
queueName: 'payment-validation',
data: { orderId: 123 },
},
{
name: 'update-inventory',
queueName: 'inventory-management',
data: { orderId: 123 },
},
{
name: 'send-confirmation',
queueName: 'email-queue',
data: { orderId: 123 },
},
],
});# Worker-specific configuration
BULLMQ_EMAIL_CONCURRENCY=10
BULLMQ_IMAGE_CONCURRENCY=5
BULLMQ_DATA_CONCURRENCY=2
# Redis configuration
BULLMQ_REDIS_HOST=redis
BULLMQ_REDIS_PORT=6379
BULLMQ_REDIS_PASSWORD=secure_password
BULLMQ_REDIS_DB=1
# Monitoring
BULLMQ_DASHBOARD_ENABLED=true
BULLMQ_DASHBOARD_PORT=3001
BULLMQ_DASHBOARD_AUTH=admin:password123
# Job retention
BULLMQ_COMPLETED_RETENTION=100
BULLMQ_FAILED_RETENTION=50
# Scale workers using multiple CS_N entries
CS_1=email-worker-1:bullmq-ts:3001
CS_2=email-worker-2:bullmq-ts:3002
CS_3=email-worker-3:bullmq-ts:3003
CS_4=image-processor-1:bullmq-ts:3004
CS_5=image-processor-2:bullmq-ts:3005
# Rebuild and restart
nself build && nself restart
# Add a BullMQ worker via CS_N configuration
# In .env file:
CS_1=pdf-generator:bullmq-ts:3001
# Rebuild to generate from template
nself build
# Start services
nself start
# View worker logs (use the service name from your CS_N entry)
nself logs pdf-generator
# Follow logs in real-time
nself logs pdf-generator -f
# Check service status
nself status
# Scale workers using multiple CS_N entries
CS_1=email-worker-1:bullmq-ts:3001
CS_2=email-worker-2:bullmq-ts:3002
CS_3=email-worker-3:bullmq-ts:3003
# Or use Docker Compose deploy configuration
# Configure in .env and rebuild
nself build && nself restart
Now that you understand BullMQ workers:
BullMQ workers provide a robust foundation for asynchronous processing and background jobs in your nself applications.