Realtime Language Predictive Analytics with scikit-learn, Flask and RabbitMQ – Part 2: Exploration

In this part of the tutorial, we continue by exploring the language data scraped from Wikipedia's most popular articles for each language.

Read Part 1 of this tutorial here. We attach the IPython notebook model-exploration.ipynb here to aid with the exploration:

In [1]:
# -*- coding: utf-8 -*-
%matplotlib inline
import bz2
import io
import matplotlib.pyplot as plt
import numpy as np
import os
import cPickle as pickle
import sys
import re
import seaborn as sns

from lang_map import code_lang_map
from pandas import DataFrame
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import pairwise_distances
from collections import Counter

Calculate each character's percentage of the total letter count from a pandas dataframe of letter counts and return a dataframe of the resulting percentage columns

In [2]:
def percentages(df):
    # divide every column by the letters_count column, join the results back
    # with a '_perc' suffix, then keep only the percentage columns
    df2 = df.join(df.div(df['letters_count'], axis='index'), rsuffix='_perc')
    percs = [col for col in df2.columns if col.endswith('_perc')]
    return df2[percs]

Count the number of times each character occurs in each language and keep the 2000 most common characters for each one

In [3]:
def get_top_letters():
    files = os.listdir('articles')
    top_letters = []
    for f in files:
        c = Counter()
        for article in parse(os.path.join('articles', f)):
            c['articles_count'] += 1
            for letter in article['content']:
                c[letter] += 1
                c['letters_count'] += 1
        # keep the 2000 most common characters and tag the row with its language code
        d = dict(c.most_common(2000))
        d['lang'] = os.path.splitext(f)[0]
        top_letters.append(d)
    return top_letters

Parse articles from the scrubbed Wikipedia dumps using a regex, yielding a dictionary per article

In [4]:
def parse(filename):
    data = ""
    # regex pattern for scrubbing an extracted wikipedia article
    article_rgx = re.compile(
        r'<doc id="(?P<id>\d+)" url="(?P<url>[^"]+)" title="(?P<title>[^"]+)">\n(?P<content>.+)\n<\/doc>', re.S | re.U)
    with io.open(filename, 'r', encoding='utf8') as f:
        for line in f:
            data += line
            if line.count('</doc>'):
                m = article_rgx.search(data)
                if m:
                    yield m.groupdict()
                data = ""

Load the letter-count dictionaries back from pickle

In [5]:
def load_data():
    with open('letters2.pkl', 'rb') as f:
        return pickle.load(f)

Save the letter-count dictionaries into pickle

In [6]:
def dump_data():   
    top_letters = get_top_letters()  
    with open('letters2.pkl', 'wb') as handle:
        pickle.dump(top_letters, handle, protocol=2) 
In [7]:
data = load_data() 
In [8]:
df = DataFrame(data)
df.fillna(0, inplace=True)
df = df.set_index('lang')
In [9]:
df.head(7)
Out[9]:
First seven rows of the character-count DataFrame, indexed by language (ar, bg, ca, cs, da, de, en): one column per character, plus the articles_count and letters_count totals, with raw counts as values.

7 rows × 6992 columns

In [10]:
df3 = percentages(df)
# replace any NaN percentages with the median of the non-NaN values
df3.values[np.isnan(df3.values)] = np.median(df3.values[~np.isnan(df3.values)])
df3.head(7)
Out[10]:
First seven rows of the percentage DataFrame for the same seven languages: each character count divided by that language's letters_count.

7 rows × 6992 columns

In [11]:
num_clusters = 4
palette = sns.color_palette('colorblind', num_clusters)

Run the KMeans clustering algorithm on the letter percentages

KMeans groups the languages into clusters based on the similarity of their character-frequency profiles

In [12]:
est = KMeans(num_clusters, max_iter=30000)
est.fit(df3.values)
y_kmeans = est.predict(df3.values)

Run Principal Component Analysis to reduce the number of columns from 6992 to 2

In [13]:
pca = PCA(n_components=2)
pca.fit(df3.values)
X_trans = pca.transform(df3.values)
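
As a quick sanity check (not in the original notebook), one can inspect how much of the total variance the two retained components capture:

# fraction of variance captured by each of the two components, and their total
print(pca.explained_variance_ratio_)
print(pca.explained_variance_ratio_.sum())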

Plot the results

In [14]:
plt.scatter(X_trans[:, 0], X_trans[:, 1], c=[palette[y] for y in y_kmeans], s=50)
Out[14]:
<matplotlib.collections.PathCollection at 0x18f500f0>

Helper function for printing similarity distance details in clusters

In [15]:
def print_sim(dist, x, y, langs):
    print("{0} ({1}, {2})".format(dist, code_lang_map[langs[x]], code_lang_map[langs[y]]))

Find the languages that are most similar

In [16]:
cluster_dfs = {}
cluster_langs = {}
cluster_distances = {}
langs = list(code_lang_map.keys())
In [17]:
for cluster_num in range(num_clusters):
    indexes = [i for i in range(y_kmeans.shape[0]) if y_kmeans[i] == cluster_num]
    cluster_langs[cluster_num] = [langs[i] for i in indexes]
    cluster_dfs[cluster_num] = df3.loc[cluster_langs[cluster_num], :]

    # Calculate pairwise distances and display
    print('Cluster #{0}'.format(cluster_num))
    
    cluster_distances[cluster_num] = pairwise_distances(cluster_dfs[cluster_num].values)
    n, m = cluster_distances[cluster_num].shape
    distances = set([])
    for i in range(n):
        for j in range(m):
            if i == j:
                continue
            distances.add((cluster_distances[cluster_num][i, j], tuple(sorted([i, j]))))
    for a in sorted(distances)[:20]:
        print_sim(a[0], a[1][0], a[1][1], langs)
Cluster #0
0.0134930582372 (Arabic, Persian)
0.0134930582373 (Arabic, Persian)
0.0369898458834 (Croatian, Finnish)
0.0382281250592 (Finnish, Czech)
0.0448013395959 (Croatian, German)
0.0485874241609 (Finnish, Kazakh)
0.050683479432 (Kazakh, Serbian)
0.0529485885806 (German, Finnish)
0.054227525429 (Vietnamese, Indonesian)
0.0548448505622 (Croatian, Kazakh)
0.05681091513 (German, Portuguese)
0.0588135128334 (Chinese, Dutch)
0.0591008339765 (Croatian, Czech)
0.0593707398764 (Croatian, Serbian)
0.0598196627665 (Finnish, Portuguese)
0.0604485418978 (Czech, Kazakh)
0.0606184195518 (Turkish, Spanish)
0.0607610860892 (Portuguese, Kazakh)
0.0620393067053 (Croatian, Portuguese)
0.0634290019112 (German, Spanish)
Cluster #1
Cluster #2
0.0475935968993 (Esperanto, Croatian)
0.0835127759886 (Chinese, Turkish)
0.103037792081 (Esperanto, Chinese)
0.110142539949 (Chinese, Croatian)
0.114259149551 (Bulgarian, Chinese)
0.117867182303 (Bulgarian, Turkish)
0.11799613275 (Croatian, Turkish)
0.118815437758 (Esperanto, Turkish)
0.130928888087 (Esperanto, Bulgarian)
0.135120992536 (Bulgarian, Croatian)
0.149082823968 (Vietnamese, Chinese)
0.149082823968 (Vietnamese, Chinese)
0.165697588003 (Vietnamese, Turkish)
0.180844318911 (Vietnamese, Esperanto)
0.180844318911 (Vietnamese, Esperanto)
0.185589982372 (Vietnamese, Croatian)
0.185589982372 (Vietnamese, Croatian)
0.196770170763 (Vietnamese, Bulgarian)
0.246625874182 (Vietnamese, Ukrainian)
0.252329806433 (Chinese, Ukrainian)
Cluster #3
0.0426958098309 (Vietnamese, Ukrainian)
0.0738316630839 (Vietnamese, Bulgarian)
0.0791920221606 (Bulgarian, Ukrainian)
0.24455033328 (Esperanto, Chinese)
0.248370778111 (Bulgarian, Chinese)
0.249425940244 (Chinese, Ukrainian)
0.25283853232 (Vietnamese, Chinese)
0.266300246387 (Esperanto, Bulgarian)
0.267594494349 (Esperanto, Ukrainian)
0.271414734034 (Vietnamese, Esperanto)
0.271414734034 (Vietnamese, Esperanto)
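
Incidentally, the near-duplicate rows above (e.g. the Arabic/Persian pair listed twice) appear to come from tiny floating-point asymmetries in the distance matrix surviving the set-based deduplication. A slightly tidier sketch (not in the original notebook) iterates only the upper triangle with itertools.combinations and avoids them, shown here for a single cluster's distance matrix:

from itertools import combinations

dists = cluster_distances[cluster_num]
# visit each unordered pair (i, j) exactly once
pairs = sorted((dists[i, j], i, j) for i, j in combinations(range(dists.shape[0]), 2))
for dist, i, j in pairs[:20]:
    print_sim(dist, i, j, langs)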

Realtime Language Predictive Analytics with scikit-learn, Flask and RabbitMQ – Part 1

This is my attempt at developing and deploying the machine learning engine used by Michael Becker in his PyCon 2014 talk on Realtime Predictive Analytics with scikit-learn & RabbitMQ. This will be a series of tutorials in which I share my implementation with you.

To make this series more intuitive, it is broken down into three parts: I will start with the OSEMN (Obtain, Scrub, Explore, Model, and iNterpret) process in part 1, part 2 will cover model development, and part 3 will cover model distribution (i.e. deployment and scaling).

NB – This is my first attempt as a machine learning practitioner, so suggestions and any helpful advice are most welcome. If you have a question about the code or the hypotheses I made, do not hesitate to post a comment in the comment section below.

All code for this tutorial is available in a GitHub repo which you can go ahead and fork off!

Getting and processing the data

Obtain

For us to build a real-time predictive machine learning algorithm, we need to obtain training data that consists mainly of text in different languages. Wikipedia is a natural choice for this data source, as it has an abundant supply of articles in multiple languages through its dataset dumps available online. The extraction process follows the one Michael Becker implemented, where we use the Wikimedia Pageview API to get the top 1000 articles per month (by pageview count) for each language's Wikipedia project over the first six months of 2016.

We use MongoDB to persist the data from the JSON response of the API call, and the aggregation framework to then aggregate the top 5000 articles for each language. We do this by starting a MongoDB instance locally and connecting to it on the default port 27017 using pymongo, the official Python driver for MongoDB:

# -*- coding: utf-8 -*-
import requests
import sys

from lang_map import code_lang_map
from pymongo import InsertOne, MongoClient


# connect to database
connection = MongoClient('localhost', 27017)

# use test database
db = connection.test

# handle to wikipedia collection
wikipedia = db.wikipedia

def insert_top_articles():  
	"""
	Insert into mongo the top 1000 articles per month (by pageview count) for each language's wikipedia project, for the first 6 months of 2016.

	"""
	# initialize bulk insert operations list
	ops = []

	# clear existing wikipedia collection
	wikipedia.delete_many({})

	for lang in code_lang_map.keys():
		for month in range(1,7):   
			try:
				url = 'https://wikimedia.org/api/rest_v1/metrics/pageviews/top/{0}.wikipedia/all-access/2016/{1}/all-days'.format(lang, str(month).zfill(2))        
				result = requests.get(url).json()
				
				if 'items' in result and len(result['items']) == 1:
					r = result['items'][0]
					for article in r['articles']:
						article['lang'] = r['project'][:2]
						ops.append(InsertOne(article))

			except Exception:
				print('ERROR while fetching or parsing ' + url)

	wikipedia.bulk_write(ops)


def get_top_articles(lang):
	"""
	Aggregate the top 5000 articles for the given language from the pageview counts collected above.

	"""

	# initialize aggregation pipeline 
	pipeline = [    
		{ "$match": { "lang": lang } },
		{
			"$group": {
				"_id": "$article",
				"lang": { "$first": "$lang" },  
				"max_views": { "$max": "$views" }            
			}
		},
		{
			"$project": {
				"page": "$_id", "_id": 0,
				"lang": 1,
				"max_views": 1
			}
		},
		{ "$sort": { "max_views": -1 } },
		{ "$limit": 5000 }
	]

	result = list(wikipedia.aggregate(pipeline))
	return result

Now, having generated the list of articles above, we export the data for each article through the Wikipedia Special:Export page, using this simple Python script to execute queries against it:

# -*- coding: utf-8 -*-
import bz2
import os
import requests
import sys
from shutil import copyfileobj

def load_articles(lang, pagelist):
    
    url = "https://{0}.wikipedia.org/w/index.php?title=Special:Export&amp;action=submit".format(lang)
    origin = "https://{0}.wikipedia.org".format(lang)
    referer = "https://{0}.wikipedia.org/wiki/Special:Export".format(lang)
    filename = "dumps/wikipedia-{0}.xml".format(lang)
    pages = '\n'.join(pagelist)

    headers = {
        "Origin": origin,
        "Accept-Encoding": "gzip,deflate,sdch",
        "User-Agent": "Mozilla/5.0 Chrome/35.0",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Cache-Control": "max-age=0",
        "Referer": referer,
        "Connection": "keep-alive",
        "DNT": "1"
    }
    payload = {
        'catname': '',
        'pages': pages,
        'curonly': '1',
        'wpDownload': '1',
    }

    res = requests.post(url, data=payload, headers=headers)
    with open(filename, 'wb') as f:
        f.write(res.content)

    with open(filename, 'rb') as input:
        with bz2.BZ2File(filename + '.bz2', 'wb', compresslevel=9) as output:
            copyfileobj(input, output)
    os.remove(filename)

    return filename + '.bz2'
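
To tie the two steps together, a small driver along the following lines can pull the aggregated article titles for every language and feed them to load_articles in batches (this is hypothetical glue code, not from the original repo, and the batch size is an arbitrary choice):

def export_all(batch_size=50):
    # hypothetical driver: export the aggregated top articles for every language
    for lang in code_lang_map.keys():
        titles = [a['page'] for a in get_top_articles(lang)]
        for i in range(0, len(titles), batch_size):
            load_articles(lang, titles[i:i + batch_size])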

Example xml file downloaded:

<mediawiki xml:lang="en">
	<page>
		<title>Page title</title>
		<!-- page namespace code -->
		<ns>0</ns>
		<id>2</id>
		<!-- If page is a redirection, element "redirect" contains title of the page redirect to -->
		<redirect title="Redirect page title" />
		<restrictions>edit=sysop:move=sysop</restrictions>
		<revision>
			<timestamp>2001-01-15T13:15:00Z</timestamp>
			<contributor>
				<username>Foobar</username>
				<id>65536</id>
			</contributor>
			<comment>I have just one thing to say!</comment>
			<text>A bunch of [text] here.</text>
			<minor />
		</revision>
		<revision>
			<timestamp>2001-01-15T13:10:27Z</timestamp>
			<contributor><ip>10.0.0.2</ip></contributor>
			<comment>new!</comment>
			<text>An earlier [[revision]].</text>
		</revision>
		<revision>
			<!-- deleted revision example -->
			<id>4557485</id>
			<parentid>1243372</parentid>
			<timestamp>2010-06-24T02:40:22Z</timestamp>
			<contributor deleted="deleted" />
			<model>wikitext</model>
			<format>text/x-wiki</format>
			<text deleted="deleted" />
			<sha1/>
		</revision>
	</page>

	<page>
		<title>Talk:Page title</title>
		<revision>
			<timestamp>2001-01-15T14:03:00Z</timestamp>
			<contributor><ip>10.0.0.2</ip></contributor>
			<comment>hey</comment>
			<text>WHYD YOU LOCK PAGE??!!! i was editing that jerk</text>
		</revision>
	</page>
</mediawiki>

Scrub

The dumps from the above export process are in XML format. To scrub the Wikipedia markup and convert it to plain text, the WikiExtractor.py Python script was used, since it extracts and cleans most of the XML markup from a Wikipedia database dump.

Example of Use

The following commands illustrate how to apply the script to a Wikipedia dump:

> python WikiExtractor.py 'dumps/wikipedia-en.xml.bz2' -cb 250K -o extracted -

In order to combine the whole extracted text into a single file one can issue:

> find extracted -name '*bz2' -exec bunzip2 -c {} \; > articles.xml
> rm -rf extracted

The output is stored in a number of files of similar size in a given directory. Each file will contain several documents in the format:

    <doc id="" revid="" url="" title="">
      ...
    </doc>

The output from the above tool needs to be cleaned further using a parser that strips the XML markup with a regex and returns a dictionary with the data that we need. It has four keys: the numeric id of the article, the url (a permanent link to this version of the page), the title, and the plain-text content of the page:

import bz2
import re

# regex pattern for scrubbing an extracted wikipedia article
article = re.compile(
    r'<doc id="(?P<id>\d+)" url="(?P<url>[^"]+)" title="(?P<title>[^"]+)">\n(?P<content>.+)\n<\/doc>', re.S | re.U)

def parse(filename):
    data = ""
    with bz2.BZ2File(filename, 'r') as f:
        for line in f:
            line = line.decode('utf-8')
            data += line
            if line.count('</doc>'):
                m = article.search(data)
                if m:
                    yield m.groupdict()
                data = ""

Explore

Now that we have scrubbed our data, we need to resist the urge to dive in and immediately start building models and getting answers. Instead, we first explore this data with the overall aim of summarizing its main characteristics, i.e. seeing what the data can tell us beyond the formal modeling.

We can start off by analyzing the number of times each character occurs in each language. We use a Counter object to achieve this, as it turns a sequence of values into a defaultdict(int)-like object mapping keys to counts. For example:

from collections import Counter
c = Counter([0, 1, 2, 0])
# c is basically { 0: 2, 1: 1, 2: 1}

An instance of a Counter object has a most_common method which will be useful to get the top 2000 characters per language.
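
For instance, continuing the toy example above:

c.most_common(1)
# [(0, 2)] – the single most frequent value and its count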

This Python data structure will be ideal for generating the counts for each character in a language article:

import os
from collections import Counter

files = [f for f in os.listdir('.') if os.path.isfile(f)]
top_letters = []
for f in files:
	print(f)
	c = Counter()
	for article in parse(f):
		c['articles_count'] += 1
		for letter in article['content']:
			c[letter] += 1
			c['letters_count'] += 1
	# keep the 2000 most common characters and record the language code for this file
	d = dict(c.most_common(2000))
	d['lang'] = os.path.splitext(f)[0]
	top_letters.append(d)

Once we get our dictionary list top_letters, we can load it into a pandas DataFrame, which represents a tabular, spreadsheet-like data structure containing an ordered collection of columns, each of which can be a different value type:

from pandas import DataFrame

df = DataFrame(top_letters)
df.fillna(0, inplace=True)
df = df.set_index('lang')

We will continue the series in the next part where we will go one step further and use visualization tools to explore the data and then develop the models for our language classifiers.

Converting a field to another type and updating the entire collection in Bulk

Here, we look at MongoDB's Bulk Operations and how they can be used to update a large collection more efficiently. We look at a specific update case where one wants to update all the documents in a collection by converting a field to another type; for instance, the original collection may have "numerical" or "date" fields saved as strings:

{
    "name": "Alice",
    "salary": "57871",
    "dob": "1986-08-21"
},
{
    "name": "Bob",
    "salary": "48974",
    "dob": "1990-11-04"
}

The objective would be to update a humongous collection like the above to

{
    "name": "Alice",
    "salary": 57871,
    "dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
    "name": "Bob",
    "salary": 48974,
    "dob": ISODate("1990-11-04T00:00:00.000Z")
}

For relatively small data, one can achieve the above by iterating the collection using a snapshot with the cursor’s forEach() method and updating each document as follows:

db.test.find({
    "salary": { "$exists": true, "$type": 2 },
    "dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    db.test.updateOne(
        { "_id": doc._id },
        { "$set": { "salary": newSalary, "dob": newDob } }
    );
});

Whilst this is fine for small collections, performance with large collections is greatly reduced, since looping through a large dataset and sending each update operation to the server as a separate request incurs a significant penalty.

The Bulk() API comes to the rescue and greatly improves performance, since write operations are sent to the server in batches rather than one at a time. Efficiency is achieved because the method does not send every write request to the server (as with the update statement inside the forEach() loop above) but only once for every 1000 queued operations, making updates much quicker.

Using the same concept above with the forEach() loop to create the batches, we can update the collection in bulk as follows. This demonstration uses the Bulk() API, available in MongoDB versions >= 2.6 and < 3.2, with the initializeUnorderedBulkOp() method so that the write operations in each batch are executed in parallel and in a nondeterministic order.

It updates all the documents in the test collection by converting the salary and dob fields to numerical and datetime values respectively:

var bulk = db.test.initializeUnorderedBulkOp(),
    counter = 0; // counter to keep track of the batch update size

db.test.find({
    "salary": { "$exists": true, "$type": 2 },
    "dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){ 
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    bulk.find({ "_id": doc._id }).updateOne({ 
        "$set": { "salary": newSalary, "dob": newDob }
    });

    counter++; // increment counter
    if (counter % 1000 == 0) {
        bulk.execute(); // Execute per 1000 operations 
        // and re-initialize every 1000 update statements
        bulk = db.test.initializeUnorderedBulkOp();
    }
});

// execute any remaining operations in the final partial batch
if (counter % 1000 != 0) { bulk.execute(); }

The next example applies to the newer MongoDB version 3.2, which has since deprecated the Bulk() API and provides a newer set of APIs using bulkWrite().

It uses the same cursor as above, but creates the array of bulk operations with the same forEach() cursor method, pushing each bulk write document onto the array. Because write commands can accept no more than 1000 operations, we group the operations into batches of at most 1000 and re-initialise the array when the loop hits the 1000th iteration:

var cursor = db.test.find({
        "salary": { "$exists": true, "$type": 2 },
        "dob": { "$exists": true, "$type": 2 }
    }),
    bulkUpdateOps = [];

cursor.snapshot().forEach(function(doc){ 
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    bulkUpdateOps.push({ 
        "updateOne": {
            "filter": { "_id": doc._id },
            "update": { "$set": { "salary": newSalary, "dob": newDob } }
         }
    });

    if (bulkUpdateOps.length === 1000) {
        db.test.bulkWrite(bulkUpdateOps);
        bulkUpdateOps = [];
    }
});         

if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }
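
Since the rest of this series works from Python with pymongo, here is a rough equivalent of the bulkWrite() approach; a minimal sketch, assuming the same test collection is reachable on localhost (UpdateOne and bulk_write are pymongo's counterparts of the shell operations above):

from datetime import datetime
from pymongo import MongoClient, UpdateOne

db = MongoClient('localhost', 27017).test

bulk_update_ops = []
for doc in db.test.find({"salary": {"$exists": True, "$type": 2},
                         "dob": {"$exists": True, "$type": 2}}):
    bulk_update_ops.append(UpdateOne(
        {"_id": doc["_id"]},
        {"$set": {"salary": int(doc["salary"]),
                  "dob": datetime.strptime(doc["dob"], "%Y-%m-%d")}}
    ))
    if len(bulk_update_ops) == 1000:    # flush a full batch
        db.test.bulk_write(bulk_update_ops)
        bulk_update_ops = []

if bulk_update_ops:                     # flush the final partial batch
    db.test.bulk_write(bulk_update_ops)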

Getting Counts of Distinct Key Values

The following example shows how to get results of an aggregate operation where the counts of distinct key values are in the format { key: count }.

Suppose you have a test collection with the sample documents:

db.test.insert([
    { "_id": 1, "name" : "t1", "loglevel" : "ERROR" },
    { "_id": 2, "name" : "t1", "loglevel" : "ERROR" },
    { "_id": 3, "name" : "t1", "loglevel" : "WARNING" },
    { "_id": 4, "name" : "t1", "loglevel" : "WARNING" },
    { "_id": 5, "name" : "t1", "loglevel" : "WARNING" },
    { "_id": 6, "name" : "t1", "loglevel" : "INFO" },
    { "_id": 7, "name" : "t2", "loglevel" : "INFO" },
    { "_id": 8, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 9, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 10, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 11, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 12, "name" : "t3", "loglevel" : "INFO" }
])

and would like to aggregate the collection to return the following result

/* 1 */
{
    "_id" : "t1",
    "error" : 2,
    "warning" : 3,
    "info" : 1
}

/* 2 */
{
    "_id" : "t2",
    "error" : 4,
    "warning" : 0,
    "info" : 1
}

/* 3 */
{
    "_id" : "t3",
    "error" : 0,
    "warning" : 0,
    "info" : 1
}

Accomplishing the above requires using the $cond operator inside the $sum accumulator operator. The $cond operator evaluates a logical condition given by its first argument (if) and returns the second argument when the evaluation is true (then) or the third argument when it is false (else). This converts the true/false logic into the numerical values 1 and 0 that feed into $sum:

{
    "$sum": {
        "$cond": [ { "$eq": [ "$loglevel", "WARNING" ] }, 1, 0 ]
    }
}

Putting this together, one needs to run the following aggregation operation:

db.test.aggregate([
    {
        "$group": {
            "_id": "$name",
            "error": {
                "$sum": {
                   "$cond": [ { "$eq": [ "$loglevel",  "ERROR" ] }, 1, 0]
                }
            },
            "warning":{
                "$sum": {
                   "$cond": [ { "$eq": [ "$loglevel", "WARNING" ] }, 1, 0 ]
                }
            },
            "info": {
                "$sum": {
                   "$cond": [ { "$eq": [ "$loglevel",  "INFO" ] }, 1, 0 ]
                }
            }
        }
    }
])

The above pipeline can also be dynamically generated, given an array of possible statuses:

var statuses = ["ERROR", "WARNING", "INFO"],
    groupOperator = { "$group": { "_id": "$name" } };

statuses.forEach(function (status){
    groupOperator["$group"][status.toLowerCase()] = {
       "$sum": {
           "$cond": [ { "$eq": [ "$loglevel",  status ] }, 1, 0]
       }
   }
});

db.test.aggregate([groupOperator]);
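
The same dynamic construction can be done from Python with pymongo; a minimal sketch, assuming the same test collection and statuses list as above:

from pymongo import MongoClient

db = MongoClient('localhost', 27017).test
statuses = ["ERROR", "WARNING", "INFO"]

# build one conditional $sum per status, keyed by the lower-cased status name
group_stage = {"$group": {"_id": "$name"}}
for status in statuses:
    group_stage["$group"][status.lower()] = {
        "$sum": {"$cond": [{"$eq": ["$loglevel", status]}, 1, 0]}
    }

results = list(db.test.aggregate([group_stage]))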

If the possible key values are not known in advance, an initial step is needed to query for this list by running the distinct command on the loglevel field. This will give you an object that contains a list of the distinct loglevel values:

var result = db.runCommand ( { distinct: "test", key: "loglevel" } )
var statuses = result.values;
printjson(statuses); // this will print ["ERROR", "WARNING", "INFO"]

Now given the list above, you can assemble your pipeline by creating an object that will have its properties set using JavaScript’s reduce() method. The following demonstrates this:


var groupObj = { "$group": { "_id": "$name" } };
var groupPipeline = statuses.reduce(function(obj, status) {  
    obj["$group"][status.toLowerCase()] = {
        "$sum": {
            "$cond": [ { "$eq": [ "$loglevel", status ] }, 1, 0 ]
        }
    };
    return obj;
}, groupObj );

Use the resulting document in the final aggregation pipeline as:

db.test.aggregate([groupPipeline]);

For a more flexible and better-performing approach, which executes much faster than the above, consider running the following alternative pipeline:

db.test.aggregate([
    { 
        "$group": {
            "_id": { 
                "name": "$name",
                "status": { "$toLower": "$loglevel" }
            },
            "count": { "$sum": 1 }
        }
    },
    { 
        "$group": {
            "_id": "$_id.name",
            "counts": {
                "$push": {
                    "loglevel": "$_id.status",
                    "count": "$count"
                }
            }
        }
    }
])

Sample Output

/* 1 */
{
    "_id" : "t2",
    "counts" : [ 
        {
            "loglevel" : "error",
            "count" : 4
        }, 
        {
            "loglevel" : "info",
            "count" : 1
        }
    ]
}

/* 2 */
{
    "_id" : "t1",
    "counts" : [ 
        {
            "loglevel" : "error",
            "count" : 2
        }, 
        {
            "loglevel" : "warning",
            "count" : 3
        }, 
        {
            "loglevel" : "info",
            "count" : 1
        }
    ]
}

/* 3 */
{
    "_id" : "t3",
    "counts" : [ 
        {
            "loglevel" : "info",
            "count" : 1
        }
    ]
}

Chain of Responsibility Pattern

This pattern falls into the behavioural design patterns category. It is a design pattern where a sender sends a request to a chain of objects: the sender does not know which object in the chain will handle the request, and the objects in the chain decide among themselves who will honour it. Every object in the chain has the responsibility to decide on the request; if it cannot serve the request, it may forward it to the next node in the chain, hence the chain of responsibility. This pattern of delegation is often encountered in the real world, where there is one interface for the client (sender) to go through. A good example is a loan application, where your loan request may be channeled to and handled by one particular department. Another concrete example is expense/requisition approval in a company, where each head of department has approval authority and, depending on the value of the expense/requisition, may approve the request or forward it to the next authority in the chain. Other good examples that come to mind are coin sorting machines, ranking poker hands, and ATM cash dispensers.

Now let’s see how we might implement the Chain of Responsibility with a code example, using the expense/requisition request approval scenario explained above. We’ll start by defining our Requisition interface, which simply has the Amount property (the requisition value):


public interface IRequisition
{
    Decimal Amount { get; }
}

Its concrete implementation is as follows


public class Requisition : IRequisition
{
    public Requisition(Decimal amount) {
        Amount = amount;
    }

    public Decimal Amount { get; private set; }
}

Next, the requisition approval authority: an employee who has the responsibility to approve a requisition:

public interface IRequisitionApprover
{
    ApprovalResponse ApproveRequisition(IRequisition requisition);
}

public enum ApprovalResponse
{
    Denied,
    Approved,
    BeyondApprovalLimit,
}

The implementation follows:

public class Employee : IRequisitionApprover
{
	public Employee(string name, Decimal approvalLimit)
	{
		Name = name;
		this.approvalLimit = approvalLimit;
	}

	public string Name { get; private set; }

	public ApprovalResponse ApproveRequisition(IRequisition requisition)
	{
		return (requisition.Amount > approvalLimit)
			? ApprovalResponse.BeyondApprovalLimit : ApprovalResponse.Approved;
	}

	private readonly Decimal approvalLimit;
}

Next we define the RequisitionHandler class which represents a single link in the chain of responsibility

public interface IRequisitionHandler
{
	ApprovalResponse Approve(IRequisition requisition);
	void RegisterNext(IRequisitionHandler next);
}

public class RequisitionHandler : IRequisitionHandler
{
	public RequisitionHandler(IRequisitionApprover requisitionApprover)
	{
		approver = requisitionApprover;
		next = EndOfChainRequisitionHandler.Instance;
	}

	public ApprovalResponse Approve(IRequisition requisition)
	{
		ApprovalResponse response = approver.ApproveRequisition(requisition);
		if(response == ApprovalResponse.BeyondApprovalLimit)
		{
			return next.Approve(requisition);
		}

		return response;
	}

	// Register the next link in the chain i.e. if the current handler
	// cannot approve the requisition request then forward it to the next node in the chain
	public void RegisterNext(IRequisitionHandler next)
	{
		this.next = next;
	}

	private readonly IRequisitionApprover approver;
	private IRequisitionHandler next;
}

We need to define an end-of-chain handler that handles requisition values beyond the approval limit of the last member of the chain; this class exposes a singleton instance:

class EndOfChainRequisitionHandler : IRequisitionHandler
{
	private EndOfChainRequisitionHandler() { }

	public static EndOfChainRequisitionHandler Instance
	{
		get { return instance; }
	}

	public ApprovalResponse Approve(IRequisition requisition)
	{
		// return Denied, always and only, since we've made it to this point and nobody has
		// approved the requisition and we shouldn't approve it here
		return ApprovalResponse.Denied;
	}

	public void RegisterNext(IRequisitionHandler next)
	{
		throw new InvalidOperationException("End of chain handler must be the end of the chain!");
	}

	private static readonly EndOfChainRequisitionHandler instance = new EndOfChainRequisitionHandler();
}

For the application logic

class Approval
{
	static void Main()
	{
		RequisitionHandler josh = new RequisitionHandler(new Employee("Josh SeniorDeveloper", Decimal.Zero));
		RequisitionHandler scott = new RequisitionHandler(new Employee("Scott LineManager", new Decimal(1000)));
		RequisitionHandler rudo = new RequisitionHandler(new Employee("Rudo ViceChair", new Decimal(5000)));
		RequisitionHandler chris = new RequisitionHandler(new Employee("Chris Chairman", new Decimal(20000)));

		josh.RegisterNext(scott);
		scott.RegisterNext(rudo);
		rudo.RegisterNext(chris);

		Decimal requisitionAmount;
		if (ConsoleInput.TryReadDecimal("Expense requisition amount:", out requisitionAmount))
		{
			// Create a requisition object
			IRequisition requisition = new Requisition(requisitionAmount);

			// Go to the head of the chain (Josh) and ask if he can approve this requisition
			ApprovalResponse response = josh.Approve(requisition);

			Console.WriteLine("The request was {0}.", response);

			Console.ReadKey();
		}
	}
}

Helper class

public static class ConsoleInput
{
	public static bool TryReadDecimal(string prompt, out Decimal value)
	{
		value = default(Decimal);

		while (true)
		{
			Console.WriteLine(prompt);
			string input = Console.ReadLine();
			if (string.IsNullOrEmpty(input))
			{
				return false;
			}

			try
			{
				value = Convert.ToDecimal(input);
				return true;
			}
			catch (FormatException)
			{
				Console.WriteLine("The input is not a valid decimal.");
			}
			catch (OverflowException)
			{
				Console.WriteLine("The input is not a valid decimal.");
			}
		}
	}
}