MongoDB Aggregation Pipeline: Advanced Data Processing
MongoDB’s aggregation pipeline is a powerful framework for processing and transforming documents. Learn how to perform complex data analysis and transformations efficiently.
Basic Aggregation Structure
Aggregation pipelines consist of stages that process documents:
const mongoose = require('mongoose');
// NOTE: the `await` expressions in these examples are top-level. Run them in an
// ES module (or wrap them in an async function); CommonJS does not allow
// top-level await.

// Sample data structure.
// Every field the pipelines in this article read is declared here: Mongoose's
// default strict mode drops undeclared paths when documents are saved, so fields
// such as `customerId`, `storeId` and `discount` would otherwise never be stored.
const Order = mongoose.model('Order', {
  customer: String,
  // Reference to a document in the `customers` collection
  customerId: { type: mongoose.Schema.Types.ObjectId, ref: 'Customer' },
  // Reference to a document in the `stores` collection
  storeId: { type: mongoose.Schema.Types.ObjectId, ref: 'Store' },
  items: [{
    product: String,
    quantity: Number,
    price: Number
  }],
  status: String,
  orderDate: Date,
  discount: Number,
  totalAmount: Number
});

// Basic aggregation: the 10 customers with the highest spend on completed orders
const results = await Order.aggregate([
  // Filter first so later stages process fewer documents
  { $match: { status: 'completed' } },
  {
    $group: {
      _id: '$customer',
      totalOrders: { $sum: 1 },
      totalSpent: { $sum: '$totalAmount' }
    }
  },
  { $sort: { totalSpent: -1 } },
  { $limit: 10 }
]);
console.log(results);
// [
//   { _id: 'Alice', totalOrders: 15, totalSpent: 1250.50 },
//   { _id: 'Bob', totalOrders: 8, totalSpent: 890.25 },
//   ...
// ]
Common Aggregation Stages
$match - Filter Documents
// Filter orders from the last 30 days
const thirtyDaysAgo = new Date();
thirtyDaysAgo.setDate(thirtyDaysAgo.getDate() - 30);
const recentOrders = await Order.aggregate([
{
$match: {
orderDate: { $gte: thirtyDaysAgo },
status: { $in: ['completed', 'shipped'] }
}
}
]);$group - Group and Aggregate
// Calculate statistics by product category
const Product = mongoose.model('Product', {
name: String,
category: String,
price: Number,
sales: Number
});
const categoryStats = await Product.aggregate([
{
$group: {
_id: '$category',
totalProducts: { $sum: 1 },
avgPrice: { $avg: '$price' },
minPrice: { $min: '$price' },
maxPrice: { $max: '$price' },
totalSales: { $sum: '$sales' }
}
},
{
$sort: { totalSales: -1 }
}
]);$project - Reshape Documents
// Transform and select specific fields
const transformedOrders = await Order.aggregate([
{
$project: {
customerName: '$customer',
year: { $year: '$orderDate' },
month: { $month: '$orderDate' },
itemCount: { $size: '$items' },
total: '$totalAmount',
// Calculate discount percentage
discountPercent: {
$multiply: [
{ $divide: ['$discount', '$totalAmount'] },
100
]
}
}
}
]);$lookup - Join Collections
// Join orders with customer details
const Customer = mongoose.model('Customer', {
name: String,
email: String,
address: String
});
const ordersWithCustomers = await Order.aggregate([
{
$lookup: {
from: 'customers',
localField: 'customerId',
foreignField: '_id',
as: 'customerDetails'
}
},
{
$unwind: '$customerDetails'
},
{
$project: {
orderId: '$_id',
customerName: '$customerDetails.name',
customerEmail: '$customerDetails.email',
totalAmount: 1,
orderDate: 1
}
}
]);$unwind - Deconstruct Arrays
// Analyze individual items across all orders
const itemAnalysis = await Order.aggregate([
// Deconstruct items array
{ $unwind: '$items' },
// Group by product
{
$group: {
_id: '$items.product',
totalQuantity: { $sum: '$items.quantity' },
totalRevenue: {
$sum: {
$multiply: ['$items.quantity', '$items.price']
}
},
avgPrice: { $avg: '$items.price' },
orderCount: { $sum: 1 }
}
},
// Sort by revenue
{ $sort: { totalRevenue: -1 } }
]);Advanced Aggregation Patterns
Time-Based Analytics
// Daily sales report for the last 7 days
const dailySales = await Order.aggregate([
{
$match: {
orderDate: {
$gte: new Date(Date.now() - 7 * 24 * 60 * 60 * 1000)
}
}
},
{
$group: {
_id: {
year: { $year: '$orderDate' },
month: { $month: '$orderDate' },
day: { $dayOfMonth: '$orderDate' }
},
totalOrders: { $sum: 1 },
totalRevenue: { $sum: '$totalAmount' },
avgOrderValue: { $avg: '$totalAmount' }
}
},
{
$sort: { '_id.year': 1, '_id.month': 1, '_id.day': 1 }
},
{
$project: {
_id: 0,
date: {
$dateFromParts: {
year: '$_id.year',
month: '$_id.month',
day: '$_id.day'
}
},
totalOrders: 1,
totalRevenue: { $round: ['$totalRevenue', 2] },
avgOrderValue: { $round: ['$avgOrderValue', 2] }
}
}
]);Customer Segmentation
// Segment customers by spending behavior
const customerSegments = await Order.aggregate([
{
$group: {
_id: '$customerId',
totalSpent: { $sum: '$totalAmount' },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: '$totalAmount' },
lastOrderDate: { $max: '$orderDate' }
}
},
{
$addFields: {
segment: {
$switch: {
branches: [
{
case: { $gte: ['$totalSpent', 1000] },
then: 'VIP'
},
{
case: { $gte: ['$totalSpent', 500] },
then: 'Gold'
},
{
case: { $gte: ['$totalSpent', 100] },
then: 'Silver'
}
],
default: 'Bronze'
}
},
daysSinceLastOrder: {
$divide: [
{ $subtract: [new Date(), '$lastOrderDate'] },
1000 * 60 * 60 * 24
]
}
}
},
{
$group: {
_id: '$segment',
customerCount: { $sum: 1 },
avgSpent: { $avg: '$totalSpent' },
avgOrders: { $avg: '$orderCount' }
}
},
{
$sort: { avgSpent: -1 }
}
]);Product Recommendations
// Find products frequently bought together
const productCombinations = await Order.aggregate([
{ $unwind: '$items' },
{
$group: {
_id: '$_id',
products: { $push: '$items.product' }
}
},
{ $unwind: '$products' },
{
$group: {
_id: '$products',
boughtWith: {
$push: {
$filter: {
input: '$products',
as: 'product',
cond: { $ne: ['$$product', '$_id'] }
}
}
}
}
},
{ $unwind: '$boughtWith' },
{ $unwind: '$boughtWith' },
{
$group: {
_id: {
product: '$_id',
boughtWith: '$boughtWith'
},
count: { $sum: 1 }
}
},
{ $sort: { count: -1 } },
{
$group: {
_id: '$_id.product',
recommendations: {
$push: {
product: '$_id.boughtWith',
frequency: '$count'
}
}
}
},
{
$project: {
_id: 0,
product: '$_id',
recommendations: { $slice: ['$recommendations', 5] }
}
}
]);Conditional Operations
// Calculate revenue with conditional logic
const revenueAnalysis = await Order.aggregate([
{
$project: {
customer: 1,
totalAmount: 1,
// Apply tiered discount
finalAmount: {
$cond: {
if: { $gte: ['$totalAmount', 1000] },
then: { $multiply: ['$totalAmount', 0.85] }, // 15% off
else: {
$cond: {
if: { $gte: ['$totalAmount', 500] },
then: { $multiply: ['$totalAmount', 0.90] }, // 10% off
else: { $multiply: ['$totalAmount', 0.95] } // 5% off
}
}
}
},
// Categorize order size
orderSize: {
$switch: {
branches: [
{ case: { $gte: ['$totalAmount', 1000] }, then: 'Large' },
{ case: { $gte: ['$totalAmount', 500] }, then: 'Medium' }
],
default: 'Small'
}
}
}
}
]);Text Search Aggregation
// Full-text search with aggregation
const searchResults = await Product.aggregate([
{
$match: {
$text: { $search: 'laptop gaming' }
}
},
{
$addFields: {
score: { $meta: 'textScore' }
}
},
{
$match: {
score: { $gt: 0.5 }
}
},
{
$sort: { score: -1 }
},
{
$limit: 10
}
]);Geospatial Aggregation
// Find nearby stores and aggregate orders
const Store = mongoose.model('Store', {
name: String,
location: {
type: { type: String, default: 'Point' },
coordinates: [Number]
}
});
const nearbyStoreStats = await Order.aggregate([
{
$lookup: {
from: 'stores',
localField: 'storeId',
foreignField: '_id',
as: 'store'
}
},
{ $unwind: '$store' },
{
$geoNear: {
near: {
type: 'Point',
coordinates: [-73.97, 40.77] // User location
},
distanceField: 'distance',
maxDistance: 5000, // 5km
spherical: true
}
},
{
$group: {
_id: '$store._id',
storeName: { $first: '$store.name' },
avgDistance: { $avg: '$distance' },
orderCount: { $sum: 1 },
totalRevenue: { $sum: '$totalAmount' }
}
}
]);Performance Optimization
// Use indexes effectively.
// Mongoose's Model.createIndexes() takes an options object and only builds the
// indexes declared on the schema, so an array of index specs passed to it is
// ignored. Send ad-hoc specs to the underlying driver collection instead.
await Order.collection.createIndexes([
  { key: { orderDate: 1 } },
  { key: { customerId: 1 } },
  { key: { status: 1 } }
]);
// Place $match early in pipeline
const optimizedQuery = await Order.aggregate([
  // Filter early: a leading $match can use the indexes above and reduces the
  // number of documents every later stage processes
  { $match: { status: 'completed' } },
  // Use $project to limit fields early
  {
    $project: {
      customer: 1,
      totalAmount: 1,
      orderDate: 1
    }
  },
  // Then perform expensive operations.
  // PLACEHOLDERS: fill in real specs before running; the server rejects empty
  // $lookup/$group stages.
  { $lookup: { /* ... */ } },
  { $group: { /* ... */ } }
]);
// Use allowDiskUse for large datasets: lets memory-hungry stages such as
// $group and $sort write temporary files instead of failing at the per-stage
// memory limit
const largeAggregation = await Order.aggregate([
  // Complex pipeline
], { allowDiskUse: true });
Conclusion
MongoDB’s aggregation pipeline is essential for complex data analysis and transformations. Master these stages and patterns to efficiently process large datasets and extract valuable insights from your data. Always consider performance and use indexes appropriately.