Node.js Interview Prep
Streams and Buffers

Practical Stream Patterns -- pipeline, Transform, and Real Workflows

LinkedIn Hook

"Your Node.js server crashed in production -- not because of traffic, but because someone uploaded a 2 GB CSV."

The code looked harmless. fs.readFile('huge.csv'), parse it, send the result. Works perfectly in development with a 5 MB sample. Then a real user uploads a real file, and the entire process tries to allocate two gigabytes of RAM in one shot. The OS kills it. The pager goes off.

The fix is not "buy bigger servers." The fix is streams.

Streams let Node.js process gigabytes of data using kilobytes of memory. You read a chunk, transform it, write it, and forget it. The garbage collector reclaims memory continuously. CPU and I/O work in parallel instead of taking turns.

But streams have a reputation for being hard. Half the tutorials online still teach .pipe() chains that silently swallow errors, and half the production bugs I see come from forgetting that an 'error' listener on the source does not propagate to the destination.

In Lesson 4.3 I cover the patterns that actually matter: pipeline() for safe error handling, Transform streams you write yourself, readline for line-by-line CSV processing, zlib for gzip compression, and the rule for when to reach for fs.readFile instead of streaming.

Read the full lesson -> [link]

#NodeJS #BackendDevelopment #Streams #SystemDesign #InterviewPrep




What You'll Learn

  • How to copy files with readable.pipe(writable) and why pipe is just one line of code
  • How to process gigabyte-sized CSV and JSON files line-by-line with the readline module
  • How to build your own Transform stream by extending stream.Transform
  • How to chain compression with zlib.createGzip() into a stream pipeline
  • How to stream HTTP responses by writing directly to res (which is itself a Writable)
  • Why pipeline() is strictly better than .pipe() for error handling and resource cleanup
  • When fs.readFile is the right tool and when streaming is mandatory

The Factory Assembly Line Analogy

Picture two ways to build a car. The first way: bring every part for every car into one giant warehouse, lay them all out on the floor, then assemble all the cars at once. You need a warehouse the size of a stadium. If a single bolt is missing, the entire build is delayed. If the warehouse roof leaks, every car gets ruined at once.

The second way: an assembly line. A frame moves slowly down a conveyor belt. At station one, workers attach the engine. At station two, workers attach the wheels. At station three, painters spray the body. At every moment, only one car's worth of parts sits at each station. Memory -- the floor space -- stays small no matter how many cars you build.

That is exactly the difference between fs.readFile and a stream pipeline. readFile is the warehouse approach: load everything into memory, then process. A stream pipeline is the assembly line: a small chunk arrives at the first stage, passes through transformations, and exits as output -- continuously, with constant memory.

In Node.js terms, each "station" on the assembly line is a stream. A Readable stream produces chunks. A Writable stream consumes chunks. A Transform stream is both: it consumes chunks, modifies them, and produces new chunks. pipeline() is the conveyor belt that connects the stations and shuts the whole line down cleanly if any one station breaks.

+---------------------------------------------------------------+
|           THE STREAM ASSEMBLY LINE                            |
+---------------------------------------------------------------+
|                                                                |
|   [Readable] --> [Transform] --> [Transform] --> [Writable]    |
|    (source)        (gzip)          (encrypt)       (sink)      |
|                                                                |
|   file.csv  ->  compress  ->  encrypt  ->  upload to S3        |
|                                                                |
|   At any moment:                                               |
|     - 64 KB in source buffer                                   |
|     - 64 KB inside gzip                                        |
|     - 64 KB inside encrypt                                     |
|     - 64 KB waiting on the network                             |
|                                                                |
|   Total memory: ~256 KB to process a 50 GB file.               |
|                                                                |
+---------------------------------------------------------------+

Pattern 1 -- File Copy with pipeline()

The simplest stream pattern in Node.js is copying a file. You open a Readable from the source, open a Writable to the destination, and connect them. The classic way uses .pipe(). The modern, correct way uses pipeline() from node:stream/promises.

// file-copy.js
// Copy a file of any size using a stream pipeline.
// Memory stays flat regardless of file size.
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

async function copyFile(source, destination) {
  // createReadStream returns a Readable stream that emits chunks
  const reader = createReadStream(source);

  // createWriteStream returns a Writable stream that accepts chunks
  const writer = createWriteStream(destination);

  // pipeline() wires them together AND handles:
  //   - error propagation (any stream error rejects the promise)
  //   - cleanup (closes both streams even on partial failure)
  //   - backpressure (pauses reader when writer is full)
  await pipeline(reader, writer);

  console.log(`Copied ${source} -> ${destination}`);
}

// Run it
copyFile('./huge-video.mp4', './backup/huge-video.mp4')
  .catch((err) => {
    // One catch handles errors from EITHER stream
    console.error('Copy failed:', err.message);
    process.exit(1);
  });

What is happening under the hood:

  1. createReadStream opens the source file with a 64 KB internal buffer (the default highWaterMark).
  2. pipeline calls reader.pipe(writer) internally and attaches 'error' listeners to both streams.
  3. The reader reads 64 KB, pushes it to the writer, and pauses if the writer's buffer fills up (backpressure).
  4. When the source reaches EOF, the reader emits 'end', the writer flushes, and the promise resolves.
  5. If anything fails -- disk full, source deleted, permission denied -- both file descriptors are closed and the promise rejects.

The same code copies a 1 KB text file or a 50 GB video. Memory usage is identical.


Pattern 2 -- Processing a Huge CSV Line-by-Line with readline

CSV and NDJSON files are common at scale: log exports, analytics dumps, database backups. Loading them into memory is impossible past a certain size. The readline module wraps any Readable stream and emits one event per line.

// process-large-csv.js
// Read a multi-gigabyte CSV without loading it into memory.
// readline buffers internally and splits on \n boundaries.
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';

async function processCsv(path) {
  const fileStream = createReadStream(path, { encoding: 'utf8' });

  // createInterface wraps the readable and emits 'line' for each line.
  // crlfDelay: Infinity treats \r\n as a single line break (Windows files).
  const rl = createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  let lineNumber = 0;
  let header = null;
  let totalRevenue = 0;

  // 'for await...of' is the cleanest way to consume readline.
  // It pauses the underlying stream automatically between iterations.
  for await (const line of rl) {
    lineNumber++;

    // First line is the header row
    if (lineNumber === 1) {
      header = line.split(',');
      continue;
    }

    // Skip blank lines
    if (line.trim() === '') continue;

    // Parse the row -- naive split works for clean CSVs only
    const fields = line.split(',');
    const row = Object.fromEntries(
      header.map((key, idx) => [key, fields[idx]]),
    );

    // Aggregate without keeping any rows in memory
    totalRevenue += Number(row.amount) || 0;

    // Log progress every 1M rows
    if (lineNumber % 1_000_000 === 0) {
      console.log(`Processed ${lineNumber.toLocaleString()} rows...`);
    }
  }

  console.log(`Done. ${lineNumber} lines, total revenue: $${totalRevenue}`);
}

processCsv('./sales-2026.csv').catch(console.error);

Why for await...of works here: the readline.Interface is an async iterable. Each iteration of the loop pulls the next line. Node.js automatically pauses the underlying file stream while your loop body runs, so a slow database write inside the loop will not cause memory to balloon -- backpressure is built in.

For NDJSON (newline-delimited JSON), the pattern is identical but you call JSON.parse(line) on each line instead of splitting on commas. This is how tools like jq and log shippers handle infinite-length log streams.
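For NDJSON the loop body shrinks to a single JSON.parse. A minimal sketch, using an in-memory Readable as a stand-in for a real file stream (the `amount` field is made up for the example):

```javascript
// ndjson-sketch.js -- the NDJSON variant of the readline loop.
import { Readable } from 'node:stream';
import { createInterface } from 'node:readline';

// In-memory stand-in for createReadStream('./events.ndjson')
const input = Readable.from(['{"amount":10}\n{"amount":32}\n']);
const rl = createInterface({ input, crlfDelay: Infinity });

let total = 0;
for await (const line of rl) {
  if (line.trim() === '') continue;
  total += JSON.parse(line).amount; // one JSON object per line
}
console.log(total); // 42
```

Swap the in-memory Readable for a createReadStream and the same loop handles a file of any size.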


Pattern 3 -- Building Your Own Transform Stream

A Transform stream sits in the middle of a pipeline. It receives chunks from the upstream Readable, modifies them, and emits new chunks to the downstream Writable. You build one by extending stream.Transform and implementing _transform.

// uppercase-transform.js
// A Transform stream that uppercases every chunk passing through.
import { Transform } from 'node:stream';
import { createReadStream, createWriteStream } from 'node:fs';
import { pipeline } from 'node:stream/promises';

class UppercaseTransform extends Transform {
  // _transform is called once per chunk that arrives from upstream.
  //   chunk    -- a Buffer (or string if you set encoding)
  //   encoding -- the chunk encoding (usually 'buffer')
  //   callback -- call when done; pass an error to abort, or
  //               pass (null, transformedChunk) to push output
  _transform(chunk, encoding, callback) {
    // Convert Buffer -> string -> uppercase -> Buffer
    const upper = chunk.toString('utf8').toUpperCase();

    // Push the transformed chunk downstream and signal completion
    callback(null, Buffer.from(upper, 'utf8'));
  }

  // Optional: _flush is called once when the upstream ends.
  // Use it to emit any final data you have buffered internally.
  _flush(callback) {
    // No leftover state in this example
    callback();
  }
}

// Use it in a pipeline: read file -> uppercase -> write file
await pipeline(
  createReadStream('./input.txt'),
  new UppercaseTransform(),
  createWriteStream('./output-upper.txt'),
);

console.log('Transform complete.');

Important caveat about chunk boundaries: chunks arrive in 64 KB blocks by default. For an uppercase transform on ASCII text this is fine. But if you're transforming UTF-8 text with multi-byte characters, a chunk boundary may split a character in half. The safe pattern is to call setEncoding('utf8') on the source so Node's StringDecoder buffers partial code points across chunks, or to decode inside your Transform with a TextDecoder, passing { stream: true } to each decode() call -- never call .toString('utf8') on a raw chunk that may end mid-character.

The same pattern works for any per-chunk operation: line counting, redacting credit card numbers, parsing protocol frames, encryption with crypto.createCipheriv() (which is itself a Transform stream you don't have to write).
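As one example of that family, here is a line-counting Transform that uses _flush to append a summary once upstream ends -- a sketch driven by an in-memory source rather than a file:

```javascript
// line-count-transform.js -- passes data through unchanged while
// counting newlines, then emits a trailing summary in _flush.
import { Transform, Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

class LineCounter extends Transform {
  constructor() {
    super();
    this.lines = 0;
  }

  _transform(chunk, encoding, callback) {
    // Count '\n' bytes in this chunk, then forward it untouched
    for (const byte of chunk) if (byte === 0x0a) this.lines++;
    callback(null, chunk);
  }

  _flush(callback) {
    // Upstream is done -- push one final summary chunk downstream
    callback(null, Buffer.from(`# lines: ${this.lines}\n`));
  }
}

const counter = new LineCounter();
const out = [];
await pipeline(
  Readable.from(['a\nb\n', 'c\n']), // in-memory stand-in for a file
  counter,
  async function (source) {
    for await (const chunk of source) out.push(chunk);
  },
);
console.log(counter.lines); // 3
```

The pass-through body plus _flush summary is the same shape a streaming CSV parser uses to hold a partial last row.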


Pattern 4 -- Gzip Compression in a Pipeline

zlib ships with Node.js and exposes ready-made Transform streams: createGzip, createGunzip, createDeflate, createBrotliCompress. You drop them into a pipeline like any other Transform.

// compress-and-encrypt.js
// Read a file, gzip it, encrypt it, write it -- all streaming.
// Memory stays flat even for terabyte-sized inputs.
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
import { createCipheriv, randomBytes } from 'node:crypto';
import { pipeline } from 'node:stream/promises';

async function archiveFile(source, destination, key) {
  // Create the gzip Transform with maximum compression level
  const gzip = createGzip({ level: 9 });

  // GCM's recommended IV length is 12 bytes -- fresh for every file
  const iv = randomBytes(12);

  // createCipheriv returns a Transform stream -- it accepts plaintext
  // chunks and emits ciphertext chunks
  const cipher = createCipheriv('aes-256-gcm', key, iv);

  // Build the pipeline: file -> gzip -> encrypt -> file
  // pipeline() chains them and handles errors at any stage.
  await pipeline(
    createReadStream(source),
    gzip,
    cipher,
    createWriteStream(destination),
  );

  // GCM needs its authentication tag to decrypt, and the tag only
  // exists after the cipher finishes. Save IV + tag in a sidecar.
  const authTag = cipher.getAuthTag();
  await writeSidecar(`${destination}.meta`, Buffer.concat([iv, authTag]));

  console.log(`Archived ${source} -> ${destination}`);
}

async function writeSidecar(path, data) {
  const { writeFile } = await import('node:fs/promises');
  await writeFile(path, data);
}

// Run it
const key = randomBytes(32); // In production, load from a key vault
archiveFile('./database-dump.sql', './backups/dump.sql.gz.enc', key)
  .catch((err) => console.error('Archive failed:', err));

A 10 GB database dump goes through this pipeline using only a few hundred kilobytes of memory total. The CPU compresses one chunk while the disk reads the next while the encryption transforms the previous one -- everything overlaps.


Pattern 5 -- Streaming an HTTP Response

The Node.js HTTP response object is a Writable stream. That means anything you can pipeline() into a file, you can pipeline() into an HTTP response. This is how you serve large files, video, or generated reports without buffering them in RAM.

// stream-server.js
// HTTP server that streams a gzipped file to the client.
// res IS a Writable -- no buffering, no memory spike.
import { createServer } from 'node:http';
import { createReadStream, statSync } from 'node:fs';
import { createGzip } from 'node:zlib';
import { pipeline } from 'node:stream/promises';

const server = createServer(async (req, res) => {
  if (req.url !== '/download') {
    res.writeHead(404);
    res.end('Not found');
    return;
  }

  const filePath = './huge-report.json';

  try {
    // statSync throws if the file is missing -- fail before any
    // headers go out, so we can still send a clean 500
    statSync(filePath);

    // Tell the client we're sending gzipped JSON.
    // Note: no Content-Length header -- the compressed size is
    // unknown until streaming completes.
    res.writeHead(200, {
      'Content-Type': 'application/json',
      'Content-Encoding': 'gzip',
      'Cache-Control': 'no-store',
    });

    // Pipeline: file on disk -> gzip transform -> HTTP response
    // If the client disconnects mid-download, pipeline() catches the
    // ECONNRESET and cleans up the file descriptor automatically.
    await pipeline(
      createReadStream(filePath),
      createGzip(),
      res, // res is a Writable stream
    );
  } catch (err) {
    // Headers may already be sent -- only respond if we can
    if (!res.headersSent) {
      res.writeHead(500);
      res.end('Server error');
    }
    console.error('Stream failed:', err.message);
  }
});

server.listen(3000, () => {
  console.log('Streaming server on http://localhost:3000');
});

The first byte of the response leaves the server before the file has even been fully read from disk. The client starts downloading immediately. The server's RSS memory barely moves regardless of whether the file is 1 MB or 1 TB.

This is the same pattern Express, Fastify, and Koa all use under the hood when you call res.sendFile(). Knowing how to write it from scratch lets you build custom streaming endpoints: server-sent events, video range requests, on-the-fly PDF generation.
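Server-sent events are the purest form of this idea: instead of piping a file, you write incremental chunks to res yourself. A minimal sketch (assumes Node 18+ for the global fetch; the endpoint and event payloads are made up):

```javascript
// sse-sketch.js -- res is a Writable, so incremental writes reach
// the client as they happen. A toy demo, not a production SSE setup.
import { createServer } from 'node:http';

const server = createServer((req, res) => {
  res.writeHead(200, { 'Content-Type': 'text/event-stream' });
  res.write('data: first\n\n');  // each write is flushed to the client
  res.write('data: second\n\n');
  res.end();
});

// Listen on an ephemeral port, fetch our own endpoint, then shut down
await new Promise((resolve) => server.listen(0, resolve));
const { port } = server.address();
const body = await (await fetch(`http://127.0.0.1:${port}/`)).text();
server.close();
console.log(body.includes('data: first')); // true
```

A real SSE endpoint would keep res open and write an event whenever something happens server-side; the mechanics of writing to res are identical.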


ASCII Diagram -- Anatomy of a Transform Pipeline

+---------------------------------------------------------------+
|           pipeline(source, t1, t2, sink)                      |
+---------------------------------------------------------------+
|                                                                |
|   +----------+   chunks   +---------+   chunks   +---------+   |
|   | Readable | ---------> |Transform| ---------> |Transform|   |
|   |  source  |  64 KB     |   t1    |  64 KB     |   t2    |   |
|   +----------+            +---------+            +---------+   |
|        |                       |                       |       |
|        |                       v                       v       |
|        |                  _transform()            _transform() |
|        |                  + _flush()              + _flush()   |
|        |                                                |      |
|        |                                                v      |
|        |                                         +-----------+ |
|        +---------------------------------------> | Writable  | |
|                              chunks (64 KB)      |   sink    | |
|                                                  +-----------+ |
|                                                                |
|   Backpressure flows BACKWARDS:                                |
|     sink full -> t2 pauses -> t1 pauses -> source pauses       |
|                                                                |
|   Errors flow to pipeline()'s callback/promise.                |
|   Cleanup runs on every stream when ANY stream fails.          |
|                                                                |
+---------------------------------------------------------------+

Common Mistakes

1. Using .pipe() instead of pipeline(). The classic chain a.pipe(b).pipe(c) looks elegant but it is a footgun. If b errors, a keeps reading and c is never closed -- you leak file descriptors and the process can hang. .pipe() only forwards data; it does not forward errors. You would have to attach .on('error', ...) to every stream in the chain manually. pipeline() does that for you and additionally calls .destroy() on every stream when any one fails.

2. Forgetting backpressure when calling write() manually. If you write to a stream in a loop without checking the return value of write(), you can fill the internal buffer to gigabytes before the consumer drains it. Always check the boolean return of .write() -- if it returns false, wait for the 'drain' event before writing more. Or just use pipeline() and let Node handle it.
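A minimal sketch of the manual pattern -- check write()'s return value and await 'drain' -- writing to a throwaway file under the OS temp directory (path is illustrative):

```javascript
// drain-sketch.js -- manual writes that respect backpressure.
import { createWriteStream } from 'node:fs';
import { once } from 'node:events';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

async function writeMany(dest, chunks) {
  for (const chunk of chunks) {
    // write() returns false once the internal buffer passes highWaterMark
    if (!dest.write(chunk)) {
      // Wait for 'drain' instead of letting the buffer grow unbounded
      await once(dest, 'drain');
    }
  }
  dest.end();
  await once(dest, 'finish'); // all data handed off to the OS
}

const outPath = join(tmpdir(), 'drain-demo.txt');
await writeMany(
  createWriteStream(outPath),
  // ~100 KB total, enough to exceed the 16 KB default highWaterMark
  Array.from({ length: 1000 }, (_, i) => `row ${i} ${'x'.repeat(100)}\n`),
);
console.log('wrote', outPath);
```

This is exactly what pipeline() does for you between every pair of streams, which is why the simpler advice is: use pipeline().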

3. Calling chunk.toString('utf8') on a multi-byte text stream. A 64 KB chunk boundary can land in the middle of a UTF-8 character, producing a replacement character (U+FFFD). For text streams, set setEncoding('utf8') on the source -- Node buffers partial code points across chunk boundaries for you.
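A contrived demonstration of the fix: the Readable below deliberately splits the three-byte UTF-8 encoding of '€' across two chunks, and setEncoding('utf8') reassembles it:

```javascript
// split-codepoint-sketch.js -- why setEncoding matters for text.
import { Readable } from 'node:stream';

// '€' is 0xE2 0x82 0xAC in UTF-8; push it across a chunk boundary
const source = new Readable({
  read() {
    this.push(Buffer.from([0xe2, 0x82]));
    this.push(Buffer.from([0xac]));
    this.push(null);
  },
});

// Node's StringDecoder now buffers the partial code point for us;
// calling chunk.toString('utf8') per chunk would yield U+FFFD garbage
source.setEncoding('utf8');

let text = '';
for await (const chunk of source) text += chunk;
console.log(text); // €
```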

4. Reaching for streams when fs.readFile would be fine. Streams have overhead: object allocation, event loop ticks, callback machinery. For files under ~10 MB on a server with plenty of RAM, await fs.readFile() is faster, simpler, and uses less CPU. Streams are mandatory when (a) the file size is unbounded, (b) you want to start sending data before reading completes, or (c) you're chaining transforms. For everything else, just read the file.
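A sketch of the readFile side of that trade-off, using a throwaway config file under the temp directory (the path and config shape are made up):

```javascript
// readfile-sketch.js -- small, bounded file: just read it whole.
import { writeFile, readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

const path = join(tmpdir(), 'app-config.json');
await writeFile(path, JSON.stringify({ port: 3000 }));

// One await, whole file in memory -- fine because the size is
// small and known, and we need the full contents to JSON.parse
const config = JSON.parse(await readFile(path, 'utf8'));
console.log(config.port); // 3000
```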

5. Not handling the 'error' event on streams used outside pipeline(). Even if you only use a stream briefly, an unhandled 'error' event crashes the entire Node.js process. If for some reason you cannot use pipeline(), you must attach an .on('error', ...) listener to every stream you touch.


Interview Questions

1. "Why is pipeline() strictly better than .pipe() for connecting streams?"

.pipe() only forwards data chunks; it does not forward errors. If you write a.pipe(b).pipe(c) and b emits an error, a keeps reading from disk, c never gets .end() called on it, and you leak file descriptors and memory. To make .pipe() safe you would have to attach .on('error', cleanupFn) to every stream in the chain and write the cleanup logic by hand. pipeline() from node:stream/promises does all of that automatically: it propagates errors from any stream to a single promise rejection, calls .destroy() on every stream when any one fails, and ensures resources are released. There is essentially no reason to use raw .pipe() in new code.
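A small sketch that makes the contrast concrete -- a Transform that fails mid-stream, surfaced by pipeline() as a single promise rejection with the source destroyed:

```javascript
// pipeline-error-sketch.js -- one catch, full cleanup.
import { pipeline } from 'node:stream/promises';
import { Readable, Transform } from 'node:stream';

// Simulate a failure at stage two of the pipeline
const failing = new Transform({
  transform(chunk, encoding, callback) {
    callback(new Error('boom'));
  },
});

const source = Readable.from(['some data']);

let message = null;
try {
  await pipeline(source, failing, async (src) => {
    for await (const _ of src) { /* drain */ }
  });
} catch (err) {
  message = err.message; // the error from ANY stage lands here
}

console.log(message);          // 'boom'
console.log(source.destroyed); // true -- pipeline tore down the source
```

With raw .pipe(), the same failure would leave the source reading and nothing rejected.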

2. "How does the readline module let you process a 100 GB CSV without running out of memory?"

readline.createInterface({ input: fileStream }) wraps a Readable stream and emits a 'line' event for each newline-terminated line. Internally it buffers only the current partial line plus whatever the underlying stream's highWaterMark (default 64 KB) holds. When you consume it with for await (const line of rl), Node automatically pauses the file stream between iterations -- so if your loop body is slow (a database insert, an API call), the file is not eagerly read ahead. Memory usage stays flat at a few hundred kilobytes regardless of whether the file is 10 MB or 100 GB. The same pattern handles NDJSON: just call JSON.parse(line) inside the loop.

3. "Walk me through building a custom Transform stream that uppercases text."

You import Transform from node:stream and either extend the class or pass options to its constructor. The key method is _transform(chunk, encoding, callback), which is invoked once per incoming chunk. You convert the chunk (a Buffer by default) to a string, transform it, convert back to a Buffer, and call callback(null, transformedBuffer) to push it downstream. Optionally you implement _flush(callback) for any final data you have buffered internally -- useful for things like CSV parsers that may hold a partial last row. Then you drop your Transform into a pipeline() between a Readable and a Writable. The Transform inherits backpressure for free: if the downstream Writable is full, your _transform is not called again until it drains.
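The options form mentioned above, without subclassing -- a sketch driven by an in-memory source (safe to .toString() here because the input is ASCII):

```javascript
// transform-options-sketch.js -- Transform via constructor options.
import { Transform, Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

const upper = new Transform({
  transform(chunk, encoding, callback) {
    // Buffer -> string -> uppercase; pushed downstream via callback
    callback(null, chunk.toString('utf8').toUpperCase());
  },
});

const out = [];
await pipeline(
  Readable.from(['hello ', 'world']),
  upper,
  async (src) => {
    for await (const chunk of src) out.push(chunk.toString());
  },
);
console.log(out.join('')); // HELLO WORLD
```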

4. "When should you use fs.readFile instead of a stream?"

Use fs.readFile when the file is small enough to fit comfortably in memory (typically under 10 MB on a server, smaller on memory-constrained environments), when you need the entire contents available before you can start processing (e.g., parsing JSON config, reading a template), and when simplicity matters more than memory efficiency. Streams add overhead -- event loop ticks, object allocation, backpressure machinery -- that makes them slower than readFile for small files. Use streams when the file size is unbounded or unknown, when you want to start producing output before the input is fully read (e.g., HTTP responses), when you're chaining transforms like compression or encryption, or when the file is larger than available RAM. Rule of thumb: if a hostile user could upload it, stream it.

5. "The HTTP response object is a Writable stream. What does that let you do?"

Because res implements the Writable interface, you can pipe any Readable directly into it -- a file from disk, a database cursor, the output of a child process, a gzipped Transform. This means you can stream a 10 GB file to a client without ever loading it into memory: pipeline(createReadStream(path), createGzip(), res). The first byte of the response leaves the server as soon as the first chunk is read from disk; the client starts downloading immediately; total server RSS stays at a few hundred KB. If the client disconnects mid-download, pipeline() catches the ECONNRESET and closes the file descriptor. This is the foundation of how Express's res.sendFile(), video range requests, and server-sent events all work under the hood.


Quick Reference -- Stream Patterns Cheat Sheet

+---------------------------------------------------------------+
|           STREAM PATTERNS CHEAT SHEET                         |
+---------------------------------------------------------------+
|                                                                |
|  FILE COPY:                                                    |
|  import { pipeline } from 'node:stream/promises'               |
|  await pipeline(                                               |
|    createReadStream(src),                                      |
|    createWriteStream(dst),                                     |
|  )                                                             |
|                                                                |
|  LINE-BY-LINE CSV:                                             |
|  import { createInterface } from 'node:readline'               |
|  const rl = createInterface({ input: fileStream })             |
|  for await (const line of rl) { ... }                          |
|                                                                |
|  CUSTOM TRANSFORM:                                             |
|  class MyT extends Transform {                                 |
|    _transform(chunk, enc, cb) {                                |
|      cb(null, transform(chunk))                                |
|    }                                                           |
|  }                                                             |
|                                                                |
|  GZIP PIPELINE:                                                |
|  await pipeline(                                               |
|    createReadStream(src),                                      |
|    createGzip(),                                               |
|    createWriteStream(dst + '.gz'),                             |
|  )                                                             |
|                                                                |
|  HTTP STREAMING:                                               |
|  await pipeline(                                               |
|    createReadStream(file),                                     |
|    createGzip(),                                               |
|    res, // res is a Writable                                   |
|  )                                                             |
|                                                                |
+---------------------------------------------------------------+

+---------------------------------------------------------------+
|           KEY RULES                                            |
+---------------------------------------------------------------+
|                                                                |
|  1. Prefer pipeline() over .pipe() -- always                   |
|  2. Use for await...of with readline for line processing       |
|  3. Implement _transform(chunk, enc, cb) for custom Transforms |
|  4. zlib.createGzip() drops into any pipeline                  |
|  5. res IS a Writable -- pipe directly into it                 |
|  6. Streams for unbounded data, readFile for small known files |
|  7. Always handle 'error' on any stream not in pipeline()      |
|                                                                |
+---------------------------------------------------------------+
Approach                   Memory              Use When
fs.readFile                O(file size)        File < 10 MB, need full contents at once
createReadStream + pipe    O(highWaterMark)    Legacy code, you handle errors manually
pipeline()                 O(highWaterMark)    Default choice for any stream chain
readline + for await       O(line length)      Line-oriented files (CSV, NDJSON, logs)
Custom Transform           O(highWaterMark)    Per-chunk modification (encrypt, parse, redact)
zlib.createGzip()          O(window size)      Compress on the fly in a pipeline

Prev: Lesson 4.2 -- Streams Introduction
Next: Lesson 4.4 -- Streams in Real Applications


This is Lesson 4.3 of the Node.js Interview Prep Course -- 10 chapters, 42 lessons.
