Node.js Streams: Processing Large Data Efficiently

Streams are one of Node.js’s most powerful features for handling large amounts of data efficiently. Instead of loading entire files into memory, streams process data in chunks.
Understanding Stream Types
Node.js has four types of streams:
- Readable: Read data from a source
- Writable: Write data to a destination
- Duplex: Both readable and writable (see the sketch after this list)
- Transform: Modify data as it’s being read or written
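Duplex streams don't get their own example later in this article, so here is a brief sketch: a TCP socket from the built-in net module is a Duplex stream, readable and writable at the same time. The echo behavior and the port number below are illustrative assumptions, not part of the original examples.
const net = require("net");
// net.Socket is a built-in Duplex stream: the same object is readable
// (data arriving from the client) and writable (data sent back to it).
const echoServer = net.createServer((socket) => {
  socket.on("data", (chunk) => {
    socket.write(chunk); // read on one side, write on the other
  });
});
echoServer.listen(4000); // port 4000 is just an example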
Basic Readable Stream
Read a file using streams:
const fs = require("fs");
// Traditional approach (not recommended for large files)
fs.readFile("large-file.txt", "utf8", (err, data) => {
  if (err) throw err;
  console.log(data); // Entire file in memory!
});
// Stream approach (memory efficient)
const readStream = fs.createReadStream("large-file.txt", {
  encoding: "utf8",
  highWaterMark: 16 * 1024, // 16KB chunks
});
readStream.on("data", (chunk) => {
  // With an encoding set, chunks are strings, so length is in characters
  console.log("Received chunk:", chunk.length, "characters");
});
readStream.on("end", () => {
  console.log("Finished reading file");
});
readStream.on("error", (err) => {
  console.error("Error:", err);
});
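Files are not the only source of readable data. As a small sketch, Readable.from (available in current Node.js releases) wraps any iterable or async iterable in a readable stream; the sample array here is purely illustrative.
const { Readable } = require("stream");
// Readable.from() turns an iterable (or async iterable) into a readable stream
const wordStream = Readable.from(["first", "second", "third"]);
wordStream.on("data", (word) => {
  console.log("Chunk:", word);
});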
Writable Streams
Write data efficiently:
const fs = require("fs");
const writeStream = fs.createWriteStream("output.txt");
writeStream.write("First line\n");
writeStream.write("Second line\n");
writeStream.end("Final line\n");
writeStream.on("finish", () => {
  console.log("All data written");
});
writeStream.on("error", (err) => {
  console.error("Write error:", err);
});
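You can also implement your own destination by extending Writable and providing a _write method. The class below is a hypothetical example that simply forwards each chunk to standard output.
const { Writable } = require("stream");
// A minimal custom Writable: every chunk is forwarded to stdout
class ConsoleWriter extends Writable {
  _write(chunk, encoding, callback) {
    process.stdout.write(`> ${chunk.toString()}`);
    callback(); // signal that this chunk has been handled
  }
}
const writer = new ConsoleWriter();
writer.write("streamed to the console\n");
writer.end();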
Piping Streams
Connect streams together for powerful data processing:
const fs = require("fs");
const zlib = require("zlib");
// Copy and compress a file
fs.createReadStream("input.txt")
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream("input.txt.gz"))
  .on("finish", () => {
    console.log("File compressed successfully");
  });
// Decompress a file
fs.createReadStream("input.txt.gz")
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream("output.txt"));
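One caveat: pipe() forwards data but not errors, so an error on any stream in the chain has to be handled on that stream itself (or use the pipeline helper covered later). A sketch of per-stream handlers for the compression example:
const fs = require("fs");
const zlib = require("zlib");
// pipe() does not propagate errors, so each stream needs its own handler
const source = fs.createReadStream("input.txt");
const gzip = zlib.createGzip();
const destination = fs.createWriteStream("input.txt.gz");
source.on("error", (err) => console.error("Read error:", err));
gzip.on("error", (err) => console.error("Compression error:", err));
destination.on("error", (err) => console.error("Write error:", err));
source.pipe(gzip).pipe(destination);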
Transform Streams
Create custom transformations:
const { Transform } = require("stream");
// Transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const upperChunk = chunk.toString().toUpperCase();
    this.push(upperChunk);
    callback();
  }
}
// Usage
const fs = require("fs");
fs.createReadStream("input.txt")
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream("output-uppercase.txt"));
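For one-off transforms you don't need a class; the Transform constructor also accepts a transform function as an option. A minimal equivalent of the uppercase example:
const { Transform } = require("stream");
// Same behavior as UppercaseTransform, using the options form
const uppercase = new Transform({
  transform(chunk, encoding, callback) {
    // callback(error, data) pushes data downstream in one step
    callback(null, chunk.toString().toUpperCase());
  },
});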
Practical Example: CSV Processing
Process large CSV files efficiently:
const fs = require("fs");
const { Transform } = require("stream");
class CSVParser extends Transform {
  constructor(options) {
    super(options);
    this.headers = null;
    this.leftover = "";
  }
  _transform(chunk, encoding, callback) {
    // A chunk boundary can fall in the middle of a line, so prepend
    // whatever partial line was left over from the previous chunk
    const lines = (this.leftover + chunk.toString()).split("\n");
    this.leftover = lines.pop();
    lines.forEach((line) => this._processLine(line));
    callback();
  }
  _flush(callback) {
    // Handle a final line that has no trailing newline
    if (this.leftover) {
      this._processLine(this.leftover);
    }
    callback();
  }
  _processLine(line) {
    if (!this.headers) {
      this.headers = line.split(",");
      return;
    }
    if (line.trim()) {
      const values = line.split(",");
      const obj = {};
      this.headers.forEach((header, index) => {
        obj[header.trim()] = values[index]?.trim() || "";
      });
      // One JSON object per line (newline-delimited JSON)
      this.push(JSON.stringify(obj) + "\n");
    }
  }
}
// Process a large CSV file
fs.createReadStream("large-data.csv")
  .pipe(new CSVParser())
  .pipe(fs.createWriteStream("output.json"))
  .on("finish", () => {
    console.log("CSV processing complete");
  });
HTTP Streaming
Stream data in web applications:
const http = require("http");
const fs = require("fs");
const server = http.createServer((req, res) => {
  if (req.url === "/video") {
    const stat = fs.statSync("video.mp4");
    res.writeHead(200, {
      "Content-Type": "video/mp4",
      "Content-Length": stat.size,
    });
    // Stream video file to client
    const videoStream = fs.createReadStream("video.mp4");
    videoStream.pipe(res);
  } else if (req.url === "/download") {
    res.writeHead(200, {
      "Content-Type": "application/octet-stream",
      "Content-Disposition": 'attachment; filename="data.zip"',
    });
    fs.createReadStream("data.zip").pipe(res);
  } else {
    // Answer unknown routes instead of leaving the request hanging
    res.writeHead(404);
    res.end();
  }
});
server.listen(3000, () => {
  console.log("Server running on port 3000");
});
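One detail the example above glosses over: if the file is missing or unreadable, the request never completes. A hedged sketch of error handling when piping a file to a response, using a separate port and the same file name purely for illustration:
const http = require("http");
const fs = require("fs");
const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream("video.mp4");
  // Only send success headers once the file has actually been opened
  fileStream.on("open", () => {
    res.writeHead(200, { "Content-Type": "video/mp4" });
    fileStream.pipe(res);
  });
  fileStream.on("error", (err) => {
    console.error("Stream error:", err);
    if (!res.headersSent) {
      res.writeHead(500, { "Content-Type": "text/plain" });
      res.end("Failed to stream file");
    } else {
      res.destroy(); // abort mid-stream; the client sees a broken transfer
    }
  });
});
server.listen(3001); // separate port so it does not clash with the server above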
Backpressure Handling
Manage data flow to prevent memory issues:
const fs = require("fs");
const readStream = fs.createReadStream("large-file.txt");
const writeStream = fs.createWriteStream("output.txt");
readStream.on("data", (chunk) => {
  const canContinue = writeStream.write(chunk);
  if (!canContinue) {
    console.log("Backpressure detected, pausing read");
    readStream.pause();
  }
});
writeStream.on("drain", () => {
  console.log("Drain event, resuming read");
  readStream.resume();
});
// Better: pipe() handles backpressure automatically. Use it instead of the
// manual "data"/"drain" handlers above (running both would write every chunk twice):
// readStream.pipe(writeStream);
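If you do drive a writable stream by hand, async iteration combined with events.once gives a compact way to wait for drain without juggling pause and resume. A sketch reusing the same file names:
const fs = require("fs");
const { once } = require("events");
async function copyWithBackpressure(srcPath, destPath) {
  const source = fs.createReadStream(srcPath);
  const destination = fs.createWriteStream(destPath);
  for await (const chunk of source) {
    if (!destination.write(chunk)) {
      await once(destination, "drain"); // wait for the writable to catch up
    }
  }
  destination.end();
}
copyWithBackpressure("large-file.txt", "output.txt").catch(console.error);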
Error Handling in Pipelines
Properly handle errors in stream pipelines:
const fs = require("fs");
const { pipeline } = require("stream");
const zlib = require("zlib");
pipeline(
  fs.createReadStream("input.txt"),
  zlib.createGzip(),
  fs.createWriteStream("input.txt.gz"),
  (err) => {
    if (err) {
      console.error("Pipeline failed:", err);
    } else {
      console.log("Pipeline succeeded");
    }
  },
);
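Recent Node.js versions (15 and later) also ship a promise-based pipeline in the stream/promises module, which pairs naturally with async/await. A sketch of the same compression pipeline:
const fs = require("fs");
const zlib = require("zlib");
const { pipeline } = require("stream/promises");
async function compressFile() {
  // Resolves when every stream has finished, rejects if any of them errors
  await pipeline(
    fs.createReadStream("input.txt"),
    zlib.createGzip(),
    fs.createWriteStream("input.txt.gz"),
  );
  console.log("Pipeline succeeded");
}
compressFile().catch((err) => console.error("Pipeline failed:", err));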
Async Iteration with Streams
Use modern async/await syntax with streams:
const fs = require("fs");
const readline = require("readline");
async function processLineByLine() {
  const fileStream = fs.createReadStream("input.txt");
  const rl = readline.createInterface({
    input: fileStream,
    crlfDelay: Infinity,
  });
  let lineCount = 0;
  for await (const line of rl) {
    lineCount++;
    console.log(`Line ${lineCount}: ${line}`);
  }
  console.log(`Total lines: ${lineCount}`);
}
processLineByLine().catch(console.error);
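Readable streams are themselves async iterable, so the same pattern works on raw chunks without readline. A small sketch that counts bytes:
const fs = require("fs");
async function countBytes() {
  let total = 0;
  // Every readable stream is an async iterable of chunks (Buffers by default)
  for await (const chunk of fs.createReadStream("input.txt")) {
    total += chunk.length;
  }
  console.log(`Total bytes: ${total}`);
}
countBytes().catch(console.error);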
Conclusion
Streams are essential for building scalable Node.js applications. They enable efficient memory usage, better performance, and elegant data processing pipelines. Master streams to handle large-scale data processing with confidence.