Node.js Streams: Processing Large Data Efficiently

Streams are one of Node.js’s most powerful features for handling large amounts of data efficiently. Instead of loading entire files into memory, streams process data in chunks.

Understanding Stream Types

Node.js has four types of streams:

  • Readable: Read data from a source
  • Writable: Write data to a destination
  • Duplex: Both readable and writable
  • Transform: Modify data as it’s being read or written

Basic Readable Stream

Read a file using streams:

const fs = require("fs");

// Whole-file read: the complete contents are buffered in memory before
// the callback runs — simple, but it does not scale to very large files.
fs.readFile("large-file.txt", "utf8", (err, data) => {
  if (err) throw err;
  console.log(data);
});

// Chunked read: data arrives piece by piece, keeping memory usage flat.
const reader = fs.createReadStream("large-file.txt", {
  encoding: "utf8",
  highWaterMark: 16 * 1024, // buffer up to 16 KiB per chunk
});

// With an "utf8" encoding each chunk is a string, so .length counts
// characters rather than raw bytes.
reader
  .on("error", (err) => {
    console.error("Error:", err);
  })
  .on("data", (chunk) => {
    console.log("Received chunk:", chunk.length, "bytes");
  })
  .on("end", () => {
    console.log("Finished reading file");
  });

Writable Streams

Write data efficiently:

const fs = require("fs");

// Stream-based writer: register outcome handlers first, then queue a
// few lines and close the stream.
const out = fs.createWriteStream("output.txt");

// "finish" fires once every queued chunk has been flushed to disk.
out.on("finish", () => {
  console.log("All data written");
});

out.on("error", (err) => {
  console.error("Write error:", err);
});

out.write("First line\n");
out.write("Second line\n");
out.end("Final line\n"); // end() writes this last chunk, then closes

Piping Streams

Connect streams together for powerful data processing:

const fs = require("fs");
const zlib = require("zlib");

// Copy and compress: reader → gzip → writer, chained with pipe().
// NOTE: pipe() does not forward errors between stages — each stream
// would need its own "error" handler (the pipeline() API fixes this).
const reader = fs.createReadStream("input.txt");
const gzipper = zlib.createGzip();
const archive = fs.createWriteStream("input.txt.gz");

reader
  .pipe(gzipper)
  .pipe(archive)
  .on("finish", () => {
    console.log("File compressed successfully");
  });

// Reverse direction: gunzip the archive back into plain text.
fs.createReadStream("input.txt.gz")
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream("output.txt"));

Transform Streams

Create custom transformations:

const { Transform } = require("stream");

// Transform stream that upper-cases every chunk of text flowing through it.
class UppercaseTransform extends Transform {
  // Passing the result as callback's second argument both pushes it
  // downstream and signals that this chunk is done.
  _transform(data, _encoding, done) {
    done(null, data.toString().toUpperCase());
  }
}

// Usage: stream input.txt through the transform and write the
// upper-cased result to a new file, chunk by chunk.
const fs = require("fs");

fs.createReadStream("input.txt")
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream("output-uppercase.txt"));

Practical Example: CSV Processing

Process large CSV files efficiently:

const fs = require("fs");
const { Transform } = require("stream");
const readline = require("readline");

// Transform stream that converts CSV text into newline-delimited JSON.
// The first line is treated as the header row; every subsequent
// non-blank line becomes one JSON object keyed by the trimmed headers.
// NOTE: naive comma split — quoted fields containing commas are not handled.
class CSVParser extends Transform {
  constructor(options) {
    super(options);
    this.headers = null;  // column names parsed from the first line
    this.lineNumber = 0;  // 1-based count of lines consumed so far
    this.remainder = "";  // partial line carried over between chunks
  }

  // Buffer incoming text and process only COMPLETE lines. Chunk
  // boundaries rarely fall exactly on "\n", so the trailing partial
  // line is held in this.remainder and prepended to the next chunk.
  // (The original split each chunk independently, corrupting any row
  // that straddled a chunk boundary.)
  _transform(chunk, encoding, callback) {
    const text = this.remainder + chunk.toString();
    const lines = text.split("\n");
    this.remainder = lines.pop(); // last element may be incomplete

    for (const line of lines) {
      this._processLine(line);
    }

    callback();
  }

  // Emit the final line if the input did not end with a newline —
  // without this, the last row would be silently dropped.
  _flush(callback) {
    if (this.remainder) {
      this._processLine(this.remainder);
      this.remainder = "";
    }
    callback();
  }

  // Parse one complete CSV line; the first line defines the headers.
  _processLine(line) {
    this.lineNumber++;

    if (this.lineNumber === 1) {
      this.headers = line.split(",");
      return;
    }

    if (line.trim()) {
      const values = line.split(",");
      const obj = {};

      this.headers.forEach((header, index) => {
        obj[header.trim()] = values[index]?.trim() || "";
      });

      this.push(JSON.stringify(obj) + "\n");
    }
  }
}

// Process a large CSV file. Note the output is newline-delimited JSON
// (one object per line), not a single JSON array, despite the .json name.
// "finish" fires once the write stream has flushed everything to disk.
fs.createReadStream("large-data.csv")
  .pipe(new CSVParser())
  .pipe(fs.createWriteStream("output.json"))
  .on("finish", () => {
    console.log("CSV processing complete");
  });

HTTP Streaming

Stream data in web applications:

const http = require("http");
const fs = require("fs");

// HTTP server that streams files instead of buffering them in memory.
const server = http.createServer((req, res) => {
  if (req.url === "/video") {
    // NOTE(review): statSync blocks the event loop and throws if the
    // file is missing — acceptable for a demo; prefer fs.promises.stat
    // plus a try/catch in production.
    const stat = fs.statSync("video.mp4");

    res.writeHead(200, {
      "Content-Type": "video/mp4",
      "Content-Length": stat.size,
    });

    // Stream video file to client; tear the response down if the read fails
    // mid-transfer (headers are already sent, so we cannot send a 500).
    const videoStream = fs.createReadStream("video.mp4");
    videoStream.on("error", () => res.destroy());
    videoStream.pipe(res);
  } else if (req.url === "/download") {
    res.writeHead(200, {
      "Content-Type": "application/octet-stream",
      "Content-Disposition": 'attachment; filename="data.zip"',
    });

    const download = fs.createReadStream("data.zip");
    download.on("error", () => res.destroy());
    download.pipe(res);
  } else {
    // Unmatched URLs previously received no response at all, leaving
    // clients hanging until they timed out.
    res.writeHead(404, { "Content-Type": "text/plain" });
    res.end("Not Found");
  }
});

server.listen(3000, () => {
  console.log("Server running on port 3000");
});

Backpressure Handling

Manage data flow to prevent memory issues:

const fs = require("fs");

const readStream = fs.createReadStream("large-file.txt");
const writeStream = fs.createWriteStream("output.txt");

// Manual backpressure loop: pause the reader whenever write() reports
// the writer's internal buffer is full, resume once it drains.
readStream.on("data", (chunk) => {
  const canContinue = writeStream.write(chunk);

  if (!canContinue) {
    console.log("Backpressure detected, pausing read");
    readStream.pause();
  }
});

// Close the writer when the reader is exhausted so "finish" can fire
// and the file descriptor is released. (Without this, the write stream
// is never ended.)
readStream.on("end", () => {
  writeStream.end();
});

writeStream.on("drain", () => {
  console.log("Drain event, resuming read");
  readStream.resume();
});

// Better: pipe() implements this pause/resume dance automatically.
// Do NOT run it alongside the manual handler above on the same streams —
// that would write every chunk twice:
//   readStream.pipe(writeStream);

Error Handling in Pipelines

Properly handle errors in stream pipelines:

const fs = require("fs");
const { pipeline } = require("stream");
const zlib = require("zlib");

// pipeline() wires the stages together and funnels every error —
// from any stage — into the single final callback, and cleans up
// the remaining streams on failure.
const source = fs.createReadStream("input.txt");
const compressor = zlib.createGzip();
const destination = fs.createWriteStream("input.txt.gz");

pipeline(source, compressor, destination, (err) => {
  if (err) {
    console.error("Pipeline failed:", err);
  } else {
    console.log("Pipeline succeeded");
  }
});

Async Iteration with Streams

Use modern async/await syntax with streams:

const fs = require("fs");
const readline = require("readline");

/**
 * Read input.txt one line at a time using async iteration over a
 * readline interface. crlfDelay: Infinity makes "\r\n" count as a
 * single line break.
 */
async function processLineByLine() {
  const fileStream = fs.createReadStream("input.txt");

  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });

  let lineCount = 0;

  for await (const line of rl) {
    lineCount++;
    console.log(`Line ${lineCount}: ${line}`);
  }

  console.log(`Total lines: ${lineCount}`);
}

// The returned promise rejects if the file cannot be read. The original
// left it floating, which turns a missing file into an unhandled
// rejection that kills the process — handle it explicitly.
processLineByLine().catch((err) => {
  console.error("Failed to process file:", err);
});

Conclusion

Streams are essential for building scalable Node.js applications. They enable efficient memory usage, better performance, and elegant data processing pipelines. Master streams to handle large-scale data processing with confidence.