Using the map reduce facility in MongoDB
Continuing with the MongoDB series, I would now like to write about the built-in mapReduce function. To have an example that illustrates the concept, I'm going to download tweets into a MongoDB collection using pymongo and Twython.
For this to work you first need to register an application at apps.twitter.com; this will generate the keys needed to interact with the Twitter API.
from twython import Twython
from pymongo import MongoClient

consumer_key = '<YOUR CONSUMER KEY>'
consumer_secret = '<YOUR CONSUMER SECRET>'
access_token_key = '<YOUR ACCESS TOKEN>'
access_token_secret = '<YOUR SECRET KEY>'

twitter = Twython(consumer_key, consumer_secret,
                  access_token_key, access_token_secret)

conn = MongoClient()
db = conn.twitter  # We'll use a database called 'twitter'

# We look for an interesting topic
result = twitter.search(q='#bitcoin')
for status in result['statuses']:
    db.statuses.insert_one(status)
This code will save 100 statuses, which is enough to illustrate the concept, but you can download a lot more if you want.
Now suppose we want to find the most used words in our tweets. First, let's do this using the aggregation framework, as we saw in a previous post, "Introduction to Mongo aggregations".
db.statuses.aggregate([
    {
        '$project': {
            'tokens': { '$split': [ { '$toLower': '$text' }, ' ' ] }
        }
    },
    { '$unwind': '$tokens' },
    { '$sortByCount': '$tokens' }
])
Let's explain what's happening here. First, $project will create a new document for each retrieved document (status), consisting of a single field, tokens, which is a list of the words found in the $text attribute of each status. You can see that we also use $toLower to convert the text to lower case before splitting, so bitcoin and BitCoin effectively become the same word and are counted together.
Next in the pipeline we use $unwind to create a separate document for every word in the $tokens list.
At this point we have something like this:
[
    {
        "_id" : ObjectId("59c55bd53989da08c81abfac"),
        "tokens" : "bitcoin"
    },
    {
        "_id" : ObjectId("59c55bd53989da08c81abfac"),
        "tokens" : "blockchain"
    },
    {
        "_id" : ObjectId("59c55bd53989da08c81abfac"),
        "tokens" : "crypto"
    },
    ...
]
Finally, $sortByCount groups the collection by the $tokens field, counts the elements in each group, and sorts the results by that count field. We could do these operations one by one, but $sortByCount performs all three in a single stage.
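For comparison, here is roughly the pipeline you would write by hand without $sortByCount; a $group that counts each token followed by a $sort on the count does the same job:
db.statuses.aggregate([
    {
        '$project': {
            'tokens': { '$split': [ { '$toLower': '$text' }, ' ' ] }
        }
    },
    { '$unwind': '$tokens' },
    { '$group': { '_id': '$tokens', 'count': { '$sum': 1 } } },
    { '$sort': { 'count': -1 } }
])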
It works! The result contains the most common words found in the statuses:
[
    { "_id" : "the", "count" : 184 },
    { "_id" : "and", "count" : 116 },
    { "_id" : "in", "count" : 110 },
    ...
]
But words like articles and prepositions aren't very descriptive, right? We can filter them out with $match and $nin:
// Common English words to ignore (lowercase, to match the lowercased tokens)
var commonWords = [
    'the', 'be', 'to', 'of', 'and', 'in', 'a', 'that', 'have',
    'i', 'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you',
    'do', 'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we'
];

db.statuses.aggregate([
    {
        '$project': {
            'tokens': { '$split': [ { '$toLower': '$text' }, ' ' ] }
        }
    },
    { '$unwind': '$tokens' },
    { '$sortByCount': '$tokens' },
    { '$match': { '_id': { '$nin': commonWords } } }
])
Now this query results in more meaningful words that appear often in the statuses. But imagine we have to deal with millions or billions of statuses. The aggregation framework will not cut it, since it needs to run every operation one after the other, and it will take a long time to get our results back.
For this kind of task, where we need a more distributed process, we use map reduce. The concept is simple: we define a map function whose purpose is to apply a transformation to each element in a collection, and a reduce function whose purpose is to take batches of whatever the map function produced and perform a computation that simplifies or generalizes each batch. Both of these tasks can happen simultaneously, and in practice this is how it's done with tools like Hadoop or Spark: these data processing frameworks perform parallel processing of massive amounts of data by running many instances of both mappers and reducers across a computing cluster.
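To make the idea concrete before touching MongoDB, here is a minimal, framework-free sketch of the same word-count flow in plain JavaScript; the statuses array below is made up purely for illustration:
var statuses = [{ text: 'Bitcoin is up' }, { text: 'bitcoin and blockchain' }];

// "Map": emit a (word, 1) pair for every word in every status
var pairs = [];
statuses.forEach(function (status) {
    status.text.toLowerCase().split(' ').forEach(function (word) {
        pairs.push([word, 1]);
    });
});

// Group the pairs by key, e.g. { bitcoin: [1, 1], and: [1], ... }
var groups = {};
pairs.forEach(function (pair) {
    var key = pair[0], value = pair[1];
    (groups[key] = groups[key] || []).push(value);
});

// "Reduce": collapse each group of values into a single count
var counts = {};
Object.keys(groups).forEach(function (key) {
    counts[key] = groups[key].reduce(function (a, b) { return a + b; }, 0);
});
// counts is now { bitcoin: 2, is: 1, up: 1, and: 1, blockchain: 1 }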
Now that we know what map reduce is about, let's apply it to our exercise. The following code can be entered in the MongoDB CLI client, but I recommend using https://robomongo.org/ since it's easier to edit code there; whichever you choose is fine.
The map function will transform each document by splitting the $text attribute into words, while the reduce function will take batches of words (as produced by the mappers) and count the occurrences of each one. Okay, let's define the mapper:
var mapper = function () {
    var words = this.text.toLowerCase().split(' ');  // Tokenize the status' $text field
    var commonWords = [
        'the', 'be', 'to', 'of', 'and', 'in', 'a', 'that', 'have',
        'i', 'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you',
        'do', 'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we'
    ];
    words.forEach(function (word) {
        // Filter out articles and prepositions
        if (word && !commonWords.includes(word)) {
            emit(word, 1);
        }
    });
}
Inside the map function, this is a reference to the document currently being processed from the collection, so this.text refers to the status' $text field. We do the same thing we did in the aggregation: we make the text lower case and then split it into words.
We iterate over each word and check that it isn't an empty string and that it doesn't match any of the articles and prepositions that we want to ignore; then, using the emit function, we put this data out there so that it can be processed by a reducer. Each bit of data is represented by a key and a value; in this case word is the key and the value is 1, because we want to count each word as just one occurrence.
At this point, if we had emitted the word bitcoin and the word blockchain, we would have something like (bitcoin, 1), (blockchain, 1); but when the word bitcoin is emitted again, we end up with something like (bitcoin, [1, 1]), (blockchain, [1]).
Let's see now how the reduce function is defined:
var reducer = function (key, values) {
    return Array.sum(values);
}
This function is a lot simpler. It always takes two parameters: the key that is currently being processed and the values associated with that key. Notice that at this stage each key has multiple values, and these values are the data that we want to reduce, that is, to turn many things into a few things. This is why in this function we just sum the values without caring much about the key; in the end this adds up the multiple [1, 1, ...] associated with each word, giving us the word counts.
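If you want to convince yourself, you can paste the reducer into the mongo shell and call it by hand (note that Array.sum is a helper provided by the shell, not standard JavaScript):
var reducer = function (key, values) {
    return Array.sum(values);
}
reducer('bitcoin', [1, 1, 1])  // returns 3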
The key thing to remember here is that these processes keep happening over and over until all the data has been processed, so the reducer may process the same key multiple times. If (bitcoin, [1, 1]), (blockchain, [1]) became (bitcoin, [2]), (blockchain, [1]) and later bitcoin is emitted again by a mapper, it becomes (bitcoin, [2, 1]), (blockchain, [1]); of course, later another reducer will take that, sum the values [2, 1] to 3, and hand it on to the next reduce iteration.
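This is why the reducer's output must be usable as one of its inputs; in our case a partial count is just another number, so re-reducing works out. Continuing in the shell with the reducer defined above, a quick check might look like this:
reducer('bitcoin', [1, 1])  // first pass: returns 2
reducer('bitcoin', [2, 1])  // re-reduce of a partial count plus a new emit: returns 3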
To use the mapper and reducer functions that we just defined, we call the mapReduce() function on our collection. The first argument is a reference to the map function, followed by a reference to the reduce function, and finally a JSON object to specify some options; in this case we want to save the final output in a collection named trending_words. If you would rather see the results without saving them, use { 'out': { inline: 1 } }, but of course that is only practical when "debugging" our function definitions.
db.statuses.mapReduce(mapper, reducer, { 'out': 'trending_words' })
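For quick experiments while tweaking the functions, the inline variant mentioned above would look something like this; the results are returned directly in the response instead of being written to a collection, so they have to fit in a single result document:
db.statuses.mapReduce(mapper, reducer, { 'out': { 'inline': 1 } })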
Now we query the new collection trending_words and sort the results:
db.trending_words.aggregate([
    { '$match': { 'value': { '$gt': 10 } } },
    { '$sort': { 'value': -1 } }
])
And we get similar output to the aggregation method. Notice that sorting the values can't happen during the map reduce stage; the data is just crunched concurrently, in no particular order.
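Since the output documents only have an _id (the word) and a value (the count), a plain find with a sort works just as well here; the aggregation above is only one way to read the results:
db.trending_words.find({ 'value': { '$gt': 10 } }).sort({ 'value': -1 })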
Conclusion
Map reduce is a data processing architecture ideal for handling large amounts of data. MongoDB has built-in support for this operation, and it works well even when your data is scattered across multiple nodes. Here we explained the basic principles needed to use this technique; needless to say, if you're really into the big-data thing you'll be better off with Hadoop, Spark or Flink for the data processing stage, using Mongo or other databases just for storing the processed information.