test driving Amazon Web Services’ Elastic MapReduce

Hadoop provides software infrastructure for running MapReduce tasks, but it requires substantial setup time and access to a compute cluster to use to full advantage. Amazon's Elastic MapReduce (EMR) addresses both problems: it delivers pre-configured Hadoop virtual machines running in the cloud for only as long as they are needed, and it bills only for the compute time actually used. Here I test drive EMR by running a Hadoop computation I previously wrote about.

Computation Overview

In February of 2012, I wrote about computing each U.S. county's "industrial diversity" index from the U.S. Census Bureau's County Business Patterns (CBP) data (see http://badassdatascience.com/2012/02/21/industry-diversity/). The CBP dataset provides, for each NAICS industry code, the number of business establishments operating in each county. Using an equation similar to the one ecologists use to measure species diversity in a region, we can compute each county's industrial diversity from the NAICS codes present and the establishment counts under each code.

At its core the diversity equation is Shannon's information entropy equation, and the per-county computation is well suited to the MapReduce framework. I describe an adaptation of the computation to Hadoop here, where I ran the calculation on a Hadoop instance configured for single-node use (basically, I ran it on my desktop). In this post I expand from running the computation on a single node to running it on a cluster with EMR.

Source Code

First, I had to change the code from my previous post to reflect updates in Hadoop. The new map step code is:

import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

public class MapToEstablishments extends Mapper<LongWritable, Text, Text, DoubleWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // strip quotes and surrounding whitespace, then split the CSV record
        String line = value.toString().replace("\"", "");
        line = line.trim();
        String[] line_split = line.split(",");

        // skip the header record
        if (line.contains("fipstate")) { return; }

        String naics = line_split[2];
        Integer total_establishments = Integer.parseInt(line_split[10]);
        String fipstate = line_split[0];
        String fipcounty = line_split[1];

        // skip aggregate/roll-up NAICS codes, which contain "-" or "/"
        if (naics.contains("-")) { return; }
        if (naics.contains("/")) { return; }

        // emit key = "<state FIPS>_<county FIPS>", value = establishment count
        DoubleWritable total_establishments_writable = new DoubleWritable(total_establishments);
        context.write(new Text(fipstate + "_" + fipcounty), total_establishments_writable);
    }
}

The map step selects the state/county FIPS numbers as the key and the business establishment count for each NAICS code as the value; we do not need to keep the NAICS codes themselves. When the map step is complete, each state/county FIPS key will have multiple values, one per NAICS code present in that county.
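To see what the mapper emits without spinning up Hadoop, the parsing logic can be exercised in isolation. The row below is fabricated purely for illustration; only the column positions, which mirror the indices used in the mapper, matter:

// A quick standalone check of the mapper's parsing logic (no Hadoop required).
public class MapperLogicCheck {
    public static void main(String[] args) {
        // fabricated row: state FIPS, county FIPS, NAICS code, seven filler
        // columns, then the establishment count at column index 10
        String line = "\"01\",\"001\",\"113310\",\"x\",\"x\",\"x\",\"x\",\"x\",\"x\",\"x\",\"42\"";

        line = line.replace("\"", "").trim();
        String[] fields = line.split(",");

        String outKey = fields[0] + "_" + fields[1];       // "01_001"
        double outValue = Integer.parseInt(fields[10]);    // 42.0

        System.out.println(outKey + "\t" + outValue);      // prints: 01_001	42.0
    }
}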

The new reduce step code is:

import java.io.IOException;
import java.lang.InterruptedException;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.util.ArrayList;

public class ReduceToFindIndustrialDiversity extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    @Override
    public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {

        // collect the establishment counts for this state/county key
        ArrayList<Double> establishment_counts = new ArrayList<Double>();
        for (DoubleWritable val : values) {
            establishment_counts.add(val.get());
        }

        // total establishments across all NAICS codes in the county
        double sum = 0.0;
        for (double i : establishment_counts) { sum += i; }

        // convert each count into a proportion of the county total
        ArrayList<Double> probabilities = new ArrayList<Double>();
        for (double i : establishment_counts) { probabilities.add(i / sum); }

        // Shannon entropy calculation (log base 2)
        double H = 0.0;
        for (double p : probabilities) {
            H += p * Math.log(p) / Math.log(2.0);
        }
        H = H * -1.0;

        context.write(key, new DoubleWritable(H));
    }
}

H = -Σᵢ pᵢ log₂(pᵢ), where pᵢ is the proportion of a county's business establishments falling under NAICS code i

The reduce step calculates the Shannon entropy (equation above) over all of the establishment count values for a key, producing a new key/value pair with the state/county FIPS numbers as the key and the resulting Shannon entropy as the value. By the end of the reduce step, each state/county FIPS key has exactly one value.
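As a quick sanity check of the arithmetic (with made-up counts, not actual CBP values): a county whose establishments fall under three NAICS codes with counts 5, 3, and 2 has proportions 0.5, 0.3, and 0.2, so

H = -(0.5 log₂ 0.5 + 0.3 log₂ 0.3 + 0.2 log₂ 0.2) ≈ 1.49

while a county with every establishment under a single NAICS code has H = 0. The more evenly establishments are spread across more codes, the higher the diversity index.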

Finally, the new pilot code is:

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IndustrialDiversity {

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "IndustrialDiversity");

        job.setJarByClass(IndustrialDiversity.class);

        // final (reducer) output types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        // intermediate (mapper) output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setMapperClass(MapToEstablishments.class);
        job.setReducerClass(ReduceToFindIndustrialDiversity.class);

        // plain text in, plain text out
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // input and output paths come from the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}

This code contains the "main" method used to configure and run the analysis. The input and output paths come in as command-line arguments, and on EMR they can point directly at S3 locations.

Uploading the Code and Data to S3

I next compiled these three classes into a JAR file called IndustrialDiversity.jar and uploaded the JAR file to Amazon Web Services’ S3 under the “code” folder of my bucket:

[Screenshot: uploading the JAR file to the "code" folder on S3]

Then I uploaded the 2011 County Business Patterns data file to S3 into the “input” folder of my bucket:

[Screenshot: uploading the CBP data file to the "input" folder on S3]
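I did both uploads through the S3 web console, but they can also be scripted. Here is a rough sketch using the AWS SDK for Java; the bucket name and file names are placeholders, not my actual values:

import java.io.File;
import com.amazonaws.services.s3.AmazonS3Client;

// Sketch: upload the JAR and the CBP data file to S3 with the AWS SDK for Java.
// Credentials come from the default provider chain (environment variables,
// credentials file, or instance profile). Bucket and file names are placeholders.
public class UploadToS3 {
    public static void main(String[] args) {
        AmazonS3Client s3 = new AmazonS3Client();
        String bucket = "my-emr-bucket";  // placeholder

        s3.putObject(bucket, "code/IndustrialDiversity.jar", new File("IndustrialDiversity.jar"));
        s3.putObject(bucket, "input/cbp_data.txt", new File("cbp_data.txt"));
    }
}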

Starting the Elastic MapReduce Cluster

At the Elastic MapReduce page available from the Amazon Web Services management console we see a list of previous clusters:

[Screenshot: EMR cluster list]

From here we press the "Create cluster" button and get a form of configuration options. Note that in the image below I set the log folder to point to my bucket on S3:

[Screenshot: "Create cluster" form, with the log folder pointed at my S3 bucket]

At the bottom of the form we press the “Create cluster” button:

[Screenshot: bottom of the "Create cluster" form]

Next we see the new cluster page showing that it is starting up:

[Screenshot: new cluster starting up]

Once the cluster is up, three configuration steps run:

[Screenshot: configuration steps running during cluster startup]

Finally, the cluster startup is complete:

[Screenshot: cluster startup complete]

Running the Hadoop Job

We run the Hadoop job by pressing the "Add step" button and configuring the new step to run the JAR file we uploaded earlier. Note that the class containing the "main" method is the first argument; the input and output paths follow:

[Screenshot: "Add step" form configured to run IndustrialDiversity.jar]

The step immediately begins running once “Add” is pressed:

[Screenshot: new step running]

Finally, the step completes:

[Screenshot: step completed]
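For reference, the same step can be added without the console. I added mine through the console as shown above, but the sketch below does the equivalent with the AWS SDK for Java's EMR client; the cluster ID, bucket name, and S3 paths are placeholders:

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;

// Sketch: add the IndustrialDiversity step to a running EMR cluster
// programmatically. The cluster ID and S3 paths are placeholders.
public class AddIndustrialDiversityStep {
    public static void main(String[] args) {
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();

        // same argument order as the console form: main class, input, output
        HadoopJarStepConfig jarStep = new HadoopJarStepConfig()
            .withJar("s3://my-emr-bucket/code/IndustrialDiversity.jar")
            .withArgs("IndustrialDiversity",
                      "s3://my-emr-bucket/input",
                      "s3://my-emr-bucket/output");

        StepConfig step = new StepConfig()
            .withName("IndustrialDiversity")
            .withActionOnFailure("CONTINUE")
            .withHadoopJarStep(jarStep);

        emr.addJobFlowSteps(new AddJobFlowStepsRequest()
            .withJobFlowId("j-XXXXXXXXXXXXX")   // placeholder cluster ID
            .withSteps(step));
    }
}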

Output Files

The output files may now be viewed on S3:

[Screenshot: output file list on S3]

Looking at one of the results files in particular shows per-county industrial diversity indices:

[Screenshot: per-county industrial diversity results]

Terminating the Cluster

We now terminate the cluster:

[Screenshot: terminating the cluster]

The process takes a bit, but finally completes:

[Screenshot: terminated cluster]
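Termination can also be done from code; a minimal sketch with the AWS SDK for Java, again using a placeholder cluster ID:

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

// Sketch: terminate the EMR cluster programmatically (placeholder cluster ID).
public class TerminateCluster {
    public static void main(String[] args) {
        AmazonElasticMapReduceClient emr = new AmazonElasticMapReduceClient();
        emr.terminateJobFlows(new TerminateJobFlowsRequest().withJobFlowIds("j-XXXXXXXXXXXXX"));
    }
}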

Cost

This procedure cost $1.36 USD.
