December 1, 2017


Using the map reduce facility in MongoDB

Continuing with the MongoDB series, I would now like to write about the built-in mapReduce function. To have an example to illustrate the concept, I'm going to download tweets into a MongoDB collection using pymongo and Twython.

For this to work you first need to register an application at apps.twitter.com; this will generate the keys needed to interact with the Twitter API.

from twython import Twython
from pymongo import MongoClient

consumer_key = '<YOUR CONSUMER KEY>'
consumer_secret = '<YOUR CONSUMER SECRET>'
access_token_key = '<YOUR ACCESS TOKEN>'
access_token_secret = '<YOUR SECRET KEY>'

twitter = Twython(consumer_key, consumer_secret,
                  access_token_key, access_token_secret)

conn = MongoClient()
db = conn.twitter  # We'll use a database called 'twitter'

# We look for an interesting topic
result = twitter.search(q='#bitcoin')
for status in result['statuses']:
    db.statuses.insert_one(status)

This code saves the statuses returned by a single search call, which is enough to illustrate the concept, but you can download a lot more if you want.
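
Once the script has run, you can check from the mongo shell that the statuses were stored; the database and collection names are the ones used above:

use twitter
db.statuses.count()    // how many statuses were saved
db.statuses.findOne()  // inspect one raw status document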

Now suppose we want to find the most used words in our tweets. First, let's do this using the aggregation framework, as we saw in a previous post, “Introduction to Mongo aggregations”.

db.statuses.aggregate([
  {
    // Split each status' text into lower-cased words
    '$project': {
      'tokens': { '$split': [ { '$toLower': '$text' }, ' ' ] }
    }
  },
  // One document per word
  { '$unwind': '$tokens' },
  // Group by word, count, and sort by the count
  { '$sortByCount': '$tokens' }
])

Let's explain what's happening here. First, $project creates a new document for each retrieved document (status), consisting of a single field, tokens, which is a list of the words found in the $text attribute of each status. You can see that we also use $toLower to convert the text to lower case before splitting, so bitcoin and BitCoin effectively become the same word and their occurrences are counted together.

Next in the pipeline we use $unwind to create a separate document for every word in the $tokens list.

At this point we have something like this:

[
  {
    "_id" : ObjectId("59c55bd53989da08c81abfac"),
    "tokens" : "bitcoin"
  },
  {
    "_id" : ObjectId("59c55bd53989da08c81abfac"),
    "tokens" : "blockchain"
  },
  {
    "_id" : ObjectId("59c55bd53989da08c81abfac"),
    "tokens" : "crypto"
  },
  .
  .
  .
]

Finally, $sortByCount groups the collection by the $tokens field, counts the elements in each group, and sorts the results by that count. We could do these operations one by one, but $sortByCount performs all three in a single stage, as the equivalent pipeline below shows.
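
For comparison, here is roughly what the same pipeline looks like with $sortByCount written out by hand; it's shown only to illustrate the equivalence, you don't need to run it:

db.statuses.aggregate([
  {
    '$project': {
      'tokens': { '$split': [ { '$toLower': '$text' }, ' ' ] }
    }
  },
  { '$unwind': '$tokens' },
  { '$group': { '_id': '$tokens', 'count': { '$sum': 1 } } },  // group and count
  { '$sort': { 'count': -1 } }                                 // sort by the count
])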

It works! The result contains the most common words found in the statuses:

[
  { "_id" : "the", "count" : 184 },
  { "_id" : "and", "count" : 116 },
  { "_id" : "in", "count" : 110 },
  .
  .
  .
]

But words like articles and prepositions aren't very descriptive, right? We can filter them out with $match and $nin:

var commonWords = [
  'the', 'be', 'to', 'of', 'and', 'in', 'a', 'that', 'have',
  'i', 'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you',
  'do', 'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we'
];

db.statuses.aggregate([
  {
    '$project': {
      'tokens': { '$split': [ { '$toLower': '$text' }, ' ' ] }
    }
  },
  { '$unwind': '$tokens' },
  { '$sortByCount': '$tokens' },
  { '$match': { '_id': { '$nin': commonWords } } }
])

Now this query returns more meaningful words that appear often in the statuses. But what happens if we have millions or billions of statuses to process? The aggregation framework will not cut it, since it needs to process every stage one after the other, and it would take a long time to get our results back.

For this kind of task, where we need a more distributed process, we use map reduce. The concept is simple: we define a map function whose purpose is to apply a transformation to each element in a collection, and a reduce function whose purpose is to take batches of whatever the map function produced and perform a computation that simplifies or summarizes each batch. Both of these tasks can happen simultaneously, and in practice this is how it's done with tools like Hadoop or Spark: these data processing frameworks run many instances of both mappers and reducers across a computing cluster to process massive amounts of data in parallel.
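
To make the idea concrete before touching MongoDB, here is a tiny sketch of the same word count written with plain JavaScript array methods; the statuses array is made up just for illustration and this is not MongoDB's API:

var statuses = ['Bitcoin is up', 'bitcoin and blockchain'];

// map: turn every status into (word, 1) pairs
var pairs = statuses
  .map(function (text) { return text.toLowerCase().split(' '); })
  .reduce(function (all, words) { return all.concat(words); }, [])
  .map(function (word) { return [word, 1]; });

// reduce: collapse the pairs into one count per word
var counts = pairs.reduce(function (acc, pair) {
  acc[pair[0]] = (acc[pair[0]] || 0) + pair[1];
  return acc;
}, {});
// counts -> { bitcoin: 2, is: 1, up: 1, and: 1, blockchain: 1 }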

Now that we know what map reduce is about, let's apply it to our exercise. The following code can be run in the MongoDB shell, but I recommend using https://robomongo.org/ since it's easier to edit code there; whichever you choose is fine.

The map function will transform each document by splitting the $text attribute into words and emitting each one, while the reduce function will take the batches of values produced by the mappers and add them up per word. Okay, let's define the mapper:

var mapper = function () {
  var words = this.text.toLowerCase().split(' ');  // Tokenize the status' $text field

  var commonWords = [
    'the', 'be', 'to', 'of', 'and', 'in', 'a', 'that', 'have',
    'i', 'it', 'for', 'not', 'on', 'with', 'he', 'as', 'you',
    'do', 'at', 'this', 'but', 'his', 'by', 'from', 'they', 'we'
  ];

  words.forEach(function (word) {
    // Filter out articles and prepositions
    if (word && !commonWords.includes(word)) {
      emit(word, 1);
    }
  });
}

Inside the map function, this is a reference to the document currently being processed, so this.text refers to the status' $text field. We do the same thing we did in the aggregation: we convert the text to lower case and then split it into words.

We iterate over each word and check that it isn't an empty string and that it doesn't match any of the articles and prepositions we want to ignore; then, using the emit function, we send the data out so it can be processed by a reducer. Each emitted piece of data consists of a key and a value; in this case the word is the key and the value is 1, because we want each word to count as just one occurrence.

At this point, if we had emitted the word bitcoin and the word blockchain, we would have something like (bitcoin, 1), (blockchain, 1); once we emit the word bitcoin again, the values are grouped by key and we have something like (bitcoin, [1, 1]), (blockchain, [1]).

Let’s see now how the reduce function is defined:

var reducer = function (key, values) {
  return Array.sum(values);  // Add up the partial counts emitted for this word
}

This function is a lot simpler. It always takes two parameters: the key being processed right now and the values associated with that key. Notice that at this stage each key has multiple values, and these values are the data we want to reduce, that is, turn many things into a few things. That is why in this function we just sum the values without caring much about the key; in the end this adds up the multiple [1, 1, ...] associated with each word, giving us the count for that word.

The key thing to remember here is that these processes keep happening over and over until all the data has been processed, so the reducer will at some point process the same key multiple times. If (bitcoin, [1, 1]), (blockchain, [1]) became (bitcoin, [2]), (blockchain, [1]) and later the word bitcoin is emitted again by a mapper, it becomes (bitcoin, [2, 1]), (blockchain, [1]); another reduce pass will then take that and sum the values [2, 1] into 3, which becomes the accumulated value for the next reduce iteration, as the sketch below illustrates.
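
Purely as an illustration, this is roughly how the reducer we defined above might be called across passes:

reducer('bitcoin', [1, 1])  // -> 2  (first pass over two emitted values)
reducer('bitcoin', [2, 1])  // -> 3  (re-reduce: a previous result plus a new emission)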

To use the mapper and reducer functions we just defined, we call the mapReduce() function on our collection. The first argument is a reference to the map function, followed by a reference to the reduce function, and finally a JSON object with options; in this case we want to save the final output in a collection named trending_words. If you'd rather see the results without saving them, use { 'out': { inline: 1 } }, but of course this is only practical when “debugging” our function definitions.

db.statuses.mapReduce(mapper, reducer, { 'out': 'trending_words' })
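
If you just want to inspect the output without writing it to a collection, the inline variant mentioned above looks like this:

db.statuses.mapReduce(mapper, reducer, { 'out': { 'inline': 1 } })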

Now we query the new collection trending_words and sort the results:

db.trending_words.aggregate([
  { '$match': { 'value': { '$gt': 10 } } },
  { '$sort': { 'value': -1 } }
])
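
The output collection stores each word under _id and its total under value, so the documents look something like this (the counts are just illustrative):

[
  { "_id" : "bitcoin", "value" : 142 },
  { "_id" : "blockchain", "value" : 57 },
  .
  .
  .
]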

And we get output similar to the aggregation method. Notice that sorting the values can't happen in the map reduce stage itself; the data is just crunched concurrently, in no particular order.

Conclusion

Map reduce is a data processing model that is ideal for handling big amounts of data. MongoDB has built-in support for this operation, and it works well even when your data is scattered across multiple nodes. Here we explained the basic principles needed to use this technique; needless to say, if you're really into the big-data thing you'll be better off with Hadoop, Spark or Flink for the data processing stage, using Mongo or other databases just to store the processed results.