Streams gehören zum Herzstück von Node.js. Wer große Dateien verarbeitet, HTTP-Requests streamt oder Echtzeit-Daten transformiert, kommt an Streams nicht vorbei. In diesem Guide zeigt Claude Code, wie du Readable, Writable, Transform und Duplex Streams effizient einsetzt — von den Grundlagen bis zur modernen Web Streams API in Node.js 18+.
1. Streams Grundlagen — Readable, Writable, Duplex, Transform
Node.js kennt vier Stream-Typen. Jeder hat eine klare Rolle in der Daten-Pipeline:
ObjectMode
Standardmäßig arbeiten Streams mit Buffer- oder String-Daten. Mit objectMode: true können beliebige JavaScript-Objekte gestreamt werden — ideal für strukturierte Datenverarbeitung.
// Die wichtigsten Stream-Events
const { createReadStream } = require('fs');
const stream = createReadStream('./data.csv');
// 'data' — Chunk verfügbar (flowing mode)
stream.on('data', (chunk) => {
console.log(`Chunk: ${chunk.length} Bytes`);
});
// 'end' — Kein Datum mehr
stream.on('end', () => console.log('Fertig!'));
// 'error' — Fehler im Stream
stream.on('error', (err) => console.error('Stream Error:', err));
// 'close' — Stream geschlossen, Ressourcen freigegeben
stream.on('close', () => console.log('Stream closed'));
// Writable-Events
const { createWriteStream } = require('fs');
const writer = createWriteStream('./out.csv');
writer.on('drain', () => console.log('Buffer geleert, weiter schreiben!'));
writer.on('finish', () => console.log('Writable fertig'));
Verwende Streams für alles, was größer als 1 MB ist. Streams laden Daten in kleinen Chunks in den Arbeitsspeicher — so vermeidest du Memory-Overflows bei großen CSV-, JSON- oder Mediendateien.
2. Readable Streams — createReadStream, Readable.from(), async iterator
Readable Streams gibt es in zwei Modi: flowing (Daten kommen automatisch via data-Event) und paused (Daten werden explizit mit read() abgerufen). Claude Code empfiehlt den async iterator als modernste und sicherste Variante.
const fs = require('fs');
const { Readable } = require('stream');
// 1. createReadStream — klassisch, für Dateien
const fileStream = fs.createReadStream('./large.json', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64 KB Chunks
});
// 2. Readable.from() — aus Array, Generator oder iterable
async function* generateData() {
for (let i = 0; i < 1000; i++) {
yield JSON.stringify({ id: i, value: Math.random() }) + '\n';
}
}
const generatorStream = Readable.from(generateData());
// 3. Async Iterator — modern, sauber, try/finally für Cleanup
async function processFile(path) {
const stream = fs.createReadStream(path, { encoding: 'utf8' });
let totalChars = 0;
try {
for await (const chunk of stream) {
totalChars += chunk.length;
// Verarbeite jeden Chunk sequentiell
}
} finally {
stream.destroy(); // Ressourcen freigeben
}
return totalChars;
}
Custom Readable mit push() und null
const { Readable } = require('stream');
class CounterStream extends Readable {
constructor(max) {
super({ objectMode: true });
this.max = max;
this.current = 0;
}
_read() {
if (this.current >= this.max) {
this.push(null); // Signal: Stream beendet
return;
}
this.push({ id: this.current++, ts: Date.now() });
}
}
const counter = new CounterStream(5);
for await (const item of counter) {
console.log(item); // { id: 0, ts: ... }, { id: 1, ts: ... }, ...
}
Ein Stream startet im paused mode. Er wechselt in den flowing mode, sobald du einen data-Event-Listener hinzufügst oder resume() aufrufst. Async iterators nutzen intern den paused mode — kontrollierter und backpressure-freundlicher.
3. Transform Streams — _transform, _flush, CSV und JSON
Transform Streams sind das Schweizer Taschenmesser der Stream-Welt: Sie lesen Input, verarbeiten ihn und schreiben Output. Perfekt für CSV-Parsing, JSON-Transformation oder Datenkompression.
const { Transform } = require('stream');
class CSVParser extends Transform {
constructor(options = {}) {
super({ ...options, objectMode: true });
this.headers = null;
this.buffer = '';
this.lineCount = 0;
}
_transform(chunk, encoding, callback) {
this.buffer += chunk.toString();
const lines = this.buffer.split('\n');
this.buffer = lines.pop(); // Unvollständige Zeile aufbewahren
for (const line of lines) {
if (!line.trim()) continue;
const values = line.split(',');
if (!this.headers) {
this.headers = values.map(h => h.trim());
} else {
const obj = {};
this.headers.forEach((h, i) => obj[h] = values[i]?.trim());
this.push(obj);
this.lineCount++;
}
}
callback();
}
_flush(callback) {
// Letzten Buffer-Rest verarbeiten
if (this.buffer.trim() && this.headers) {
const values = this.buffer.split(',');
const obj = {};
this.headers.forEach((h, i) => obj[h] = values[i]?.trim());
this.push(obj);
}
console.log(`CSV geparst: ${this.lineCount} Zeilen`);
callback();
}
}
// Verwendung
const parser = new CSVParser();
fs.createReadStream('./data.csv')
.pipe(parser)
.on('data', row => console.log(row));
JSON Transformer & Zlib Kompression
const zlib = require('zlib');
const { pipeline } = require('stream/promises');
// JSON-Transformer: Objekte filtern und transformieren
class JSONTransformer extends Transform {
constructor(transformFn) {
super({ objectMode: true });
this.transformFn = transformFn;
}
_transform(obj, encoding, callback) {
const result = this.transformFn(obj);
if (result !== null && result !== undefined) {
this.push(result);
}
callback();
}
}
// Pipeline: CSV → Filter → JSON → Gzip → Datei
await pipeline(
fs.createReadStream('./input.csv'),
new CSVParser(),
new JSONTransformer(row => row.active === 'true' ? row : null),
new Transform({
objectMode: true,
writableObjectMode: true,
transform(obj, enc, cb) {
this.push(JSON.stringify(obj) + '\n');
cb();
}
}),
zlib.createGzip(),
fs.createWriteStream('./output.jsonl.gz')
);
console.log('Komprimiert und gespeichert!');
_flush wird aufgerufen, wenn der Input-Stream endet. Hier werden gepufferte Daten (letzte Zeile, etc.) verarbeitet. Vergiss callback() nicht — sonst hängt der Stream ewig.
4. pipeline() und compose() — Fehlerbehandlung und Cleanup
Das klassische .pipe() hat einen kritischen Fehler: Fehler propagieren nicht automatisch durch die gesamte Pipeline. stream.pipeline() und seine Promise-Variante stream/promises sind die korrekte Lösung.
const { pipeline: pipelineCallback } = require('stream');
const { pipeline } = require('stream/promises');
const { promisify } = require('util');
// ❌ pipe() — Fehler werden NICHT automatisch weitergeleitet!
fs.createReadStream('./in.txt')
.pipe(someTransform)
.pipe(fs.createWriteStream('./out.txt'));
// Bei Fehler in someTransform: out.txt bleibt offen, Memory Leak!
// ✅ pipeline() — Fehler propagieren, alle Streams werden zerstört
async function processFile(input, output) {
try {
await pipeline(
fs.createReadStream(input),
zlib.createGunzip(), // Dekomprimieren
new CSVParser(), // CSV parsen
new JSONTransformer(row => ({ ...row, processed: true })),
stringifyStream(), // Zurück zu String
zlib.createGzip(), // Komprimieren
fs.createWriteStream(output)
);
console.log('Pipeline erfolgreich!');
} catch (err) {
// Alle Streams wurden bereits zerstört + geschlossen
console.error('Pipeline fehlgeschlagen:', err.message);
throw err;
}
}
// Ältere Codebase? promisify nutzen:
const pipelineAsync = promisify(pipelineCallback);
await pipelineAsync(source, transform, dest);
stream.compose() — Streams kombinieren
const { compose } = require('stream');
// compose() bündelt mehrere Streams zu EINEM wiederverwendbaren Stream
const csvToJsonGzip = compose(
new CSVParser(),
new JSONTransformer(row => row),
stringifyStream(),
zlib.createGzip()
);
// Jetzt als eigenständiger Stream verwendbar:
await pipeline(
fs.createReadStream('./data.csv'),
csvToJsonGzip, // Wiederverwendbarer zusammengesetzter Stream
fs.createWriteStream('./data.jsonl.gz')
);
pipeline() ruft automatisch stream.destroy(err) auf allen Streams auf, wenn ein Fehler auftritt. Das verhindert Memory Leaks und offene File-Handles. Mit pipe() musst du das manuell implementieren — ein häufiger Bug in Production-Code.
5. Backpressure — highWaterMark, drain Event, warum es wichtig ist
Backpressure ist der Mechanismus, der verhindert, dass ein schneller Producer einen langsamen Consumer überwältigt. Ohne Backpressure-Handling kann Node.js Hunderte von MB in den RAM laden, bevor der Consumer aufholt.
const fs = require('fs');
// highWaterMark = interner Buffer-Schwellwert (Standard: 16 KB für Bytes)
const readable = fs.createReadStream('./huge-file.bin', {
highWaterMark: 128 * 1024 // 128 KB
});
const writable = fs.createWriteStream('./output.bin', {
highWaterMark: 64 * 1024 // 64 KB
});
// Korrekte Backpressure-Behandlung:
function pumpWithBackpressure(readable, writable) {
return new Promise((resolve, reject) => {
readable.on('data', (chunk) => {
// write() gibt false zurück wenn Buffer voll
const canContinue = writable.write(chunk);
if (!canContinue) {
readable.pause(); // Producer pausieren!
console.log('Backpressure: readable pausiert');
writable.once('drain', () => {
console.log('drain: readable fortgesetzt');
readable.resume(); // Consumer bereit → weitermachen
});
}
});
readable.on('end', () => writable.end());
writable.on('finish', resolve);
readable.on('error', reject);
writable.on('error', reject);
});
}
// In der Praxis: pipeline() macht das automatisch!
await pipeline(readable, writable);
Backpressure in objektbasierten Streams messen
class ThrottledWriter extends Writable {
constructor() {
super({
objectMode: true,
highWaterMark: 100 // Max 100 Objekte im Buffer
});
}
async _write(obj, encoding, callback) {
try {
await saveToDatabase(obj);
callback(); // Signalisiert: bereit für nächstes Objekt
} catch (err) {
callback(err); // Fehler weitergeben
}
}
}
// Monitoring: writableLength zeigt aktuellen Buffer-Füllstand
setInterval(() => {
console.log(`Buffer: ${writer.writableLength}/${writer.writableHighWaterMark}`);
}, 1000);
Ohne Backpressure kann ein schneller File-Reader (GB/s) einen langsamen DB-Writer (MB/s) so überlasten, dass Node.js abstürzt. pipeline() implementiert Backpressure automatisch. Bei manuellem pipe() oder Custom-Streams musst du es selbst verwalten.
6. Web Streams API — ReadableStream, WritableStream, fetch Streaming
Seit Node.js 18 ist die WHATWG Web Streams API als globals verfügbar — dieselbe API wie im Browser. Das ermöglicht isomorphen Code und direktes Streaming von fetch().
// Node.js 18+: Web Streams als globals verfügbar
// 1. fetch() Response als Stream verarbeiten
async function streamLargeDownload(url, outputPath) {
const response = await fetch(url);
if (!response.ok) throw new Error(`HTTP ${response.status}`);
// response.body ist ein ReadableStream (WHATWG)
const reader = response.body.getReader();
const writer = fs.createWriteStream(outputPath);
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
writer.write(value); // Uint8Array → Node.js Writable
}
} finally {
reader.releaseLock();
writer.end();
}
}
// 2. ReadableStream selbst erstellen
const webReadable = new ReadableStream({
async start(controller) {
const data = ['Hello', ' ', 'World'];
for (const chunk of data) {
controller.enqueue(new TextEncoder().encode(chunk));
await new Promise(r => setTimeout(r, 100));
}
controller.close();
}
});
// 3. Web → Node.js Streams Interop (Node.js 17+)
const { Readable, Writable } = require('stream');
const nodeReadable = Readable.fromWeb(webReadable);
const nodeWritable = Writable.toWeb(fs.createWriteStream('./out.txt'));
await pipeline(nodeReadable, someTransform(), fs.createWriteStream('./result.txt'));
TransformStream — Web Streams API
// TransformStream: Web Streams äquivalent zu Node.js Transform
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
const text = new TextDecoder().decode(chunk);
controller.enqueue(
new TextEncoder().encode(text.toUpperCase())
);
}
});
// Direkt in Fetch Response Pipeline
const response = await fetch('https://api.example.com/stream');
const transformed = response.body
.pipeThrough(upperCaseTransform)
.pipeThrough(new TextDecoderStream());
const reader = transformed.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
process.stdout.write(value);
}
// Node.js → Web Stream (für isomorphen Code)
const { ReadableStream: WebReadable } = require('stream/web');
const webStream = Readable.toWeb(fs.createReadStream('./data.bin'));
Nutze Web Streams API für neuen isomorphen Code (läuft in Node.js und Browser). Nutze Node.js Streams für Legacy-Kompatibilität, performance-kritische Server-Workloads und wenn du das reiche Ecosystem (zlib, crypto, etc.) brauchst. Readable.fromWeb() und Readable.toWeb() verbinden beide Welten nahtlos.
Zusammenfassung: Stream-Wahl nach Anwendungsfall
- Große Dateien lesen:
createReadStream+async iterator - CSV/JSON transformieren: Custom
TransformStream mitobjectMode - Mehrere Streams verketten:
pipeline()ausstream/promises - Wiederverwendbare Pipelines:
stream.compose() - Memory-Kontrolle:
highWaterMarkanpassen +drainEvent - fetch() streamen: Web Streams API mit
response.body.getReader() - Isomorpher Code:
ReadableStream/TransformStream(Node.js 18+)
Claude Code hilft dir dabei, diese Muster korrekt zu implementieren — von der initialen Stream-Architektur über Backpressure-Handling bis hin zur vollständigen Fehlerbehandlung mit pipeline(). Streams sind mächtig, aber komplex: Mit dem richtigen AI-Assistenten sparst du stundenlange Debugging-Sessions.