Tutorial Notes on Web-Scale Information Analytics

MapReduce Using Hadoop Streaming

After this tutorial, students will be able to script in Python and know where to find documentation and support. They will also write their first MapReduce program in Python using Hadoop streaming.

NOTE: We are using hadoop-1.0.3 in this tutorial. We assume your environment follows the setup in Tutorial 2. If not, change paths, options, and names where necessary.

Python 101

You should have Python 2.7 installed by default. If not, try sudo apt-get install python.

Basic interactive environment and notation

This is a REPL (read-eval-print loop): you type expressions at the >>> prompt, and the interpreter evaluates and prints them.

azureuser@test-hpl:/opt$ python
Python 2.7.3 (default, Sep 26 2013, 20:03:06) 
[GCC 4.6.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> print 'hello world'
hello world
>>> quit()
azureuser@test-hpl:/opt$

Explained: typing python starts the interactive interpreter, print writes to standard output, and quit() exits back to the shell.

The following subsections should be straightforward. Just type along and watch; you'll figure out what they mean.

EXERCISE: Work through the official Python tutorial: http://docs.python.org/2/tutorial/index.html

Simple arithmetic

>>> 1+5
6
>>> 3 * 4
12
>>> 3 / 2
1
>>> 3 // 2
1
>>> 3.0 / 2
1.5
>>> 3.0 // 2
1.0
>>> 3 % 2
1
>>> 3.0 % 2
1.0

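Note that in Python 2, / between two integers performs integer (floor) division, which is why 3 / 2 gives 1 above. To get true division without converting an operand to float, you can import the Python 3 behavior:

>>> from __future__ import division
>>> 3 / 2
1.5
>>> 3 // 2
1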

String manipulation

>>> 'a' + "b"
'ab'
>>> 'a' * 5
'aaaaa'
>>> ' aaaa '
' aaaa '
>>> ' aaaa '.strip()
'aaaa'
>>> 'a b c d'.split()
['a', 'b', 'c', 'd']
>>> '%d,%.2f,%s' % (1, 1.12345, 'hello')
'1,1.12,hello'


List

>>> range(1,5)
[1, 2, 3, 4]
>>> [1] + [2, 3, 4]
[1, 2, 3, 4]
>>> l = [1, 2, 'a', 'b']
>>> l
[1, 2, 'a', 'b']
>>> len(l)
4
>>> l[0]
1
>>> l[-1]
'b'
>>> l[0:3]
[1, 2, 'a']
>>> l[:3]
[1, 2, 'a']
>>> l[1:]
[2, 'a', 'b']


Dictionary

>>> d = {'k1': 'v1'}
>>> d
{'k1': 'v1'}
>>> d['k2'] = 'v2'
>>> d
{'k2': 'v2', 'k1': 'v1'}
>>> d.update({'k1': 1, 'k3': 3})
>>> d
{'k3': 3, 'k2': 'v2', 'k1': 1}

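Dictionaries reappear later in this tutorial (mapper2.py counts words in one); a quick preview of iterating over a dictionary:

>>> for k, v in d.iteritems():
...     print k, v
... 
k3 3
k2 v2
k1 1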

Set

>>> a = set([1,2,3])
>>> b = set([2,3,4])
>>> a
set([1, 2, 3])
>>> b
set([2, 3, 4])
>>> a & b
set([2, 3])
>>> a - b
set([1])
>>> b - a
set([4])
>>> a | b
set([1, 2, 3, 4])


Functions

>>> sum([1,2,3])
6
>>> ord('A')
65
>>> chr(66)
'B'
>>> int('1')
1
>>> str(1)
'1'

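You can also define your own functions with def:

>>> def double(x):
...     return x * 2
... 
>>> double(5)
10
>>> double('ab')
'abab'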

Control flow

Create control.py:

for i in range(0, 5):
    if i < 3:
        print i, "is smaller than 3"
    elif i == 3:
        print i, "is 3"
    else:
        print i, "is greater than 3"

Execute:

$python control.py 
0 is smaller than 3
1 is smaller than 3
2 is smaller than 3
3 is 3
4 is greater than 3


The schema of learning Python

Find help:

>>> help(sum)

Help on built-in function sum in module __builtin__:

sum(...)
    sum(sequence[, start]) -> value

    Returns the sum of a sequence of numbers (NOT strings) plus the value
    of parameter 'start' (which defaults to 0).  When the sequence is
    empty, returns start.

Everything in Python is an object. You can call help not only on functions, but on anything else:

>>> help('string')
...
    split(s, sep=None, maxsplit=-1)
        split(s [,sep [,maxsplit]]) -> list of strings

        Return a list of the words in the string s, using sep as the
        delimiter string.  If maxsplit is given, splits at no more than
        maxsplit places (resulting in at most maxsplit+1 words).  If sep
        is not specified or is None, any whitespace string is a separator.
...
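
Related: dir() lists the attributes of any object, which is handy when you don't yet know what to ask help about (output abbreviated; note the list methods at the end):

>>> dir([])
['__add__', '__class__', ..., 'append', 'count', 'extend', 'index', 'insert', 'pop', 'remove', 'reverse', 'sort']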

Or, official doc: http://docs.python.org/2/

Or, Google, Stackoverflow, ...

Learn by writing; learn by modifying; learn by reading.

IPython, bpython and IDEs (optional)

The default python interpreter only provides a basic interactive shell. To make your work more efficient, you can try:

sudo apt-get install ipython
sudo apt-get install bpython

Besides general-purpose IDEs, you may also consider editors and IDEs tailored for Python.

Environment Preparation

Export path variables upon login

Put the following at the end of ~/.bashrc:

export HADOOP_PREFIX=/opt/hadoop
export JAVA_HOME=/usr/lib/jvm/java-6-openjdk-amd64
export PATH=$PATH:$HADOOP_PREFIX/bin
export HADOOP_CONF_DIR=$HADOOP_PREFIX/conf

After this, you don't have to type the full path of the hadoop executable anymore.

After you finish editing the .bashrc file, apply the changes by running source ~/.bashrc, or simply log out and log in again.

Port mapping

Log in to your machine as follows:

ssh azureuser@<your-machine> -L50030:localhost:50030

-L50030:localhost:50030 tells SSH to forward a port from your server to your local desktop. 50030 is the default job tracker port we will see later.

If you are using PuTTY, look for the configuration options for SSH port forwarding (tunnels).

Check whether port mapping works by visiting: http://localhost:50030/. If your single-node Hadoop is running, you should be able to see job information from the web UI.

MapReduce for Word Counting

Hadoop streaming

Basic counting

Check whether you have the Shakespeare data:

azureuser@test-hpl:~$ hadoop dfs -ls /user/azureuser/input
Found 1 items
-rw-r--r--   3 azureuser supergroup    6460232 2014-01-21 11:59 /user/azureuser/input/bigfile

We use the combined bigfile because it runs faster under our current settings: each small file would otherwise get at least one map task of its own, and the per-task overhead adds up.

Write your mapper.py:

#!/usr/bin/env python

import sys

# Emit a (word, 1) pair for every word on stdin; fields are separated by
# a tab, the default key/value separator in Hadoop streaming.
for line in sys.stdin:
    for word in line.split():
        print '%s\t%s' % (word, 1)

Write your reducer.py:

#!/usr/bin/env python

import sys

cur_key = None
cur_count = 0

# Hadoop sorts map output by key before it reaches the reducer, so all
# lines for the same word arrive consecutively; we only detect key changes.
for line in sys.stdin:
    key, value = line.split()
    if key == cur_key:
        cur_count += int(value)
    else:
        if cur_key is not None:
            print '%s\t%s' % (cur_key, cur_count)
        cur_key = key
        cur_count = int(value)

# Flush the last key; the guard covers completely empty input.
if cur_key is not None:
    print '%s\t%s' % (cur_key, cur_count)

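Before submitting, it pays to test the pair locally. Here is a minimal sketch (sample.txt is a hypothetical file; put any text in it). The Unix sort stands in for Hadoop's shuffle phase:

#!/usr/bin/env python
# test_local.py: run map -> sort -> reduce as plain processes, no Hadoop needed.
# 'sort' imitates Hadoop's shuffle: it brings identical words onto
# consecutive lines, which is what reducer.py relies on.
import subprocess

cmd = 'cat sample.txt | python mapper.py | sort | python reducer.py'
print subprocess.check_output(cmd, shell=True)

If the pipeline misbehaves here, it will also misbehave on the cluster, just with a slower debugging loop.
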
Submit job:

$hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.0.3.jar -mapper mapper.py -reducer reducer.py -file mapper.py -file reducer.py -input input -output output

The parameters -mapper, -reducer, -input and -output are as straightforward as their names suggest. The -file parameter lists local files to ship with the job. The convention: -input and -output are HDFS paths (relative paths resolve under /user/<username>), while -file takes local paths.

The most frequent words?

Although we don't need Hadoop for such a simple question, we use it this time for demonstration purposes.

Our output from last job:

$hadoop dfs -cat /user/azureuser/output/part-00000 | head -n 5
"A  1
"B  1
"C  1
"D  1
"E  1

Since Hadoop has a built-in sorting facility, we can let it sort the second column for us. How do we sort on the second column? We'll see a simpler way later; for now, assume we know nothing more than the previous Hadoop streaming command and try to leverage Hadoop's sort-by-key behavior: swap the two columns so the count becomes the key.

Make swap.py:

#!/usr/bin/env python

import sys

# Swap the columns: the zero-padded count becomes the key, so Hadoop's
# string sort on keys yields numeric order.
for line in sys.stdin:
    word, count = line.split()
    print '%07d\t%s' % (int(count), word)

Launch Hadoop job:

$hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.0.3.jar -mapper swap.py -file swap.py -input output -output output-count-sorted

Explained: swap.py makes the count the key. Hadoop compares keys as strings, so the count is zero-padded to a fixed width (%07d) to make string order agree with numeric order. We did not specify a -reducer, so streaming falls back to the identity reducer, and the job's output is simply the swapped pairs, sorted by key.

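A quick REPL check of why the zero padding matters; only fixed-width numbers sort correctly as strings:

>>> sorted(['9', '10', '2'])
['10', '2', '9']
>>> sorted(['%07d' % int(s) for s in ['9', '10', '2']])
['0000002', '0000009', '0000010']
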
Check the output:

$hadoop dfs -cat /user/azureuser/output-count-sorted/part-00000 | tail -n 5
0019155 of
0019248 and
0020085 I
0025512 the
0030984 *

NOTE: This can cause a lot of traffic in practice if you have a really large data set. Don't -cat and tail in real work (and homework). If you want the most frequent words, just add a reducer to the above example and keep only the top results (instead of a full list of sorted items).

Sort reduce output (optional)

We noted above that there is an easier way to sort by count: treat both columns as the key and tell Hadoop to compare the second field numerically in reverse order (the -k2,2nr option follows the syntax of Unix sort).

$hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.0.3.jar -D stream.num.map.output.key.fields=2 -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options=-k2,2nr -mapper cat -reducer cat -input output -output output-count-sorted4

Check result:

$hadoop dfs -cat /user/azureuser/output-count-sorted4/part-00000 | head -n 5
* 30984 
the 25512 
I 20085 
and 19248 
of  19155

NOTE: You don't have to run word count and sort as two jobs. We used a dummy mapper and reducer (i.e. cat) in this section, so you can simply add the -D options above to our first word-counting example.

Monitor jobs

Find the job tracker: http://localhost:50030/

Navigate around and figure out the meanings yourself. It's useful for checking job progress, seeing how work was split between map and reduce tasks, and reading the logs of failed tasks.

You can find the same information in $HADOOP_PREFIX/logs/userlogs.

A simple optimization

Instead of emitting word<tab>1 pairs, we can aggregate them in the mapper first.

The modified mapper2.py:

#!/usr/bin/env python

import sys
from collections import defaultdict

# Aggregate counts inside the mapper ("in-mapper combining") so that far
# fewer intermediate pairs need to be shuffled to the reducers.
cache = defaultdict(int)
for line in sys.stdin:
    for word in line.split():
        cache[word] += 1

for (word, count) in cache.iteritems():
    print '%s\t%s' % (word, count)

Run:

$hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.0.3.jar -mapper mapper2.py -reducer reducer.py -file mapper2.py -file reducer.py -input input -output output2

The improvement is not so significant: on my machine, the running time drops from 42s to 38s. This is because our input data is too small.

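Hadoop streaming also has a -combiner option that runs a reducer-style aggregation on each mapper's local output. A hedged sketch (assuming your streaming version accepts an executable combiner rather than only a Java class; reducer.py is reusable here because word counts can be summed in any grouping, and output3 is just a fresh output directory):

$hadoop jar $HADOOP_PREFIX/contrib/streaming/hadoop-streaming-1.0.3.jar -mapper mapper.py -reducer reducer.py -combiner reducer.py -file mapper.py -file reducer.py -input input -output output3
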
EXERCISE: Upload another 3 copies of the bigfile you created in the last tutorial. Compare the performance of mapper.py and mapper2.py (example difference: 1 min vs. 48 s). How about running it on the original ~200 small files?

General take-away: shrinking the intermediate data that must be shuffled between mappers and reducers is often the easiest way to speed up a MapReduce job.


Outcome of This Tutorial


Download the archive of codes used in this tutorial: example.tar.gz.
