Node.js & Backend

Node.js Streams mit Claude Code: Datenverarbeitung 2026

Node.js Streams für effiziente Datenverarbeitung — Claude Code baut Readable, Transform und Writable Streams mit pipeline() und Backpressure-Handling.

📅 6. Mai 2026 ⏱ 11 min Lesezeit 🏗 Node.js & Backend

Streams gehören zum Herzstück von Node.js. Wer große Dateien verarbeitet, HTTP-Requests streamt oder Echtzeit-Daten transformiert, kommt an Streams nicht vorbei. In diesem Guide zeigt Claude Code, wie du Readable, Writable, Transform und Duplex Streams effizient einsetzt — von den Grundlagen bis zur modernen Web Streams API in Node.js 18+.

1. Streams Grundlagen — Readable, Writable, Duplex, Transform

Node.js kennt vier Stream-Typen. Jeder hat eine klare Rolle in der Daten-Pipeline:

📺
Readable
Lesen von Daten. Z.B. Dateien, HTTP-Requests, stdin
📝
Writable
Schreiben von Daten. Z.B. Dateien, HTTP-Responses, stdout
🔄
Duplex
Beides gleichzeitig — lesen & schreiben. Z.B. TCP-Sockets
⚙️
Transform
Lesen, transformieren, schreiben. Z.B. Kompression, Parsing

ObjectMode

Standardmäßig arbeiten Streams mit Buffer- oder String-Daten. Mit objectMode: true können beliebige JavaScript-Objekte gestreamt werden — ideal für strukturierte Datenverarbeitung.

Readable Stream Events im Überblick
// Die wichtigsten Stream-Events
const { createReadStream } = require('fs');
const stream = createReadStream('./data.csv');

// 'data'  — Chunk verfügbar (flowing mode)
stream.on('data', (chunk) => {
  console.log(`Chunk: ${chunk.length} Bytes`);
});

// 'end'   — Kein Datum mehr
stream.on('end', () => console.log('Fertig!'));

// 'error' — Fehler im Stream
stream.on('error', (err) => console.error('Stream Error:', err));

// 'close' — Stream geschlossen, Ressourcen freigegeben
stream.on('close', () => console.log('Stream closed'));

// Writable-Events
const { createWriteStream } = require('fs');
const writer = createWriteStream('./out.csv');

writer.on('drain', () => console.log('Buffer geleert, weiter schreiben!'));
writer.on('finish', () => console.log('Writable fertig'));
Praxis-Tipp

Verwende Streams für alles, was größer als 1 MB ist. Streams laden Daten in kleinen Chunks in den Arbeitsspeicher — so vermeidest du Memory-Overflows bei großen CSV-, JSON- oder Mediendateien.

2. Readable Streams — createReadStream, Readable.from(), async iterator

Readable Streams gibt es in zwei Modi: flowing (Daten kommen automatisch via data-Event) und paused (Daten werden explizit mit read() abgerufen). Claude Code empfiehlt den async iterator als modernste und sicherste Variante.

Readable Drei Arten, einen Readable Stream zu konsumieren
const fs = require('fs');
const { Readable } = require('stream');

// 1. createReadStream — klassisch, für Dateien
const fileStream = fs.createReadStream('./large.json', {
  encoding: 'utf8',
  highWaterMark: 64 * 1024 // 64 KB Chunks
});

// 2. Readable.from() — aus Array, Generator oder iterable
async function* generateData() {
  for (let i = 0; i < 1000; i++) {
    yield JSON.stringify({ id: i, value: Math.random() }) + '\n';
  }
}
const generatorStream = Readable.from(generateData());

// 3. Async Iterator — modern, sauber, try/finally für Cleanup
async function processFile(path) {
  const stream = fs.createReadStream(path, { encoding: 'utf8' });
  let totalChars = 0;

  try {
    for await (const chunk of stream) {
      totalChars += chunk.length;
      // Verarbeite jeden Chunk sequentiell
    }
  } finally {
    stream.destroy(); // Ressourcen freigeben
  }

  return totalChars;
}

Custom Readable mit push() und null

const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max) {
    super({ objectMode: true });
    this.max = max;
    this.current = 0;
  }

  _read() {
    if (this.current >= this.max) {
      this.push(null); // Signal: Stream beendet
      return;
    }
    this.push({ id: this.current++, ts: Date.now() });
  }
}

const counter = new CounterStream(5);
for await (const item of counter) {
  console.log(item); // { id: 0, ts: ... }, { id: 1, ts: ... }, ...
}
Flowing vs Paused Mode

Ein Stream startet im paused mode. Er wechselt in den flowing mode, sobald du einen data-Event-Listener hinzufügst oder resume() aufrufst. Async iterators nutzen intern den paused mode — kontrollierter und backpressure-freundlicher.

3. Transform Streams — _transform, _flush, CSV und JSON

Transform Streams sind das Schweizer Taschenmesser der Stream-Welt: Sie lesen Input, verarbeiten ihn und schreiben Output. Perfekt für CSV-Parsing, JSON-Transformation oder Datenkompression.

Transform CSV Parser als Transform Stream
const { Transform } = require('stream');

class CSVParser extends Transform {
  constructor(options = {}) {
    super({ ...options, objectMode: true });
    this.headers = null;
    this.buffer = '';
    this.lineCount = 0;
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Unvollständige Zeile aufbewahren

    for (const line of lines) {
      if (!line.trim()) continue;
      const values = line.split(',');

      if (!this.headers) {
        this.headers = values.map(h => h.trim());
      } else {
        const obj = {};
        this.headers.forEach((h, i) => obj[h] = values[i]?.trim());
        this.push(obj);
        this.lineCount++;
      }
    }
    callback();
  }

  _flush(callback) {
    // Letzten Buffer-Rest verarbeiten
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((h, i) => obj[h] = values[i]?.trim());
      this.push(obj);
    }
    console.log(`CSV geparst: ${this.lineCount} Zeilen`);
    callback();
  }
}

// Verwendung
const parser = new CSVParser();
fs.createReadStream('./data.csv')
  .pipe(parser)
  .on('data', row => console.log(row));

JSON Transformer & Zlib Kompression

const zlib = require('zlib');
const { pipeline } = require('stream/promises');

// JSON-Transformer: Objekte filtern und transformieren
class JSONTransformer extends Transform {
  constructor(transformFn) {
    super({ objectMode: true });
    this.transformFn = transformFn;
  }

  _transform(obj, encoding, callback) {
    const result = this.transformFn(obj);
    if (result !== null && result !== undefined) {
      this.push(result);
    }
    callback();
  }
}

// Pipeline: CSV → Filter → JSON → Gzip → Datei
await pipeline(
  fs.createReadStream('./input.csv'),
  new CSVParser(),
  new JSONTransformer(row => row.active === 'true' ? row : null),
  new Transform({
    objectMode: true,
    writableObjectMode: true,
    transform(obj, enc, cb) {
      this.push(JSON.stringify(obj) + '\n');
      cb();
    }
  }),
  zlib.createGzip(),
  fs.createWriteStream('./output.jsonl.gz')
);
console.log('Komprimiert und gespeichert!');
_flush ist Pflicht

_flush wird aufgerufen, wenn der Input-Stream endet. Hier werden gepufferte Daten (letzte Zeile, etc.) verarbeitet. Vergiss callback() nicht — sonst hängt der Stream ewig.

4. pipeline() und compose() — Fehlerbehandlung und Cleanup

Das klassische .pipe() hat einen kritischen Fehler: Fehler propagieren nicht automatisch durch die gesamte Pipeline. stream.pipeline() und seine Promise-Variante stream/promises sind die korrekte Lösung.

Pipeline pipeline() vs pipe() — warum pipeline() gewinnt
const { pipeline: pipelineCallback } = require('stream');
const { pipeline } = require('stream/promises');
const { promisify } = require('util');

// ❌ pipe() — Fehler werden NICHT automatisch weitergeleitet!
fs.createReadStream('./in.txt')
  .pipe(someTransform)
  .pipe(fs.createWriteStream('./out.txt'));
// Bei Fehler in someTransform: out.txt bleibt offen, Memory Leak!

// ✅ pipeline() — Fehler propagieren, alle Streams werden zerstört
async function processFile(input, output) {
  try {
    await pipeline(
      fs.createReadStream(input),
      zlib.createGunzip(),      // Dekomprimieren
      new CSVParser(),          // CSV parsen
      new JSONTransformer(row => ({ ...row, processed: true })),
      stringifyStream(),        // Zurück zu String
      zlib.createGzip(),        // Komprimieren
      fs.createWriteStream(output)
    );
    console.log('Pipeline erfolgreich!');
  } catch (err) {
    // Alle Streams wurden bereits zerstört + geschlossen
    console.error('Pipeline fehlgeschlagen:', err.message);
    throw err;
  }
}

// Ältere Codebase? promisify nutzen:
const pipelineAsync = promisify(pipelineCallback);
await pipelineAsync(source, transform, dest);

stream.compose() — Streams kombinieren

const { compose } = require('stream');

// compose() bündelt mehrere Streams zu EINEM wiederverwendbaren Stream
const csvToJsonGzip = compose(
  new CSVParser(),
  new JSONTransformer(row => row),
  stringifyStream(),
  zlib.createGzip()
);

// Jetzt als eigenständiger Stream verwendbar:
await pipeline(
  fs.createReadStream('./data.csv'),
  csvToJsonGzip, // Wiederverwendbarer zusammengesetzter Stream
  fs.createWriteStream('./data.jsonl.gz')
);
Wichtig: Cleanup bei Fehlern

pipeline() ruft automatisch stream.destroy(err) auf allen Streams auf, wenn ein Fehler auftritt. Das verhindert Memory Leaks und offene File-Handles. Mit pipe() musst du das manuell implementieren — ein häufiger Bug in Production-Code.

5. Backpressure — highWaterMark, drain Event, warum es wichtig ist

Backpressure ist der Mechanismus, der verhindert, dass ein schneller Producer einen langsamen Consumer überwältigt. Ohne Backpressure-Handling kann Node.js Hunderte von MB in den RAM laden, bevor der Consumer aufholt.

Backpressure highWaterMark und drain — korrekte Implementierung
const fs = require('fs');

// highWaterMark = interner Buffer-Schwellwert (Standard: 16 KB für Bytes)
const readable = fs.createReadStream('./huge-file.bin', {
  highWaterMark: 128 * 1024 // 128 KB
});
const writable = fs.createWriteStream('./output.bin', {
  highWaterMark: 64 * 1024  // 64 KB
});

// Korrekte Backpressure-Behandlung:
function pumpWithBackpressure(readable, writable) {
  return new Promise((resolve, reject) => {
    readable.on('data', (chunk) => {
      // write() gibt false zurück wenn Buffer voll
      const canContinue = writable.write(chunk);

      if (!canContinue) {
        readable.pause(); // Producer pausieren!
        console.log('Backpressure: readable pausiert');

        writable.once('drain', () => {
          console.log('drain: readable fortgesetzt');
          readable.resume(); // Consumer bereit → weitermachen
        });
      }
    });

    readable.on('end', () => writable.end());
    writable.on('finish', resolve);
    readable.on('error', reject);
    writable.on('error', reject);
  });
}

// In der Praxis: pipeline() macht das automatisch!
await pipeline(readable, writable);

Backpressure in objektbasierten Streams messen

class ThrottledWriter extends Writable {
  constructor() {
    super({
      objectMode: true,
      highWaterMark: 100 // Max 100 Objekte im Buffer
    });
  }

  async _write(obj, encoding, callback) {
    try {
      await saveToDatabase(obj);
      callback(); // Signalisiert: bereit für nächstes Objekt
    } catch (err) {
      callback(err); // Fehler weitergeben
    }
  }
}

// Monitoring: writableLength zeigt aktuellen Buffer-Füllstand
setInterval(() => {
  console.log(`Buffer: ${writer.writableLength}/${writer.writableHighWaterMark}`);
}, 1000);
Warum Backpressure kritisch ist

Ohne Backpressure kann ein schneller File-Reader (GB/s) einen langsamen DB-Writer (MB/s) so überlasten, dass Node.js abstürzt. pipeline() implementiert Backpressure automatisch. Bei manuellem pipe() oder Custom-Streams musst du es selbst verwalten.

6. Web Streams API — ReadableStream, WritableStream, fetch Streaming

Seit Node.js 18 ist die WHATWG Web Streams API als globals verfügbar — dieselbe API wie im Browser. Das ermöglicht isomorphen Code und direktes Streaming von fetch().

Web Streams ReadableStream und fetch() Streaming — Node.js 18+
// Node.js 18+: Web Streams als globals verfügbar

// 1. fetch() Response als Stream verarbeiten
async function streamLargeDownload(url, outputPath) {
  const response = await fetch(url);

  if (!response.ok) throw new Error(`HTTP ${response.status}`);

  // response.body ist ein ReadableStream (WHATWG)
  const reader = response.body.getReader();
  const writer = fs.createWriteStream(outputPath);

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      writer.write(value); // Uint8Array → Node.js Writable
    }
  } finally {
    reader.releaseLock();
    writer.end();
  }
}

// 2. ReadableStream selbst erstellen
const webReadable = new ReadableStream({
  async start(controller) {
    const data = ['Hello', ' ', 'World'];
    for (const chunk of data) {
      controller.enqueue(new TextEncoder().encode(chunk));
      await new Promise(r => setTimeout(r, 100));
    }
    controller.close();
  }
});

// 3. Web → Node.js Streams Interop (Node.js 17+)
const { Readable, Writable } = require('stream');

const nodeReadable = Readable.fromWeb(webReadable);
const nodeWritable = Writable.toWeb(fs.createWriteStream('./out.txt'));

await pipeline(nodeReadable, someTransform(), fs.createWriteStream('./result.txt'));

TransformStream — Web Streams API

// TransformStream: Web Streams äquivalent zu Node.js Transform
const upperCaseTransform = new TransformStream({
  transform(chunk, controller) {
    const text = new TextDecoder().decode(chunk);
    controller.enqueue(
      new TextEncoder().encode(text.toUpperCase())
    );
  }
});

// Direkt in Fetch Response Pipeline
const response = await fetch('https://api.example.com/stream');
const transformed = response.body
  .pipeThrough(upperCaseTransform)
  .pipeThrough(new TextDecoderStream());

const reader = transformed.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  process.stdout.write(value);
}

// Node.js → Web Stream (für isomorphen Code)
const { ReadableStream: WebReadable } = require('stream/web');
const webStream = Readable.toWeb(fs.createReadStream('./data.bin'));
Node.js 18+ Empfehlung

Nutze Web Streams API für neuen isomorphen Code (läuft in Node.js und Browser). Nutze Node.js Streams für Legacy-Kompatibilität, performance-kritische Server-Workloads und wenn du das reiche Ecosystem (zlib, crypto, etc.) brauchst. Readable.fromWeb() und Readable.toWeb() verbinden beide Welten nahtlos.

Zusammenfassung: Stream-Wahl nach Anwendungsfall

Claude Code hilft dir dabei, diese Muster korrekt zu implementieren — von der initialen Stream-Architektur über Backpressure-Handling bis hin zur vollständigen Fehlerbehandlung mit pipeline(). Streams sind mächtig, aber komplex: Mit dem richtigen AI-Assistenten sparst du stundenlange Debugging-Sessions.

Node.js Streams mit AI-Unterstützung bauen

Claude Code analysiert deine Stream-Pipelines, findet Backpressure-Probleme und schreibt fehlerfreien Transform-Code. Teste es 14 Tage kostenlos.

Kostenlos testen — kein Kreditkarte