MapReduce on Python is better with MRJob and EMR

Yelp’s MRJob is a fantastic way of interfacing with Hadoop MapReduce in Python. It has built-in support for several ways of running Hadoop jobs: AWS’s EMR, GCP’s Dataproc, local execution, and a plain Hadoop cluster.

The cool thing about MRJob is that you can write and test your MapReduce jobs locally, and then just add the -r hadoop flag to ship your job to Hadoop (on a local cluster). It’ll even copy input data to HDFS for you, if necessary.

For a bit of context, the previous “state-of-the-art” of running Hadoop jobs in Python is described best in Michael Noll’s “Writing an Hadoop MapReduce Program in Python”. In essence, you write two Python scripts: one that handles the map phase and another that handles the reduce phase.

Unfortunately, in doing this you give up many of the niceties of the MapReduce framework, and you end up writing code that looks like this:

# mapper.py
import sys
for line in sys.stdin:
    line = line.strip()
    words = line.split()
    for word in words:
        print('\t'.join((word, '1')))

# reducer.py
import sys

current_word = None
current_count = 0
word = None

for line in sys.stdin:
    line = line.strip()

    word, count = line.split('\t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if current_word == word:
        current_count += count
    else:
        if current_word:
            print('{}\t{}'.format(current_word, current_count))
        current_count = count
        current_word = word

if current_word == word:
    print('{}\t{}'.format(current_word, current_count))

(from Michael Noll’s article)
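
The reducer above has to track current_word by hand because Hadoop Streaming hands it sorted lines rather than grouped keys. Here is a minimal, stdlib-only sketch of that map, sort, reduce flow, run in a single process. The function names (map_words, reduce_counts, run_local) are my own for illustration, and sorted() is only a stand-in for Hadoop's shuffle-and-sort step:

```python
import itertools


def map_words(lines):
    """Mapper: emit one 'word<TAB>1' line per word, like mapper.py above."""
    for line in lines:
        for word in line.split():
            yield '{}\t{}'.format(word, 1)


def reduce_counts(sorted_lines):
    """Reducer: sum the counts of adjacent lines that share a key."""
    pairs = (line.split('\t', 1) for line in sorted_lines)
    # groupby only merges *adjacent* equal keys, so the input must be sorted.
    for word, group in itertools.groupby(pairs, key=lambda pair: pair[0]):
        yield '{}\t{}'.format(word, sum(int(count) for _, count in group))


def run_local(lines):
    """Rough stand-in for: cat input | mapper.py | sort | reducer.py"""
    return list(reduce_counts(sorted(map_words(lines))))


if __name__ == '__main__':
    for out_line in run_local(['foo bar foo', 'bar baz foo']):
        print(out_line)
```

Even in this tidier form, every job still has to parse and format its own tab-separated lines, which is the part that gets tedious.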

In contrast, with MRJob you write your jobs as classes and you don’t have to worry (as much) about the actual “streaming” nature of your job. MRJob handles key/value serialization/deserialization. It also comes with a variety of useful wrappers that allow for submitting jobs in a much cleaner way than with the STDIN/STDOUT-based approach.

That same code chunk compresses to this:

from mrjob.job import MRJob
import re

WORD_RE = re.compile(r"[\w']+")


class MRWordFreqCount(MRJob):

    def mapper(self, _, line):
        for word in WORD_RE.findall(line):
            yield word.lower(), 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordFreqCount.run()

(from the MRJob documentation)
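
To give a feel for the serialization MRJob takes off your hands: as far as I understand the docs, pairs passed between steps are written by default as a JSON-encoded key, a tab, and a JSON-encoded value. The sketch below imitates that format with the standard library only. It is not MRJob's code, and encode_pair / decode_pair are hypothetical names:

```python
import json


def encode_pair(key, value):
    """Serialize one key/value pair as '<json key>TAB<json value>'."""
    # json.dumps escapes any tab inside a string, so the single literal
    # tab between key and value stays unambiguous.
    return '{}\t{}'.format(json.dumps(key), json.dumps(value))


def decode_pair(line):
    """Parse a line produced by encode_pair back into (key, value)."""
    raw_key, raw_value = line.split('\t', 1)
    return json.loads(raw_key), json.loads(raw_value)


if __name__ == '__main__':
    line = encode_pair('hello', 3)
    print(line)
    print(decode_pair(line))
```

This is why the mapper and reducer above can yield plain Python values (strings, numbers, lists) and receive them back as Python values, instead of splitting and casting strings by hand.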

For one of the research projects I’m working on, we needed to run Hadoop Streaming jobs on a persistent EMR cluster. For various reasons (mostly due to how users would submit jobs) we needed to use MRJob’s default Hadoop runner instead of its dedicated EMR runner (which can spin up ephemeral clusters, neat!).

Mostly, everything *JustWorked*™. EMR is good about placing Hadoop “stuff” in the places that most libraries expect. However, I found a couple of tweaks necessary to get it working. With the latest version of the Amazon EMR AMI (emr-5.11.1) and the latest version of MRJob (v0.6.1), jobs would consistently fail due to issues with the internal bash scripts that MRJob uses to wrap the streaming job:

/bin/sh: run_prestart: line 1: syntax error: unexpected end of file

I traced the problem to a couple of GitHub issues. Seemingly, the bug had already been fixed, but for whatever reason I was still hitting it on the stock EMR AMI.

Fortunately, there’s a fix. Adding this configuration to your MRJob config (either at ~/.mrjob.conf or /etc/mrjob.conf) avoids the bug:

runners:
  hadoop:
    setup:
      - 'set -e'
    sh_bin: '/bin/bash -x'

Et voilà! MRJob now successfully runs Hadoop Streaming jobs using Python on AWS EMR.

If you’re interested in learning more about MRJob / MapReduce, the MRJob documentation really is an amazing resource.