MapReduce With MongoDB Aggregation Framework
What is MongoDB
MongoDB is a document-oriented NoSQL database used for high-volume data storage. Instead of the rows and tables of a traditional relational database, it stores data as documents grouped into collections. A document is a set of key-value pairs and plays roughly the role of a row in an SQL table, while a collection plays the role of the table. MongoDB does not require a predefined schema; we can decide the structure of a document on the fly, which is why it is known as a schemaless database. MongoDB also supports indexes, which improve the performance of search operations.
MongoDB has a lot of features that improve the performance of I/O operations. One of them is the aggregation framework. We are going to discuss what it is and how to use it. At the end of this article, I'll create a Python program for MapReduce using the aggregation framework.
What is Aggregation Framework
MongoDB provides a framework that groups values from multiple documents and can perform a variety of operations on the grouped data.
MongoDB provides three ways to perform aggregation: the aggregation pipeline, map-reduce, and single-purpose aggregation methods.
The aggregation pipeline is modeled on the concept of data processing pipelines. It has multiple stages; each stage performs an operation on its input documents and passes the results to the next stage.
db.orders.aggregate([
  { $match: { status: "data" } },
  { $group: { _id: "$cust_id", total: { $sum: "$data" } } }
])
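To make the stage-by-stage flow concrete, here is a minimal plain-Python sketch of what the two stages above do. It does not talk to MongoDB at all; the sample documents and the helper names match_stage and group_stage are made up for illustration.

```python
from collections import defaultdict

# Hypothetical sample documents; the field names mirror the pipeline above.
orders = [
    {"cust_id": "A123", "status": "data", "data": 50},
    {"cust_id": "A123", "status": "data", "data": 25},
    {"cust_id": "B212", "status": "data", "data": 20},
    {"cust_id": "B212", "status": "other", "data": 99},
]


def match_stage(docs, status):
    """Rough stand-in for $match: keep only documents with the given status."""
    return [doc for doc in docs if doc["status"] == status]


def group_stage(docs):
    """Rough stand-in for $group with $sum: total the 'data' field per cust_id."""
    totals = defaultdict(int)
    for doc in docs:
        totals[doc["cust_id"]] += doc["data"]
    return [{"_id": cust_id, "total": total} for cust_id, total in totals.items()]


# Each stage consumes the output of the previous one, like a pipeline.
result = group_stage(match_stage(orders, "data"))
print(result)
```

The document with status "other" is dropped by the first stage, so it never reaches the grouping step.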
Map-reduce operations can be rewritten using aggregation pipeline operators, such as $group, $merge, and others. For map-reduce operations that require custom functionality, MongoDB provides the $accumulator and $function aggregation operators.
Single-purpose aggregation methods aggregate documents from a single collection. While these operations provide simple access to common aggregation processes, they lack the flexibility and capabilities of an aggregation pipeline.
Installing MongoDB Locally
There are multiple ways to create MongoDB servers and clusters, but I’m going to install MongoDB locally on Windows. On the MongoDB Atlas free tier, running MapReduce throws an error saying that the free tier needs to be upgraded, which is why I chose a local installation.
Download the installer.
Download the MongoDB Community .msi installer from the MongoDB download page.
Follow these steps to download:
- In the Version dropdown, select the version of MongoDB to download.
- In the Platform dropdown, select Windows.
- In the Package dropdown, select msi.
- Click Download.
Run the MongoDB installer.
For example, from the File Explorer:
- Go to the directory where you downloaded the MongoDB installer (.msi file). By default, this is your Downloads directory.
- Double-click the .msi file.
Follow the MongoDB Community Edition installation wizard.
The wizard steps you through the installation of MongoDB and MongoDB Compass. MongoDB Compass is a graphical user interface (GUI) for MongoDB.
- Choose Setup Type
You can choose either the Complete (recommended for most users) or Custom setup type. The Complete setup option installs MongoDB and the MongoDB tools to the default location. The Custom setup option allows you to specify which executables are installed and where.
- Service Configuration
Starting with MongoDB 4.0, you can set up MongoDB as a Windows service during the install or just install the binaries.
After completing all these steps, you can run the “mongo” command in the Windows terminal. In MongoDB 6.0 and later, the legacy mongo shell has been replaced by mongosh.
With MongoDB installed locally, we will run map-reduce operations on this local database. For this, we can either insert data into the database or import an existing database.
I’m going to create a sample database.
MapReduce With MongoDB
MapReduce consists of two programs. The mapper program filters and sorts the data, and the reducer program performs a summary operation such as counting the number of words. It is used to reduce large volumes of raw data into meaningful aggregated results.
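The word-count case mentioned above can be sketched in a few lines of plain Python. This is only an illustration of the mapper/reducer split, not how MongoDB implements it; the function names mapper, reducer and map_reduce are chosen for this example.

```python
from collections import defaultdict


def mapper(line):
    """Emit a (word, 1) pair for every word in the line."""
    return [(word.lower(), 1) for word in line.split()]


def reducer(word, counts):
    """Collapse all counts emitted for one word into a single total."""
    return sum(counts)


def map_reduce(lines):
    # Shuffle step: group the emitted values by key.
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            grouped[key].append(value)
    # Reduce step: one reducer call per distinct key.
    return {key: reducer(key, values) for key, values in grouped.items()}


word_counts = map_reduce(["to be or not to be", "to do"])
print(word_counts)
```

The mapper only looks at one input at a time, and the reducer only sees the values that share a key. That separation is what lets a real system spread the work across many machines.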
After installing MongoDB locally, follow these steps to run MapReduce with MongoDB.
1). Create a sample collection orders with these documents:
db.orders.insertMany([
{ _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" },
{ _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" },
{ _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"},
{ _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
{ _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }])
After Running MapReduce, Return the Total Price Per Customer
I’m going to perform the map-reduce operation on the orders collection to group by cust_id and calculate the sum of the price for each cust_id:
1. Define the map function to process each input document:
- In the function, this refers to the document that the map-reduce operation is processing.
- The function maps the price to the cust_id for each document and emits the cust_id and price.
var mapFunction1 = function() {
emit(this.cust_id, this.price);
};
2. Define the corresponding reduce function with two arguments, keyCustId and valuesPrices:
- valuesPrices is an array whose elements are the price values emitted by the map function and grouped by keyCustId.
- The function reduces the valuesPrices array to the sum of its elements.
var reduceFunction1 = function(keyCustId, valuesPrices) {
return Array.sum(valuesPrices);
};
3. Perform map-reduce on all documents in the orders collection using the mapFunction1 map function and the reduceFunction1 reduce function:
db.orders.mapReduce(
mapFunction1,
reduceFunction1,
{ out: "map_reduce_example" }
)
4. This operation outputs the results to a collection named map_reduce_example. If the map_reduce_example collection already exists, the operation replaces its contents with the results of this map-reduce operation.
5. Query the map_reduce_example collection to verify the results:
db.map_reduce_example.find().sort( { _id: 1 } )
6. The operation returns these documents:
{ "_id" : "Ant O. Knee", "value" : 95 }
{ "_id" : "Busby Bee", "value" : 125 }
{ "_id" : "Cam Elot", "value" : 60 }
{ "_id" : "Don Quis", "value" : 155 }
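As mentioned earlier, map-reduce operations can usually be rewritten with aggregation pipeline operators, and the mapReduce command itself is deprecated as of MongoDB 5.0. Below is a sketch of how the same per-customer totals could be expressed as a pipeline for pymongo. The function name total_price_per_customer and the database name in the usage comment are assumptions made for this example.

```python
# Pipeline counterpart of mapFunction1 + reduceFunction1 (a sketch).
pipeline = [
    # Group by customer and sum the order prices.
    {"$group": {"_id": "$cust_id", "value": {"$sum": "$price"}}},
    # Sort by customer name so the output order is stable.
    {"$sort": {"_id": 1}},
]


def total_price_per_customer(collection):
    """Run the pipeline on a pymongo-style collection and return a list."""
    return list(collection.aggregate(pipeline))


# Usage against a real server (assumes pymongo is installed and that the
# orders collection lives in a database named "test"):
#
#   import pymongo
#   orders = pymongo.MongoClient("mongodb://localhost:27017/")["test"]["orders"]
#   print(total_price_per_customer(orders))
```

To write the result into a collection, as the out option does for map-reduce, a final $out or $merge stage can be appended to the pipeline.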
Python Code for MapReduce With MongoDB Driver Pymongo
I will write the MapReduce program in Python. To use a MongoDB database from Python, we need a MongoDB driver. pymongo is one of the MongoDB drivers for using the database from a Python program.
Let’s install pymongo driver.
$ pip install pymongo
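The mapper and reducer functions that we will be writing are in JavaScript. To pass JavaScript code from Python, we use the Code class from the bson package. The bson package is installed together with pymongo, so it should not be installed separately; the standalone bson package on PyPI conflicts with the one pymongo provides.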
Now, we are all set to write a MapReduce program.
import pymongo
from bson.code import Code

# Connect to the local MongoDB server and select the database and collection.
myclient = pymongo.MongoClient("mongodb://localhost:27017/")
mydb = myclient["arth34"]
mycol = mydb["orders"]

# Sample orders (same data as the shell example, without ord_date).
mylist = [
    { "_id": 1, "cust_id": "Ant O. Knee", "price": 25, "items": [ { "sku": "oranges", "qty": 5, "price": 2.5 }, { "sku": "apples", "qty": 5, "price": 2.5 } ], "status": "A" },
    { "_id": 2, "cust_id": "Ant O. Knee", "price": 70, "items": [ { "sku": "oranges", "qty": 8, "price": 2.5 }, { "sku": "chocolates", "qty": 5, "price": 10 } ], "status": "A" },
    { "_id": 3, "cust_id": "Busby Bee", "price": 50, "items": [ { "sku": "oranges", "qty": 10, "price": 2.5 }, { "sku": "pears", "qty": 10, "price": 2.5 } ], "status": "A" },
    { "_id": 4, "cust_id": "Busby Bee", "price": 25, "items": [ { "sku": "oranges", "qty": 10, "price": 2.5 } ], "status": "A" },
    { "_id": 5, "cust_id": "Busby Bee", "price": 50, "items": [ { "sku": "chocolates", "qty": 5, "price": 10 } ], "status": "A" },
    { "_id": 6, "cust_id": "Cam Elot", "price": 35, "items": [ { "sku": "carrots", "qty": 10, "price": 1.0 }, { "sku": "apples", "qty": 10, "price": 2.5 } ], "status": "A" },
    { "_id": 7, "cust_id": "Cam Elot", "price": 25, "items": [ { "sku": "oranges", "qty": 10, "price": 2.5 } ], "status": "A" },
    { "_id": 8, "cust_id": "Don Quis", "price": 75, "items": [ { "sku": "chocolates", "qty": 5, "price": 10 }, { "sku": "apples", "qty": 10, "price": 2.5 } ], "status": "A" },
    { "_id": 9, "cust_id": "Don Quis", "price": 55, "items": [ { "sku": "carrots", "qty": 5, "price": 1.0 }, { "sku": "apples", "qty": 10, "price": 2.5 }, { "sku": "oranges", "qty": 10, "price": 2.5 } ], "status": "A" },
    { "_id": 10, "cust_id": "Don Quis", "price": 25, "items": [ { "sku": "oranges", "qty": 10, "price": 2.5 } ], "status": "A" },
]

# Insert the sample documents, then print them back.
# Because the _id values are fixed, a second run fails with a duplicate key
# error unless the collection is dropped first.
mycol.insert_many(mylist)
for x in mycol.find():
    print(x)

# The map and reduce functions are JavaScript, wrapped in bson Code objects.
mapFunction1 = Code("function() { emit(this.cust_id, this.price); }")
reduceFunction1 = Code("function(keyCustId, valuesPrices) { return Array.sum(valuesPrices); }")

# Collection.map_reduce() was removed in PyMongo 4.0, so run the mapReduce
# database command directly; the output goes to the "result" collection.
mydb.command("mapReduce", "orders", map=mapFunction1, reduce=reduceFunction1, out="result")

rescol = mydb["result"]
for y in rescol.find():
    print(y)
After running this code, the program prints the inserted documents followed by the per-customer totals stored in the result collection.
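As a cross-check that needs no running server, the same per-customer totals can be reproduced in plain Python. This is only a sketch that mimics emit and Array.sum with a dictionary; the (cust_id, price) pairs are copied from the ten sample orders used in this article.

```python
from collections import defaultdict

# (cust_id, price) pairs taken from the ten sample orders.
orders = [
    ("Ant O. Knee", 25), ("Ant O. Knee", 70),
    ("Busby Bee", 50), ("Busby Bee", 25), ("Busby Bee", 50),
    ("Cam Elot", 35), ("Cam Elot", 25),
    ("Don Quis", 75), ("Don Quis", 55), ("Don Quis", 25),
]

# Map step: "emit" each price under its customer id.
emitted = defaultdict(list)
for cust_id, price in orders:
    emitted[cust_id].append(price)

# Reduce step: the Python counterpart of Array.sum(valuesPrices).
totals = {cust_id: sum(prices) for cust_id, prices in emitted.items()}

for cust_id in sorted(totals):
    print({"_id": cust_id, "value": totals[cust_id]})
```

The totals agree with the documents returned from the map_reduce_example collection in the shell walkthrough.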
If you have read the complete article, I hope you now understand what MongoDB is and how MapReduce, a feature of the MongoDB aggregation framework, helps aggregate data from multiple documents and process it.
Thanks for reading. If this article was helpful, hit the clap button!