MapReduce With MongoDB Aggregation Framework

Umesh Tyagi
8 min read · Jun 4, 2021


What is MongoDB

MongoDB is a document-oriented NoSQL database used for high-volume data storage. Instead of the rows and tables of a traditional relational database, it stores data as documents grouped into collections. A document consists of key-value pairs and plays roughly the role of a row in an SQL table, while a collection plays the role of the table. MongoDB does not require a predefined schema: the structure of documents can be decided on the fly, which is why it is known as a schemaless database. MongoDB also supports indexes, which improve the performance of search operations.
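To make the document model concrete, here is a small sketch that uses plain Python dicts to stand in for MongoDB documents, so no database is needed. The two hypothetical documents below could live in the same collection even though their fields differ, because MongoDB does not enforce a predefined schema by default. The field "coupon" is made up for illustration.

```python
# Two hypothetical documents that could share one collection even though their
# fields differ; plain Python dicts stand in for MongoDB's BSON documents.
orders = [
    {"_id": 1, "cust_id": "Ant O. Knee", "price": 25, "status": "A"},
    {"_id": 2, "cust_id": "Busby Bee", "price": 50, "coupon": "SPRING10"},
]


def field_names(docs):
    """Return the union of the field names used across all documents."""
    names = set()
    for doc in docs:
        names.update(doc.keys())
    return names


# A field missing from one document is simply absent, so read it with a default.
statuses = [doc.get("status") for doc in orders]

print(sorted(field_names(orders)))
print(statuses)
```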

MongoDB has many features that help process data efficiently, and one of them is the Aggregation Framework. In this article, we'll discuss what it is and how to use it. At the end, I'll write a Python program that performs MapReduce with MongoDB.

What is the Aggregation Framework

The aggregation framework groups values from multiple documents and lets you perform a wide range of operations on the grouped data, returning computed results.

MongoDB provides three ways to perform aggregation:

Aggregation Pipeline

The aggregation pipeline is modeled on the concept of data processing pipelines. It consists of multiple stages: each stage performs an operation on the documents it receives and passes its results to the next stage. For example:

db.orders.aggregate([
  { $match: { status: "A" } },
  { $group: { _id: "$cust_id", total: { $sum: "$price" } } }
])
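To illustrate the stage-by-stage idea, here is a plain-Python sketch of a two-stage pipeline. No MongoDB is involved. A match stage keeps documents with a given status, and a group stage then sums a numeric field per customer, similar in spirit to $match followed by $group with $sum. The field names (status, cust_id, price) and the sample documents are assumptions modeled on the orders collection used later in this article. MongoDB's real operators support far more than simple equality and summation.

```python
from collections import defaultdict

# Plain-Python sketch of a pipeline: each stage takes a list of documents and
# returns a new list, which is fed to the next stage. It mimics a simple $match
# and a $group with $sum; it is not MongoDB's implementation.


def match_stage(docs, criteria):
    """Keep documents whose fields equal every value in `criteria`."""
    return [d for d in docs if all(d.get(k) == v for k, v in criteria.items())]


def group_sum_stage(docs, key_field, sum_field, out_field="total"):
    """Group documents by `key_field` and sum `sum_field` within each group."""
    totals = defaultdict(int)
    for d in docs:
        totals[d[key_field]] += d.get(sum_field, 0)
    return [{"_id": k, out_field: v} for k, v in totals.items()]


def run_pipeline(docs, stages):
    """Feed the output of each stage into the next one."""
    for stage in stages:
        docs = stage(docs)
    return docs


orders = [
    {"cust_id": "Ant O. Knee", "price": 25, "status": "A"},
    {"cust_id": "Ant O. Knee", "price": 70, "status": "A"},
    {"cust_id": "Busby Bee", "price": 50, "status": "B"},  # dropped by the match stage
    {"cust_id": "Busby Bee", "price": 25, "status": "A"},
]

result = run_pipeline(orders, [
    lambda docs: match_stage(docs, {"status": "A"}),
    lambda docs: group_sum_stage(docs, "cust_id", "price"),
])
print(result)
```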

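Map-Reduce

Map-reduce applies a JavaScript map function and reduce function to the documents of a collection; this is the approach used in the rest of this article. Map-reduce operations can also be rewritten using aggregation pipeline operators such as $group and $merge. For map-reduce operations that require custom functionality, MongoDB provides the $accumulator and $function aggregation operators. Note that starting in MongoDB 5.0, map-reduce is deprecated in favor of the aggregation pipeline.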
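As a sketch of such a rewrite, here is the total-price-per-customer job from later in this article, expressed as an aggregation pipeline. It uses the list-of-stages form that PyMongo's Collection.aggregate() accepts. It uses $group with $sum and, optionally, a final $out stage. $out is a simpler relative of $merge that replaces the target collection, much like mapReduce's default output. The function name total_price_pipeline and the collection name agg_alternative_1 are hypothetical. Building the list needs no server.

```python
def total_price_pipeline(out_collection=None):
    """Build an aggregation pipeline that sums `price` per `cust_id`.

    The total is stored in a field named "value" to mirror the documents that
    mapReduce writes. If `out_collection` is given, a final $out stage writes
    the results to that collection, replacing whatever it contained before.
    """
    pipeline = [
        # One output document per distinct cust_id, holding the sum of its prices
        {"$group": {"_id": "$cust_id", "value": {"$sum": "$price"}}},
    ]
    if out_collection is not None:
        pipeline.append({"$out": out_collection})
    return pipeline


print(total_price_pipeline("agg_alternative_1"))
```

With a running server and the orders collection from this article, `list(mycol.aggregate(total_price_pipeline()))` should return the same per-customer totals as the map-reduce version.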

Single-Purpose Aggregation Methods

Single-purpose aggregation methods, such as count() and distinct(), aggregate documents from a single collection. While these methods provide simple access to common aggregation processes, they lack the flexibility and capabilities of an aggregation pipeline.
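PyMongo exposes single-purpose methods such as Collection.count_documents() and Collection.distinct(). So that it runs without a server, the sketch below computes the same kind of answers in plain Python over a list of dicts. The helper names and sample documents are hypothetical. The helpers only handle simple equality filters and scalar fields; the real distinct() also unpacks array values, for example.

```python
# Plain-Python stand-ins for two single-purpose aggregation operations.
# Against a real server, PyMongo's Collection.count_documents(filter) and
# Collection.distinct(key) would be used instead.


def count_matching(docs, criteria):
    """Count documents whose fields equal every value in `criteria`."""
    return sum(1 for d in docs if all(d.get(k) == v for k, v in criteria.items()))


def distinct_values(docs, key):
    """Return distinct scalar values of `key` in first-seen order, skipping docs without it."""
    seen = []
    for d in docs:
        if key in d and d[key] not in seen:
            seen.append(d[key])
    return seen


orders = [
    {"cust_id": "Ant O. Knee", "status": "A"},
    {"cust_id": "Ant O. Knee", "status": "A"},
    {"cust_id": "Busby Bee", "status": "D"},
]
print(count_matching(orders, {"status": "A"}))
print(distinct_values(orders, "cust_id"))
```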

Installing MongoDB Locally

There are several ways to set up MongoDB servers and clusters, but I'm going to install MongoDB locally on Windows. I chose a local installation because, on the MongoDB Atlas free tier, running the MapReduce operations below failed with an error asking me to upgrade the tier.

Download the installer.

Download the MongoDB Community .msi installer from the following link:

MongoDB Download Center

Follow these steps to download it:

  1. In the Version dropdown, select the version of MongoDB to download.
  2. In the Platform dropdown, select Windows.
  3. In the Package dropdown, select msi.
  4. Click Download.

Run the MongoDB installer.

For example, from the File Explorer:

  1. Go to the directory where you downloaded the MongoDB installer (.msi file). By default, this is your Downloads directory.
  2. Double-click the .msi file.

Follow the MongoDB Community Edition installation wizard.

The wizard guides you through installing MongoDB and MongoDB Compass, a GUI for exploring and managing MongoDB data.

  1. Choose Setup Type

You can choose either the Complete (recommended for most users) or Custom setup type. The Complete setup option installs MongoDB and the MongoDB tools to the default location. The Custom setup option allows you to specify which executables are installed and where.

  2. Service Configuration

Starting with MongoDB 4.0, you can set up MongoDB as a Windows service during the install or just install the binaries.

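After completing these steps, you can run the "mongo" command in a Windows terminal to open the MongoDB shell. (In MongoDB 6.0 and later, the legacy mongo shell is no longer shipped with the server; install and run mongosh instead.)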

With MongoDB installed locally, we'll run map-reduce operations against this local database. First, we need some data, which we can either insert ourselves or import from an existing dataset.

I’m going to create a sample database.

MapReduce With MongoDB

MapReduce consists of two parts. The mapper performs filtering and sorting of the data and emits key-value pairs. The reducer performs a summary operation, such as counting the number of words, on the values grouped under each key. Together they condense large volumes of raw data into meaningful aggregated results.
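The classic word-count job makes the two roles concrete. The plain-Python sketch below runs no MongoDB. It separates the phases explicitly: map emits (word, 1) pairs, a shuffle step groups values by key (something the framework does for you between map and reduce), and reduce sums each group. The function names and input lines are illustrative only.

```python
from collections import defaultdict

# Word-count MapReduce in plain Python: map -> shuffle (group by key) -> reduce.


def map_phase(lines):
    """Emit (word, 1) for every word in every input line."""
    for line in lines:
        for word in line.lower().split():
            yield word, 1


def shuffle_phase(pairs):
    """Group emitted values by key, as the framework does between map and reduce."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups


def reduce_phase(groups):
    """Collapse each key's list of values into a single count."""
    return {key: sum(values) for key, values in groups.items()}


lines = ["the quick brown fox", "the lazy dog", "The end"]
counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(counts)
```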

After installing MongoDB locally, follow these steps to run MapReduce with MongoDB.

1. Create a sample collection named orders with these documents:

db.orders.insertMany([
{ _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" },
{ _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" },
{ _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"},
{ _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }])

Return the Total Price Per Customer

I'm going to perform a map-reduce operation on the orders collection that groups the documents by cust_id and calculates the sum of price for each cust_id:

1. Define the map function to process each input document:

  • In the function, this refers to the document that the map-reduce operation is processing.
  • The function maps the price to the cust_id for each document and emits the cust_id and price.
var mapFunction1 = function() {
   emit(this.cust_id, this.price);
};

2. Define the corresponding reduce function with two arguments keyCustId and valuesPrices:

  • valuesPrices is an array whose elements are the price values emitted by the map function and grouped by keyCustId.
  • The function reduces the valuesPrices array to the sum of its elements.
var reduceFunction1 = function(keyCustId, valuesPrices) {
   return Array.sum(valuesPrices);
};

3. Perform map-reduce on all documents in the orders collection using the mapFunction1 map function and the reduceFunction1 reduce function:

db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)

4. This operation outputs the results to a collection named map_reduce_example. If the map_reduce_example collection already exists, the operation replaces its contents with the results of this map-reduce operation.

5. Query the map_reduce_example collection to verify the results:

db.map_reduce_example.find().sort( { _id: 1 } )

6. The operation returns these documents:

{ "_id" : "Ant O. Knee", "value" : 95 }
{ "_id" : "Busby Bee", "value" : 125 }
{ "_id" : "Cam Elot", "value" : 60 }
{ "_id" : "Don Quis", "value" : 155 }
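To make the steps above concrete without a server, here is a plain-Python model of what the map-reduce run computes: map every order, group the emitted prices by cust_id, then reduce each group. It sketches the semantics, not how MongoDB actually executes map-reduce. The function names are hypothetical analogues of mapFunction1 and reduceFunction1. Only the _id, cust_id and price fields of the sample orders are copied, because the functions read nothing else.

The sketch also checks the re-reduce property. MongoDB may call the reduce function again on earlier reduce output for the same key, so reduce must give the same answer however the values are split. A sum does.

```python
from collections import defaultdict

# Plain-Python model of the map-reduce run above (only the fields it reads).
ORDERS = [
    {"_id": 1, "cust_id": "Ant O. Knee", "price": 25},
    {"_id": 2, "cust_id": "Ant O. Knee", "price": 70},
    {"_id": 3, "cust_id": "Busby Bee", "price": 50},
    {"_id": 4, "cust_id": "Busby Bee", "price": 25},
    {"_id": 5, "cust_id": "Busby Bee", "price": 50},
    {"_id": 6, "cust_id": "Cam Elot", "price": 35},
    {"_id": 7, "cust_id": "Cam Elot", "price": 25},
    {"_id": 8, "cust_id": "Don Quis", "price": 75},
    {"_id": 9, "cust_id": "Don Quis", "price": 55},
    {"_id": 10, "cust_id": "Don Quis", "price": 25},
]


def map_function(doc):
    """Like mapFunction1: emit (cust_id, price) once per document."""
    yield doc["cust_id"], doc["price"]


def reduce_function(key_cust_id, values_prices):
    """Like reduceFunction1: sum the prices grouped under one customer."""
    return sum(values_prices)


def map_reduce(docs, map_fn, reduce_fn):
    """Map every document, group emitted values by key, then reduce each group.

    Returns one {"_id": key, "value": ...} document per key, sorted by _id to
    match the find().sort( { _id: 1 } ) query. A key with a single value is not
    passed to reduce, mirroring MongoDB's behaviour.
    """
    groups = defaultdict(list)
    for doc in docs:
        for key, value in map_fn(doc):
            groups[key].append(value)
    results = []
    for key in sorted(groups):
        values = groups[key]
        value = values[0] if len(values) == 1 else reduce_fn(key, values)
        results.append({"_id": key, "value": value})
    return results


results = map_reduce(ORDERS, map_function, reduce_function)
for row in results:
    print(row)

# Re-reduce check: reducing all values at once equals reducing a partial
# result together with the remaining value.
don_quis = [75, 55, 25]
assert reduce_function("Don Quis", don_quis) == reduce_function(
    "Don Quis", [reduce_function("Don Quis", don_quis[:2]), don_quis[2]]
)
```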

Python Code for MapReduce With MongoDB Driver PyMongo

I will write the MapReduce program in Python. To use MongoDB from a Python program, we need a MongoDB driver; pymongo is the official MongoDB driver for Python.

Let's install the pymongo driver:

$ pip install pymongo

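The mapper and reducer functions are written in JavaScript. To pass JavaScript code from Python, we use the Code class from the bson package. That package is included with pymongo, so no separate installation is needed. Do not install the unrelated "bson" package from PyPI; it conflicts with the one bundled with pymongo.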

Now, we are all set to write a MapReduce program.

import pymongo
from bson.code import Code

# Connect to the local MongoDB server and select the database and collection
myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["arth34"]
mycol = mydb["orders"]

# Drop any previous copy so re-running the script does not fail on duplicate _id values
mycol.drop()

mylist = [
    {"_id": 1, "cust_id": "Ant O. Knee", "price": 25, "items": [{"sku": "oranges", "qty": 5, "price": 2.5}, {"sku": "apples", "qty": 5, "price": 2.5}], "status": "A"},
    {"_id": 2, "cust_id": "Ant O. Knee", "price": 70, "items": [{"sku": "oranges", "qty": 8, "price": 2.5}, {"sku": "chocolates", "qty": 5, "price": 10}], "status": "A"},
    {"_id": 3, "cust_id": "Busby Bee", "price": 50, "items": [{"sku": "oranges", "qty": 10, "price": 2.5}, {"sku": "pears", "qty": 10, "price": 2.5}], "status": "A"},
    {"_id": 4, "cust_id": "Busby Bee", "price": 25, "items": [{"sku": "oranges", "qty": 10, "price": 2.5}], "status": "A"},
    {"_id": 5, "cust_id": "Busby Bee", "price": 50, "items": [{"sku": "chocolates", "qty": 5, "price": 10}], "status": "A"},
    {"_id": 6, "cust_id": "Cam Elot", "price": 35, "items": [{"sku": "carrots", "qty": 10, "price": 1.0}, {"sku": "apples", "qty": 10, "price": 2.5}], "status": "A"},
    {"_id": 7, "cust_id": "Cam Elot", "price": 25, "items": [{"sku": "oranges", "qty": 10, "price": 2.5}], "status": "A"},
    {"_id": 8, "cust_id": "Don Quis", "price": 75, "items": [{"sku": "chocolates", "qty": 5, "price": 10}, {"sku": "apples", "qty": 10, "price": 2.5}], "status": "A"},
    {"_id": 9, "cust_id": "Don Quis", "price": 55, "items": [{"sku": "carrots", "qty": 5, "price": 1.0}, {"sku": "apples", "qty": 10, "price": 2.5}, {"sku": "oranges", "qty": 10, "price": 2.5}], "status": "A"},
    {"_id": 10, "cust_id": "Don Quis", "price": 25, "items": [{"sku": "oranges", "qty": 10, "price": 2.5}], "status": "A"},
]
mycol.insert_many(mylist)
for doc in mycol.find():
    print(doc)

# The map and reduce functions are JavaScript, wrapped in bson.code.Code
mapFunction1 = Code("function() { emit(this.cust_id, this.price); }")
reduceFunction1 = Code("function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); }")

# PyMongo 4 removed Collection.map_reduce(), so run the mapReduce command directly
# (with PyMongo 3.x, mycol.map_reduce(mapFunction1, reduceFunction1, "result") does the same).
# out="result" writes the output to the "result" collection, replacing its contents.
mydb.command("mapReduce", "orders", map=mapFunction1, reduce=reduceFunction1, out="result")

rescol = mydb["result"]
for doc in rescol.find().sort("_id", 1):
    print(doc)

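After running this code, the script prints the ten inserted orders, followed by the documents in the result collection. There is one result per customer, with the same totals as in the shell example above.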

If you've read the complete article, I hope you now understand what MongoDB is and how MapReduce, one of MongoDB's aggregation features, helps aggregate and process data from multiple documents.

Thanks for reading! If this article was helpful, hit the clap button!

Written by Umesh Tyagi

I write about my experiences and share insights on DevOps technologies, tools, and trends. Join me as I explore and break down the latest in the tech world!
