1. Wann braucht man eine Queue?
Die kurze Antwort: Immer dann, wenn eine Operation länger als 2–3 Sekunden dauert oder fehlschlagen kann, ohne dass der Nutzer sofort warten soll. In der Praxis trifft das auf eine überraschend breite Kategorie zu.
| Use Case |
Typische Dauer |
Problem ohne Queue |
Empfehlung |
| AI-Dokumentenanalyse |
5–60 s pro Dokument |
HTTP-Timeout, kein Retry |
Queue Pflicht |
| Email-Newsletter |
10 ms × 10.000 Empfänger |
Rate-Limiting, SMTP-Block |
Queue Pflicht |
| Video-Encoding |
30 s – 10 min |
Worker-Blockade, Timeouts |
Queue Pflicht |
| Image-Processing |
0,5–5 s pro Bild |
CPU-Spikes, Parallelitätsprobleme |
Queue empfohlen |
| Webhook-Delivery |
Netzwerkabhängig |
Keine Retries bei Fehler |
Queue empfohlen |
| Einfache DB-Abfrage |
<100 ms |
Kein Problem |
Kein Overhead nötig |
Faustregel: Wenn dein HTTP-Request intern einen externen API-Call (OpenAI, Claude, SMTP, S3) macht — sollte er hinter einer Queue stecken. Der Nutzer bekommt sofort eine Job-ID zurück, der Rest passiert asynchron.
2. BullMQ Setup: Producer, Consumer, Worker mit Redis
BullMQ ist der moderne Nachfolger von Bull — vollständig in TypeScript geschrieben, deutlich bessere Fehlerbehandlung und native Unterstützung für Job-Flows. Als Storage-Backend nutzt es Redis, das bereits auf den meisten Produktionsservern läuft.
Installation:
npm install bullmq ioredis
npm install -D @types/node typescript ts-node
Die grundlegende Architektur besteht aus drei Teilen: einer Queue (Warteschlange), einem Worker (verarbeitet Jobs) und einem QueueEvents-Listener für Monitoring. Claude Code generiert dieses Grundgerüst auf Anhieb korrekt:
Claude Code Prompt
Erstelle ein BullMQ Setup mit TypeScript für eine "document-processing" Queue. Ich brauche: (1) Eine queue.ts die Jobs hinzufügt, (2) einen worker.ts der Jobs verarbeitet, (3) Retry-Konfiguration mit exponential backoff, (4) Job-Events für Logging. Redis läuft lokal auf Port 6379.
// queue.ts — Producer
import { Queue } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis({
host: process.env.REDIS_HOST || 'localhost',
port: 6379,
maxRetriesPerRequest: null // Pflicht für BullMQ
});
export const documentQueue = new Queue('document-processing', {
connection,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000 // 2s, 4s, 8s
},
removeOnComplete: { count: 100 },
removeOnFail: { count: 500 }
}
});
// Job hinzufügen
export async function enqueueDocument(documentId: string, priority = 5) {
const job = await documentQueue.add(
'analyze',
{ documentId, enqueuedAt: new Date().toISOString() },
{ priority } // 1 = höchste Priorität
);
return job.id;
}
// worker.ts — Consumer
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis({
host: process.env.REDIS_HOST || 'localhost',
maxRetriesPerRequest: null
});
const worker = new Worker(
'document-processing',
async (job: Job) => {
console.log(`[Worker] Verarbeite Job ${job.id}: ${job.data.documentId}`);
// Hier kommt die eigentliche Verarbeitungslogik
await processDocument(job.data.documentId);
return { status: 'done', processedAt: new Date().toISOString() };
},
{
connection,
concurrency: 5, // Max 5 parallele Jobs
limiter: {
max: 10,
duration: 60000 // Max 10 Jobs pro Minute
}
}
);
worker.on('completed', (job) =>
console.log(`✓ Job ${job.id} abgeschlossen`));
worker.on('failed', (job, err) =>
console.error(`✗ Job ${job?.id} fehlgeschlagen: ${err.message}`));
Wichtig: maxRetriesPerRequest: null ist bei BullMQ keine optionale Einstellung — ohne sie crashed die IORedis-Verbindung beim ersten Queue-Fehler. Claude Code setzt diesen Parameter automatisch, manuelle Implementierungen vergessen ihn regelmäßig.
3a. AI-Verarbeitungs-Queue: 100 Dokumente mit Claude analysieren
Das klassischste KI-Queue-Szenario: Ein Upload-Endpoint empfängt 100 PDFs, jede soll durch Claude analysiert werden. Synchron würde das 5–10 Minuten blockieren. Mit einer Queue: sofortige Antwort, parallele Verarbeitung mit Rate-Limiting gegen die Anthropic API.
// ai-worker.ts — Claude-Dokument-Analyse Queue
import { Worker, Job, Queue } from 'bullmq';
import Anthropic from '@anthropic-ai/sdk';
const anthropic = new Anthropic({
apiKey: process.env.ANTHROPIC_API_KEY
});
interface DocumentJob {
documentId: string;
content: string;
analysisType: 'summary' | 'sentiment' | 'extraction';
}
const aiWorker = new Worker<DocumentJob>(
'ai-analysis',
async (job: Job<DocumentJob>) => {
const { documentId, content, analysisType } = job.data;
// Fortschritt melden (sichtbar im Bull Board)
await job.updateProgress(10);
const prompt = buildPrompt(analysisType, content);
const message = await anthropic.messages.create({
model: 'claude-opus-4-5',
max_tokens: 1024,
messages: [{ role: 'user', content: prompt }]
});
await job.updateProgress(90);
const result = message.content[0].type === 'text'
? message.content[0].text : '';
// Ergebnis in DB speichern
await saveAnalysisResult(documentId, analysisType, result);
await job.updateProgress(100);
return { documentId, analysisType, tokensUsed: message.usage.output_tokens };
},
{
connection,
concurrency: 5, // Max 5 parallele Claude-Calls
limiter: {
max: 50,
duration: 60000 // Max 50 Requests/Minute (Anthropic Tier 1)
}
}
);
Rate-Limiting für Anthropic: Tier-1-Accounts haben standardmäßig 50 RPM. Mit limiter.max: 50, duration: 60000 bleibt die Queue sicher unter diesem Limit — auch wenn 500 Jobs gleichzeitig in der Queue warten. BullMQ verteilt sie automatisch.
Der API-Endpoint gibt sofort zurück:
// api/upload.ts — Non-blocking Upload
app.post('/upload-documents', async (req, res) => {
const { documents } = req.body; // Array von 100 Docs
const jobIds = await Promise.all(
documents.map((doc: any, i: number) =>
aiQueue.add('analyze', doc, {
priority: i === 0 ? 1 : 5, // Erstes Dokument hat Prio
jobId: `doc-${doc.id}` // Deduplizierung via jobId
})
)
);
res.json({
status: 'queued',
jobCount: jobIds.length,
checkStatusAt: `/api/jobs/${jobIds[0]}`
}); // Sofortige Antwort!
});
3b. Email-Queue: Newsletter-Versand mit Rate-Limiting
10.000 Emails direkt in einer Schleife zu versenden führt garantiert zu SMTP-Throttling oder einer IP-Sperrung. Eine Email-Queue mit konfigurierbaren Delays ist die einzig skalierbare Lösung.
// email-worker.ts — Newsletter Queue mit Rate-Limiting
import { Worker, Queue } from 'bullmq';
import nodemailer from 'nodemailer';
export const emailQueue = new Queue('email-send', {
connection,
defaultJobOptions: {
attempts: 5,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: { count: 50 },
removeOnFail: false // Fehler aufbewahren für Debug
}
});
const transporter = nodemailer.createTransport({
host: process.env.SMTP_HOST,
port: 587,
auth: {
user: process.env.SMTP_USER,
pass: process.env.SMTP_PASS
}
});
const emailWorker = new Worker(
'email-send',
async (job) => {
const { to, subject, html, from } = job.data;
await transporter.sendMail({ from, to, subject, html });
console.log(`✓ Email an ${to} gesendet (Job ${job.id})`);
},
{
connection,
concurrency: 3, // Nie mehr als 3 gleichzeitige SMTP-Verbindungen
limiter: {
max: 100,
duration: 3600000 // Max 100 Emails/Stunde
}
}
);
// Newsletter-Batch einstellen
export async function scheduleNewsletter(
recipients: string[],
subject: string,
html: string
) {
const jobs = recipients.map((email, i) => ({
name: 'send',
data: { to: email, subject, html, from: 'newsletter@example.com' },
opts: {
delay: i * 100, // 100ms Abstand zwischen Emails
jobId: `newsletter-${subject}-${email}` // Duplikat-Schutz
}
}));
await emailQueue.addBulk(jobs);
console.log(`${recipients.length} Emails eingereiht`);
}
3c. Image-Processing-Queue: Bilder konvertieren und skalieren
Bilder serverseitig zu transformieren ist CPU-intensiv. Eine Queue mit begrenzter Concurrency verhindert, dass ein Batch-Upload den gesamten Server lahmlegt.
// image-worker.ts — Sharp-basiertes Image Processing
import { Worker } from 'bullmq';
import sharp from 'sharp';
import path from 'path';
interface ImageJob {
inputPath: string;
outputDir: string;
sizes: { width: number; suffix: string }[];
format: 'webp' | 'avif' | 'jpeg';
}
const imageWorker = new Worker<ImageJob>(
'image-processing',
async (job) => {
const { inputPath, outputDir, sizes, format } = job.data;
const results: string[] = [];
for (const [i, size] of sizes.entries()) {
const filename = path.basename(inputPath, path.extname(inputPath));
const outputPath = path.join(outputDir, `${filename}-${size.suffix}.${format}`);
await sharp(inputPath)
.resize(size.width)
[format]({ quality: 85 })
.toFile(outputPath);
results.push(outputPath);
await job.updateProgress(Math.round(((i + 1) / sizes.length) * 100));
}
return { processed: results.length, outputs: results };
},
{
connection,
concurrency: 2 // CPU-intensiv: nur 2 parallele Jobs
}
);
4. Job-Prioritäten, Retries und Timeouts konfigurieren
BullMQ bietet fein granulare Kontrolle über das Job-Verhalten. Diese Einstellungen machen den Unterschied zwischen einem robusten System und einem das bei der ersten Netzwerkstörung zusammenbricht.
// Vollständige Job-Optionen Referenz
const jobOptions = {
// Priorität: 1 (höchste) bis ~2^21 (niedrigste)
priority: 1,
// Retry-Konfiguration
attempts: 5,
backoff: {
type: 'exponential', // oder 'fixed'
delay: 1000 // Basis-Delay in ms
// Delays: 1s, 2s, 4s, 8s, 16s
},
// Timeout: Job wird nach N ms als fehlgeschlagen markiert
timeout: 30000, // 30 Sekunden
// Verzögerter Start (Scheduled Jobs)
delay: 5000, // Job startet erst nach 5s
// Wiederkehrende Jobs
repeat: {
pattern: '0 9 * * 1-5', // Montag-Freitag 9 Uhr
tz: 'Europe/Berlin'
},
// Aufräumen
removeOnComplete: {
count: 500, // Max 500 erledigte Jobs behalten
age: 86400 // Älter als 24h löschen
},
removeOnFail: {
count: 1000 // Mehr fehlgeschlagene Jobs aufbewahren
},
// Deduplizierung
jobId: `user-${userId}-report` // Gleiche ID = nicht doppelt eingereiht
};
Praxistipp: Unterschiedliche Queues für unterschiedliche Prioritäten
Statt alle Jobs in eine Queue zu packen, lohnt sich ein Multi-Queue-Setup: high-priority, default, low-priority. Jede Queue bekommt eigene Worker mit unterschiedlicher Concurrency. Kritische Jobs (Passwortzurücksetzung) warten nie hinter Batch-Exports.
5. Dead Letter Queue: fehlgeschlagene Jobs handeln
Auch nach 5 Retries können Jobs fehlschlagen — falsches Dokumentformat, temporär nicht erreichbare externe API, unerwartete Datenstruktur. Diese Jobs einfach zu vergessen ist keine Option. BullMQ parkt sie in einem failed-Zustand, aber eine explizite Dead Letter Queue macht den Umgang damit strukturierter.
// dead-letter-queue.ts — Fehlgeschlagene Jobs verwalten
import { Queue, Worker, QueueEvents } from 'bullmq';
const dlq = new Queue('dead-letter', { connection });
// Fehlgeschlagene Jobs automatisch in DLQ verschieben
const queueEvents = new QueueEvents('ai-analysis', { connection });
queueEvents.on('failed', async ({ jobId, failedReason }) => {
const job = await aiQueue.getJob(jobId);
if (!job) return;
// Nur nach Ausschöpfung aller Retries
if (job.attemptsMade >= (job.opts.attempts || 3)) {
await dlq.add('failed-job', {
originalQueue: 'ai-analysis',
originalJobId: jobId,
originalData: job.data,
failedReason,
failedAt: new Date().toISOString(),
attemptsMade: job.attemptsMade
});
console.error(`[DLQ] Job ${jobId} nach ${job.attemptsMade} Versuchen in DLQ`);
}
});
// DLQ-Worker: Alerts senden + in DB protokollieren
const dlqWorker = new Worker('dead-letter', async (job) => {
const { originalJobId, failedReason, originalData } = job.data;
// Alert-Email an Entwickler senden
await emailQueue.add('send', {
to: process.env.ALERT_EMAIL,
subject: `[ALERT] Job ${originalJobId} fehlgeschlagen`,
html: `<p>Reason: ${failedReason}</p><pre>${JSON.stringify(originalData, null, 2)}</pre>`
});
// In DB für manuelle Nachbearbeitung speichern
await db.query(
'INSERT INTO failed_jobs (job_id, reason, data) VALUES ($1, $2, $3)',
[originalJobId, failedReason, originalData]
);
}, { connection, concurrency: 1 });
// Manuelles Retry eines DLQ-Jobs
export async function retryFromDLQ(dlqJobId: string) {
const job = await dlq.getJob(dlqJobId);
if (!job) throw new Error('DLQ-Job nicht gefunden');
await aiQueue.add('analyze', job.data.originalData, { attempts: 3 });
await job.remove();
console.log(`[DLQ] Job ${dlqJobId} erneut eingereiht`);
}
DLQ Best Practice: Fehlgeschlagene Jobs nie automatisch unendlich oft retrien — das führt zu Loops bei strukturellen Fehlern (z.B. falsches Datenformat). Stattdessen: maximale Retries definieren → DLQ → menschliche Prüfung → bewusstes manuelles Retry.
6. Monitoring: Bull Board für Queue-Übersicht
Eine Queue ohne Monitoring ist eine Black Box. Bull Board ist das offizielle Dashboard für BullMQ — zeigt aktive, wartende, fehlgeschlagene und erledigte Jobs in Echtzeit, direkt im Browser.
// bullboard.ts — Dashboard Setup mit Express
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import express from 'express';
import basicAuth from 'express-basic-auth';
npm install @bull-board/api @bull-board/express express-basic-auth
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(documentQueue),
new BullMQAdapter(emailQueue),
new BullMQAdapter(imageQueue),
new BullMQAdapter(dlq)
],
serverAdapter
});
const app = express();
// Passwort-Schutz für Production
app.use(
'/admin/queues',
basicAuth({
users: { admin: process.env.BULL_BOARD_PASSWORD! },
challenge: true
}),
serverAdapter.getRouter()
);
app.listen(3001, () =>
console.log('Bull Board: http://localhost:3001/admin/queues'));
Was Bull Board zeigt: Anzahl wartender/aktiver/erledigter/fehlgeschlagener Jobs pro Queue, Job-Details mit Payload und Fehler-Stack-Trace, manuelle Retry-Buttons, Durchsatz-Charts. Für Production unbedingt mit express-basic-auth absichern.
Zusammenfassung: Der vollständige Queue-Stack
- ✓ BullMQ + Redis als Fundament — battle-tested, TypeScript-first, aktiv gewartet
- ✓ Producer/Consumer-Trennung — Enqueue in der API, Verarbeitung im Worker-Prozess
- ✓ Concurrency + Rate-Limiting — verhindert API-Throttling und CPU-Überlastung
- ✓ Exponential Backoff — gibt externe Services Zeit zur Erholung
- ✓ Job-Prioritäten — kritische Operationen überholen Batch-Jobs
- ✓ Dead Letter Queue — fehlgeschlagene Jobs gehen nicht verloren
- ✓ Bull Board — Echtzeit-Monitoring ohne eigene Infrastruktur
Claude Code Prompt für den kompletten Stack
Mit folgendem Prompt generiert Claude Code das gesamte Setup in einem Schritt: "Erstelle ein vollständiges BullMQ-Queue-System in TypeScript mit: Redis-Verbindung, drei Queues (ai-processing, email, images), Worker für jede Queue, Dead Letter Queue, Bull Board Dashboard auf Port 3001, docker-compose.yml mit Redis. Füge Kommentare für alle nicht-offensichtlichen Konfigurationen hinzu."
KI-Workflows professionell automatisieren
Von der Queue bis zum Deployment — teste wie Claude Code deinen kompletten Backend-Stack in Minuten aufbaut.
14 Tage kostenlos testen →