Realtime Language Predictive Analytics with scikit-learn, Flask and RabbitMQ – Part 1

This is my attempt at developing and deploying the machine learning engine used by Michael Becker in his PyCon 2014 talk on Realtime Predictive Analytics with scikit-learn & RabbitMQ. This will be a series of tutorials as an opportunity to share my implementation with you.

To make this series more intuitive, it is broken down into three parts. Part 1 walks through the OSEMN (Obtain, Scrub, Explore, Model, and iNterpret) process, part 2 will cover model development, and part 3 will cover model distribution (i.e. deployment and scaling).

NB – This is my first attempt as a machine learning practitioner, so suggestions and any helpful advice are most welcome. If you have a question about the code or the hypotheses I made, do not hesitate to post a comment in the comment section below.

All code for this tutorial is available in a GitHub repo which you can go ahead and fork off!

Getting and processing the data

Obtain

To build a real-time predictive machine learning algorithm, we need training data, which in this case is mainly text in different languages. Wikipedia is a natural choice for this data source, as it has an abundant supply of articles in multiple languages available online through its dataset dumps. The extraction process follows the one Michael Becker implemented: we use the Wikimedia Pageview API to get the top 1000 articles per month, by pageview count, for the Wikipedia project of each language over the first six months of 2016.

We use MongoDB to persist the data from the JSON response of the API call, and its aggregation framework to then aggregate the top 5000 articles for each language. We do this by starting a MongoDB instance locally and connecting to it on the default port 27017 using pymongo, the official Python driver for MongoDB:

# -*- coding: utf-8 -*-
import requests

from lang_map import code_lang_map
from pymongo import InsertOne, MongoClient


# connect to a local MongoDB instance on the default port
connection = MongoClient('localhost', 27017)

# use test database
db = connection.test

# handle to wikipedia collection
wikipedia = db.wikipedia


def insert_top_articles():
    """
    Insert into mongo the top 1000 articles per month, by pageview count,
    for the Wikipedia project of each language over the first 6 months of 2016.
    """
    # initialize bulk insert operations list
    ops = []

    # clear existing wikipedia collection
    wikipedia.delete_many({})

    for lang in code_lang_map.keys():
        for month in range(1, 7):
            url = ('https://wikimedia.org/api/rest_v1/metrics/pageviews/top/'
                   '{0}.wikipedia/all-access/2016/{1}/all-days').format(lang, str(month).zfill(2))
            try:
                result = requests.get(url).json()

                if 'items' in result and len(result['items']) == 1:
                    r = result['items'][0]
                    for article in r['articles']:
                        # tag each article with its two-letter language code
                        article['lang'] = r['project'][:2]
                        ops.append(InsertOne(article))

            except Exception:
                print('ERROR while fetching or parsing ' + url)

    # send all the inserts to the server in one bulk request
    if ops:
        wikipedia.bulk_write(ops)


def get_top_articles(lang):
    """
    Aggregate the top 5000 articles for the given language from the
    pageview counts collected over the 6 months.

    """

    # initialize aggregation pipeline
    pipeline = [
        { "$match": { "lang": lang } },
        {
            "$group": {
                "_id": "$article",
                "lang": { "$first": "$lang" },
                "max_views": { "$max": "$views" }
            }
        },
        {
            "$project": {
                "page": "$_id", "_id": 0,
                "lang": 1,
                "max_views": 1
            }
        },
        { "$sort": { "max_views": -1 } },
        { "$limit": 5000 }
    ]

    return list(wikipedia.aggregate(pipeline))

Having generated the list of articles above, we export the content of each article via the Wikipedia Special:Export page, using this simple Python script to execute the export queries:

# -*- coding: utf-8 -*-
import bz2
import os
import requests
from shutil import copyfileobj

def load_articles(lang, pagelist):
    """
    Export the given pages via Special:Export and save them as a
    bz2-compressed XML dump for the given language.
    """
    url = "https://{0}.wikipedia.org/w/index.php?title=Special:Export&action=submit".format(lang)
    origin = "https://{0}.wikipedia.org".format(lang)
    referer = "https://{0}.wikipedia.org/wiki/Special:Export".format(lang)
    filename = "dumps/wikipedia-{0}.xml".format(lang)
    pages = '\n'.join(pagelist)

    headers = {
        "Origin": origin,
        "Accept-Encoding": "gzip,deflate,sdch",
        "User-Agent": "Mozilla/5.0 Chrome/35.0",
        "Content-Type": "application/x-www-form-urlencoded",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Cache-Control": "max-age=0",
        "Referer": referer,
        "Connection": "keep-alive",
        "DNT": "1"
    }
    payload = {
        'catname': '',
        'pages': pages,
        'curonly': '1',
        'wpDownload': '1',
    }

    # POST the page list to Special:Export and save the raw XML response
    res = requests.post(url, data=payload, headers=headers)
    with open(filename, 'wb') as f:
        f.write(res.content)

    # compress the dump and remove the uncompressed file
    with open(filename, 'rb') as source:
        with bz2.BZ2File(filename + '.bz2', 'wb', compresslevel=9) as target:
            copyfileobj(source, target)
    os.remove(filename)

    return filename + '.bz2'
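
To tie the two steps together, a minimal driver can loop over the languages, pull the aggregated article titles out of MongoDB and hand them to load_articles. This is only a sketch: it assumes the two scripts above are in scope (or importable) and that the dumps/ directory already exists.

from lang_map import code_lang_map

# populate the wikipedia collection with the pageview data
insert_top_articles()

for lang in code_lang_map.keys():
    # each aggregated document has the shape {"page": ..., "lang": ..., "max_views": ...}
    top = get_top_articles(lang)
    pagelist = [doc['page'] for doc in top]
    dump = load_articles(lang, pagelist)
    print('wrote ' + dump)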

Example xml file downloaded:

<mediawiki xml:lang="en">
	<page>
		<title>Page title</title>
		<!-- page namespace code -->
		<ns>0</ns>
		<id>2</id>
		<!-- If page is a redirection, element "redirect" contains title of the page redirect to -->
		<redirect title="Redirect page title" />
		<restrictions>edit=sysop:move=sysop</restrictions>
		<revision>
			<timestamp>2001-01-15T13:15:00Z</timestamp>
			<contributor>
				<username>Foobar</username>
				<id>65536</id>
			</contributor>
			<comment>I have just one thing to say!</comment>
			<text>A bunch of [text] here.</text>
			<minor />
		</revision>
		<revision>
			<timestamp>2001-01-15T13:10:27Z</timestamp>
			<contributor><ip>10.0.0.2</ip></contributor>
			<comment>new!</comment>
			<text>An earlier [[revision]].</text>
		</revision>
		<revision>
			<!-- deleted revision example -->
			<id>4557485</id>
			<parentid>1243372</parentid>
			<timestamp>2010-06-24T02:40:22Z</timestamp>
			<contributor deleted="deleted" />
			<model>wikitext</model>
			<format>text/x-wiki</format>
			<text deleted="deleted" />
			<sha1/>
		</revision>
	</page>

	<page>
		<title>Talk:Page title</title>
		<revision>
			<timestamp>2001-01-15T14:03:00Z</timestamp>
			<contributor><ip>10.0.0.2</ip></contributor>
			<comment>hey</comment>
			<text>WHYD YOU LOCK PAGE??!!! i was editing that jerk</text>
		</revision>
	</page>
</mediawiki>

Scrub

The dumps from the above export process are in XML format. To scrub the Wikipedia markup and convert it to plain text, I used the WikiExtractor.py Python script, which extracts and cleans most of the XML markup from a Wikipedia database dump.

Example of Use

The following commands illustrate how to apply the script to a Wikipedia dump:

> python WikiExtractor.py 'dumps/wikipedia-en.xml.bz2' -cb 250K -o extracted -

In order to combine the whole extracted text into a single file one can issue:

> find extracted -name '*bz2' -exec bunzip2 -c {} \; > articles.xml
> rm -rf extracted

The output is stored in a number of files of similar size in a given directory. Each file will contain several documents in the format:

    <doc id="" revid="" url="" title="">
      ...
    </doc>

The output from the above tool needs to be cleaned further. For this we use a small parser that relies on a regex to strip the remaining doc markup and yield a dictionary with the data we need, with four keys: the id of the page, the url (a permanent link to this version of the page), the title, and the plain-text content of the page:

import bz2
import re

# matches one <doc ...> ... </doc> block produced by WikiExtractor and captures
# its id, url, title and plain-text content as named groups
article = re.compile(
    r'<doc id="(?P<id>[^"]+)"(?: revid="[^"]*")? url="(?P<url>[^"]+)" title="(?P<title>[^"]+)">\n'
    r'(?P<content>.+?)\n</doc>',
    re.S | re.U)

def parse(filename):
    data = ""
    with bz2.BZ2File(filename, 'r') as f:
        for line in f:
            line = line.decode('utf-8')
            data += line
            # once a closing </doc> tag is seen, try to match a complete document
            if line.count('</doc>'):
                m = article.search(data)
                if m:
                    yield m.groupdict()
                data = ""

Explore

Now that we have scrubbed our data, we need to resist the urge to dive in and immediately start building models and getting answers, and instead explore this data with the overall aim of summarizing its main characteristics, i.e. seeing what the data can tell us beyond the formal modeling.

We can start off by analyzing the number of times each character occurs in each language. We use a Counter object for this task, as it turns a sequence of values into a defaultdict(int)-like object mapping keys to counts. For example:

from collections import Counter
c = Counter([0, 1, 2, 0])
# c is basically { 0: 2, 1: 1, 2: 1}

An instance of a Counter object has a most_common method which will be useful to get the top 2000 characters per language.
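
Continuing the toy example above, most_common returns (value, count) pairs sorted by descending count:

c.most_common(2)
# [(0, 2), (1, 1)] -- on older Python versions ties may come back in arbitrary order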

This Python data structure will be ideal for generating the counts for each character in a language article:

import os
from collections import Counter

# every compressed per-language dump sitting in the current directory
files = [f for f in os.listdir('.') if os.path.isfile(f)]
top_letters = []
for f in files:
    print(f)
    c = Counter()
    for article in parse(f):
        c['articles_count'] += 1
        for letter in article['content']:
            c[letter] += 1
            c['letters_count'] += 1
    # keep the 2000 most common characters (plus the helper counters)
    d = dict(c.most_common(2000))
    # derive the language code from the wikipedia-<lang>.xml.bz2 naming convention
    d['lang'] = f.split('-')[1][:2]
    top_letters.append(d)

Once we have our list of dictionaries top_letters, we can load it into a pandas DataFrame, which represents a tabular, spreadsheet-like data structure containing an ordered collection of columns, each of which can hold a different value type:

from pandas import DataFrame

df = DataFrame(top_letters)
df.fillna(0, inplace=True)
df = df.set_index('lang')
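
With the counts loaded we can already run a quick sanity check. The snippet below is only a sketch; it assumes the letters_count helper key from the counting loop ended up among the kept columns (it will, since it holds the largest count per language):

# rows are languages, columns are characters (plus the helper counters)
print(df.shape)

# relative frequency of the letter 'e' per language, highest first
print((df['e'] / df['letters_count']).sort_values(ascending=False).head())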

We will continue the series in the next part where we will go one step further and use visualization tools to explore the data and then develop the models for our language classifiers.

Converting a field to another type and updating the entire collection in Bulk

Here, we look at MongoDB's Bulk Operations and how they can be used to update a large collection more efficiently. We look at a specific case where one wants to update all the documents in a collection by converting a field to another type; for instance, the original collection may have "numerical" or "date" fields saved as strings:

{
    "name": "Alice",
    "salary": "57871",
    "dob": "1986-08-21"
},
{
    "name": "Bob",
    "salary": "48974",
    "dob": "1990-11-04"
}

The objective would be to update a humongous collection like the above to

{
    "name": "Alice",
    "salary": 57871,
    "dob": ISODate("1986-08-21T00:00:00.000Z")
},
{
    "name": "Bob",
    "salary": 48974,
    "dob": ISODate("1990-11-04T00:00:00.000Z")
}

For relatively small data, one can achieve the above by iterating the collection using a snapshot with the cursor’s forEach() method and updating each document as follows:

db.test.find({
    "salary": { "$exists": true, "$type": 2 },
    "dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    db.test.updateOne(
        { "_id": doc._id },
        { "$set": { "salary": newSalary, "dob": newDob } }
    );
});

Whilst this is fine for small collections, performance with large collections degrades considerably, since looping through a large dataset and sending each update operation to the server as a separate request incurs a round trip per document.

The Bulk() API comes to the rescue and greatly improves performance, since write operations are sent to the server in batches rather than individually. Efficiency is gained because the method does not send every write request to the server (as the update statement inside the forEach() loop above does) but only once per 1000 queued operations, making updates much quicker.

Using the same forEach() loop as above to create the batches, we can update the collection in bulk as follows. This demonstration uses the Bulk() API available in MongoDB versions >= 2.6 and < 3.2, with the initializeUnorderedBulkOp() method so that the write operations in each batch are executed in parallel and in a nondeterministic order.

It updates all the documents in the test collection by changing the salary and dob fields to numerical and date values respectively:

var bulk = db.test.initializeUnorderedBulkOp(),
    counter = 0; // counter to keep track of the batch update size

db.test.find({
    "salary": { "$exists": true, "$type": 2 },
    "dob": { "$exists": true, "$type": 2 }
}).snapshot().forEach(function(doc){
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    bulk.find({ "_id": doc._id }).updateOne({
        "$set": { "salary": newSalary, "dob": newDob }
    });

    counter++; // increment counter
    if (counter % 1000 == 0) {
        bulk.execute(); // Execute per 1000 operations
        // and re-initialize every 1000 update statements
        bulk = db.test.initializeUnorderedBulkOp();
    }
});

// flush any remaining operations that did not fill a complete batch of 1000
if (counter % 1000 != 0) { bulk.execute(); }

The next example applies to MongoDB version 3.2, which has since deprecated the Bulk() API and provides a newer set of APIs using bulkWrite().

It uses the same cursor as above, but builds an array of bulk operations by pushing each update document onto the array inside the same forEach() cursor method. Because write commands can accept no more than 1000 operations, the operations are grouped into batches of at most 1000 and the array is re-initialised once a batch is full:

var cursor = db.test.find({
        "salary": { "$exists": true, "$type": 2 },
        "dob": { "$exists": true, "$type": 2 }
    }),
    bulkUpdateOps = [];

cursor.snapshot().forEach(function(doc){ 
    var newSalary = parseInt(doc.salary),
        newDob = new ISODate(doc.dob);
    bulkUpdateOps.push({ 
        "updateOne": {
            "filter": { "_id": doc._id },
            "update": { "$set": { "salary": newSalary, "dob": newDob } }
         }
    });

    if (bulkUpdateOps.length === 1000) {
        db.test.bulkWrite(bulkUpdateOps);
        bulkUpdateOps = [];
    }
});         

if (bulkUpdateOps.length > 0) { db.test.bulkWrite(bulkUpdateOps); }
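
Since the tutorial itself uses pymongo, here is roughly what the same batched update looks like from Python. This is only a sketch, assuming the same test collection and string-typed salary/dob fields as in the shell examples:

from datetime import datetime
from pymongo import MongoClient, UpdateOne

client = MongoClient('localhost', 27017)
test = client.test.test

ops = []
cursor = test.find({
    "salary": { "$exists": True, "$type": 2 },
    "dob": { "$exists": True, "$type": 2 }
})
for doc in cursor:
    ops.append(UpdateOne(
        { "_id": doc["_id"] },
        { "$set": {
            "salary": int(doc["salary"]),
            "dob": datetime.strptime(doc["dob"], "%Y-%m-%d")
        } }
    ))
    if len(ops) == 1000:  # flush a full batch of 1000 operations
        test.bulk_write(ops)
        ops = []

if ops:  # flush whatever is left over
    test.bulk_write(ops)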

Getting Counts of Distinct Key Values

The following example shows how to get results of an aggregate operation where the counts of distinct key values are in the format { key: count }.

Suppose you have a test collection with the sample documents:

db.test.insert([
    { "_id": 1, "name" : "t1", "loglevel" : "ERROR" },
    { "_id": 2, "name" : "t1", "loglevel" : "ERROR" },
    { "_id": 3, "name" : "t1", "loglevel" : "WARNING" },
    { "_id": 4, "name" : "t1", "loglevel" : "WARNING" },
    { "_id": 5, "name" : "t1", "loglevel" : "WARNING" },
    { "_id": 6, "name" : "t1", "loglevel" : "INFO" },
    { "_id": 7, "name" : "t2", "loglevel" : "INFO" },
    { "_id": 8, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 9, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 10, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 11, "name" : "t2", "loglevel" : "ERROR" },
    { "_id": 12, "name" : "t3", "loglevel" : "INFO" }
])

and would like to aggregate the collection to return the following result

/* 1 */
{
    "_id" : "t1",
    "error" : 2,
    "warning" : 3,
    "info" : 1
}

/* 2 */
{
    "_id" : "t2",
    "error" : 4,
    "warning" : 0,
    "info" : 1
}

/* 3 */
{
    "_id" : "t3",
    "error" : 0,
    "warning" : 0,
    "info" : 1
}

Accomplishing the above requires using the $cond operator inside the $sum accumulator. The $cond operator evaluates a logical condition given as its first argument (if) and returns the second argument where the evaluation is true (then) or the third argument where it is false (else). This converts the true/false logic into the numerical values 1 and 0, which feed into $sum:

{
    "$sum": {
        "$cond": [ { "$eq": [ "$loglevel", "WARNING" ] }, 1, 0 ]
    }
}

The resulting pipeline is then the following aggregation operation:

db.test.aggregate([
    {
        "$group": {
            "_id": "$name",
            "error": {
                "$sum": {
                   "$cond": [ { "$eq": [ "$loglevel",  "ERROR" ] }, 1, 0]
                }
            },
            "warning":{
                "$sum": {
                   "$cond": [ { "$eq": [ "$loglevel", "WARNING" ] }, 1, 0 ]
                }
            },
            "info": {
                "$sum": {
                   "$cond": [ { "$eq": [ "$loglevel",  "INFO" ] }, 1, 0 ]
                }
            }
        }
    }
])

The above pipeline can also be dynamically generated, given an array of possible statuses:

var statuses = ["ERROR", "WARNING", "INFO"],
    groupOperator = { "$group": { "_id": "$name" } };

statuses.forEach(function (status){
    groupOperator["$group"][status.toLowerCase()] = {
       "$sum": {
           "$cond": [ { "$eq": [ "$loglevel",  status ] }, 1, 0]
       }
   }
});

db.test.aggregate([groupOperator]);

If the possible key values are not known in advance, an initial step which queries for this list is necessary, by running the distinct command on the loglevel field. This will give you an object that contains a list of the distinct log levels:

var result = db.runCommand ( { distinct: "test", key: "loglevel" } )
var statuses = result.values;
printjson(statuses); // this will print ["ERROR", "WARNING", "INFO"]

Now given the list above, you can assemble your pipeline by creating an object that will have its properties set using JavaScript’s reduce() method. The following demonstrates this:


var groupObj = { "$group": { "_id": "$name" } };
var groupPipeline = statuses.reduce(function(obj, status) {  
    obj["$group"][status.toLowerCase()] = {
        "$sum": {
            "$cond": [ { "$eq": [ "$loglevel", status ] }, 1, 0 ]
        }
    };
    return obj;
}, groupObj );

Use the resulting document in the final aggregation pipeline as:

db.test.aggregate([groupPipeline]);
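
For completeness, the same dynamic construction is just as easy from Python with pymongo. This is a sketch, assuming the same test collection and a mongod running locally on the default port:

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
test = client.test.test

# discover the distinct log levels first
statuses = test.distinct("loglevel")  # e.g. ["ERROR", "WARNING", "INFO"]

# build the $group stage, one conditional $sum per status
group = { "$group": { "_id": "$name" } }
for status in statuses:
    group["$group"][status.lower()] = {
        "$sum": { "$cond": [ { "$eq": [ "$loglevel", status ] }, 1, 0 ] }
    }

for doc in test.aggregate([group]):
    print(doc)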

For a more flexible and more performant approach, which executes much faster than the above, consider running the following alternative pipeline. Note that it returns the counts as an array of subdocuments per name rather than as flat keys:

db.test.aggregate([
    { 
        "$group": {
            "_id": { 
                "name": "$name",
                "status": { "$toLower": "$loglevel" }
            },
            "count": { "$sum": 1 }
        }
    },
    { 
        "$group": {
            "_id": "$_id.name",
            "counts": {
                "$push": {
                    "loglevel": "$_id.status",
                    "count": "$count"
                }
            }
        }
    }
])

Sample Output

/* 1 */
{
    "_id" : "t2",
    "counts" : [ 
        {
            "loglevel" : "error",
            "count" : 4
        }, 
        {
            "loglevel" : "info",
            "count" : 1
        }
    ]
}

/* 2 */
{
    "_id" : "t1",
    "counts" : [ 
        {
            "loglevel" : "error",
            "count" : 2
        }, 
        {
            "loglevel" : "warning",
            "count" : 3
        }, 
        {
            "loglevel" : "info",
            "count" : 1
        }
    ]
}

/* 3 */
{
    "_id" : "t3",
    "counts" : [ 
        {
            "loglevel" : "info",
            "count" : 1
        }
    ]
}