
Weighted Average Calculation in MapReduce

Hadoop is already a major player in healthcare data analysis, and it is likely to become even more important as new data producers enter the consumer marketplace. A good example of a “new data producer” is a wearable device that streams large quantities of data, typically from an embedded accelerometer and gyroscope. Big data technology is expected to play a central role in analyzing this type of data, and organizations are already researching applications that combine the two. Intel and the Michael J. Fox Foundation for Parkinson’s Research made headlines when they announced an effort to combine big data technologies and wearable devices to analyze Parkinson’s symptoms. The NIH is also funding research into wearable devices through a $536,000 grant awarded to UMass professor Benjamin Marlin, whose lab specializes in machine learning and the analysis of big data. It therefore seems reasonable for IT professionals to learn about Hadoop and the MapReduce framework, as many interesting projects will likely revolve around this technology in the future.

Hadoop allows us to process and analyze data stored in files without first loading it into database structures. Any professional programmer will tell you that this is not new (files have long been sources of input to their programs), but Hadoop combines this file-based approach with parallelism and distributed processing, allowing workloads to scale without a corresponding explosion in processing time. Using this processing paradigm, Hadoop avoids loading data into databases and passing it around between database structures, and it lets us scale up our processing capacity simply by adding machines to the Hadoop “cluster”. Both of these benefits can buy a data processing shop time when working with large quantities of data.

But the downside to Hadoop is its relative complexity. In an IT landscape that has been dominated by relational databases, SQL and PL/SQL have become standard skills for data processing. While PL/SQL is a robust programming language with many features (e.g., object orientation), SQL and PL/SQL are very different skills from the Java programming and framework-specific knowledge that Hadoop requires. This has produced a skills gap within the industry, allowing Hadoop specialists to command very high salaries, and it has led many companies to “spin their wheels” trying to implement complex Hadoop architectures and processes. This post, which explores how to implement a relatively simple SQL query in Hadoop, may help highlight the difference in skill sets required to work in Hadoop versus traditional SQL-based database technologies.

In another post, I wrote about a simple SQL query for calculating weighted average charges from Medicare’s provider utilization and charges data, grouped by provider type. The provider utilization and charges dataset contains real Medicare billings and is clean and well-prepared. However, it is already aggregated by provider and procedure (HCPCS) code to protect patient confidentiality, so in order to calculate averages across providers we need to calculate weighted averages.
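
To see why a simple average of the averages would mislead, consider two hypothetical rows for the same provider type: one with 10 services at an average charge of $100 and another with 1,000 services at an average charge of $50. The unweighted average of the two charges is $75, but the weighted average, (10*100 + 1000*50) / (10 + 1000) ≈ $50.50, is what actually reflects the charges across all 1,010 services.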

As a reminder, the SQL query we used in that post was:

SELECT	PROVIDER_TYPE, 
	SUM(LINE_SERVICE_COUNT), 
	SUM(LINE_SERVICE_COUNT * AVERAGE_ALLOWED_AMOUNT) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_ALLOWED_AMOUNT,
	SUM(LINE_SERVICE_COUNT * AVERAGE_SUBMITTED_CHARGE) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_SUBMITTED_CHARGE,
	SUM(LINE_SERVICE_COUNT * AVERAGE_PAYMENT_AMOUNT) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_PAYMENT_AMOUNT
FROM UTILIZATION_AND_CHARGES
WHERE HCPCS_CODE = 93454
GROUP BY PROVIDER_TYPE

We will spend the remainder of this post performing the same calculation using Hadoop’s MapReduce framework.

It’s important to understand some basic concepts about MapReduce, which form the foundation of the MapReduce way of thinking. “Hadoop: The Definitive Guide” has a great section that clearly articulates the inner workings of the MapReduce framework; chapter two in particular is invaluable for gaining a basic understanding of the process.

Essentially, each program consists of a map phase and a reduce phase. Each phase accepts input and writes output, and the programmer determines the types of the input and output data. Typically, the program starts by reading data from a file. This file becomes input to the map phase, which processes the file line by line. The goal of the map phase is to generate a series of key-value pairs that will be used in the final calculation by the reduce phase of the program. Suppose, for example, that you want to perform some type of aggregate calculation. In that case, the key would be the value you would use in your GROUP BY clause in a typical SQL statement, and the values would be the data points fed into the aggregate function.
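
Before we dive into the real program, here is a rough sketch of that shape, a generic and hypothetical example only (the field positions and class names are made up, and this is not the code we build below), showing what a GROUP BY-style SUM looks like in the MapReduce API:

// A generic sketch: the map phase emits (group key, data point) pairs and the
// reduce phase aggregates the values for each key, much like SUM(...) GROUP BY.
import java.io.IOException;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

class GroupByMapper extends Mapper<Object, Text, Text, DoubleWritable> {
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		String[] fields = value.toString().split("\t");
		// Hypothetical layout: field 0 is the GROUP BY column, field 1 is the measure.
		context.write(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
	}
}

class GroupBySumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
	public void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
		double sum = 0;
		for (DoubleWritable v : values) {
			sum += v.get();   // aggregate the data points for this key
		}
		context.write(key, new DoubleWritable(sum));
	}
}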

Let’s use an example from the provider charges and utilization file to illustrate how this key-value concept works before we actually write a MapReduce program. Here are the first few records of the Medicare provider utilization and charges file, printing only the 14th, 19th, and 23rd columns. These columns are the provider type, the line service count, and the average submitted charge for each provider/HCPCS code combination.

Internal Medicine 115 135.25
Internal Medicine 93 198.59
Internal Medicine 111 38.75
Internal Medicine 544 70.95
Internal Medicine 75 101.74
Internal Medicine 95 71.06
Internal Medicine 191 105.01
Pathology 226 11.64

We have two unique provider types in this example (“Internal Medicine” and “Pathology”), and these will serve as the keys in the output of our map function. We want to calculate a weighted average: the sum of the products of line service count and average submitted charge for each provider, divided by the sum of line service counts across all providers of the same type. Therefore, for each record in the input, we output the provider type as the key, along with a “tuple” containing the line service count and the submitted charge, which gives the reduce phase what it needs to do the calculations.

To illustrate, the map function will first produce the following output for the records above:

(Internal Medicine, (115, 135.25))
(Internal Medicine, (93, 198.59))
(Internal Medicine, (111, 38.75))
(Internal Medicine, (544, 70.95))
(Internal Medicine, (75, 101.74))
(Internal Medicine, (95, 71.06))
(Internal Medicine, (191, 105.01))
(Pathology, (226, 11.64))

The framework then sorts and groups the output of the map function by key before handing it to the reduce function. The input to the reduce function thus becomes:

(Internal Medicine, [(115, 135.25), (93, 198.59), (111, 38.75), (544, 70.95), (75, 101.74), (95, 71.06), (191, 105.01)])
(Pathology, [(226, 11.64)])

The reduce function then iterates over the list of values for each key, allowing you to do your calculations per key. The output of the reduce process consists of the key, along with whatever calculated value you’ve chosen to emit. In our case, that value is the weighted average submitted charge.
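
Working this out by hand for the rows above shows exactly what the reduce phase should produce:

sum(count * charge) = 115*135.25 + 93*198.59 + 111*38.75 + 544*70.95 + 75*101.74 + 95*71.06 + 191*105.01 = 111,358.78
sum(count)          = 115 + 93 + 111 + 544 + 75 + 95 + 191 = 1,224
weighted average    = 111,358.78 / 1,224 ≈ 90.98

Pathology has only one row, so its weighted average is simply 11.64. These are the numbers we should expect to see in the job output at the end of this post.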

Let’s see what this looks like in practice…

To begin, we create a class that will serve as our mapper. In our case, the class is “WeightedAverageChargesMapper”:

// Mapper: reads one tab-delimited line of the charges file and emits
// (provider type, (charges subtotal, service count, record count)).
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeightedAverageChargesMapper extends Mapper<Object, Text, Text, TotalChargeTotalServiceTuple> {
	private Text grouper = new Text();
	private TotalChargeTotalServiceTuple outTuple = new TotalChargeTotalServiceTuple();
	
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		// Split the tab-delimited line into its fields.
		String[] fields = value.toString().split("\t");
		// The provider type (14th field) is the key we group by.
		String grouperVal = fields[13];
		// Pre-multiply the line service count by the average submitted charge so
		// the reducer only needs to sum and divide.
		outTuple.setTotalCharges(Float.parseFloat(fields[18])*Float.parseFloat(fields[21]));
		outTuple.setTotalServices(Float.parseFloat(fields[18]));
		outTuple.setCount(1);
		grouper.set(grouperVal);
		context.write(grouper, outTuple);
	}
}

This code takes care of the map phase of our program. The framework feeds the input text file to the mapper line by line; for each line, we split on the tab character (\t) and assign the value at index 13 of the resulting array (i.e. the 14th field in the file, the provider type) to grouperVal, which we then assign to grouper, a Text object. We then populate the output variable outTuple, an instance of a custom class created for this purpose (more details on that below). outTuple becomes the value portion of the key-value pairs that result from the map phase. Remember, conceptually the map phase produces key-value pairs of the form (Internal Medicine, (191, 105.01)), where the key is held in grouper and the value is held in outTuple.

It’s important to note the type parameters in the extends clause of the code above: the last two must match the key and value classes that get written to the context, which we can see in the last line of the map method. That final line writes the resulting key-value pair to the context of the program execution, which allows our map output to be consumed by the reducer.
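
To make that concrete, take the Internal Medicine record from the sample above with 191 services at an average submitted charge of 105.01. The mapper actually writes the pre-multiplied tuple rather than the raw pair:

(Internal Medicine, (20056.91, 191, 1))

The first value is the product 191 * 105.01, the second is the line service count, and the third is a record count of 1, so the reducer only has to sum these fields and divide.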

We mentioned that in order to write a tuple as the value in the key-value output of the map phase, we needed to create a custom class. The code for this custom class is below:

// Custom Writable that carries the running totals (charges, services, record count)
// between the map and reduce phases.
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.io.DataOutput;
import java.io.DataInput;


public class TotalChargeTotalServiceTuple implements Writable {
	private float totalCharges;
	private float totalServices;
	private float count = 0;
	
	public float getTotalCharges() {
		return totalCharges;
	}
	
	public void setTotalCharges(float totalCharges) {
		this.totalCharges = totalCharges;
	}
	
	public float getTotalServices() {
		return totalServices;
	}
	
	public void setTotalServices(float totalServices) {
		this.totalServices = totalServices;
	}
	
	public float getCount() {
		return count;
	}	
	
	public void setCount(float count) {
		this.count = count;
	}
	
	// Deserialize the tuple fields in the same order they were written.
	public void readFields(DataInput in) throws IOException {
		totalCharges = in.readFloat();
		totalServices = in.readFloat();
		count = in.readFloat();
	}
	
	// Serialize the tuple fields for transfer between the map and reduce phases.
	public void write(DataOutput out) throws IOException {
		out.writeFloat(totalCharges);
		out.writeFloat(totalServices);
		out.writeFloat(count);
	}
	
	public String toString() {
		return totalCharges + "\t" + totalServices + "\t" + count;
	}
}

This is a fairly simple class with getters and setters for the charge and service values from the provider charges and utilization file. The important methods to note are readFields and write, which the Writable interface requires; Hadoop uses them internally to serialize and deserialize the object when writing it as output for the map phase and reading it back as input for the reduce phase.
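
If you want to sanity-check this serialization logic outside of Hadoop, a quick round trip through plain java.io streams will do. The harness below is hypothetical (the class name TupleRoundTrip is made up for this example) and is not part of the MapReduce job:

// Hypothetical throwaway harness: round-trips the tuple through write() and
// readFields(), mimicking what Hadoop does between the map and reduce phases.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TupleRoundTrip {
	public static void main(String[] args) throws IOException {
		TotalChargeTotalServiceTuple original = new TotalChargeTotalServiceTuple();
		original.setTotalCharges(191f * 105.01f);
		original.setTotalServices(191f);
		original.setCount(1);
		
		// Serialize the tuple to a byte array.
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));
		
		// Deserialize into a fresh instance and print it via toString().
		TotalChargeTotalServiceTuple copy = new TotalChargeTotalServiceTuple();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
		System.out.println(copy);  // roughly: 20056.91	191.0	1.0
	}
}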

Now that we have a mapper, we need to write a reducer. The reducer reads the grouped output of the map phase and iterates over the tuples for each key, allowing us to calculate a value per key and write the reducer’s output to the context.

// Reducer: for each provider type, sums the pre-multiplied charges and the
// service counts, then divides to get the weighted average submitted charge.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class WeightedAverageChargesReducer extends Reducer<Text, TotalChargeTotalServiceTuple, Text, FloatWritable> {
	private float weightedAverage;
	private FloatWritable result = new FloatWritable();
	
	public void reduce(Text key, Iterable<TotalChargeTotalServiceTuple> values, Context context) throws IOException, InterruptedException {
		float sum = 0;          // number of input records for this key (not used in the average)
		float sumCharges = 0;   // sum of (line service count * average submitted charge)
		float sumServices = 0;  // sum of line service counts
		
		for (TotalChargeTotalServiceTuple val : values) {
			sum += val.getCount();
			sumCharges += val.getTotalCharges();
			sumServices += val.getTotalServices();
		}
		// Weighted average = total charges / total services for this provider type.
		weightedAverage = (sumCharges / sumServices);
		result.set(weightedAverage);
		
		context.write(key, result);
	}
}

Note that the first two type parameters on the Reducer represent the key-value pair written to the context by the mapper, and the last two represent the key-value pair that the reducer itself writes to the context; these types must match what the mapper produces and what the reducer emits. Also note that the reducer processes one key at a time, iterating over all of the values grouped under that key. This is similar to how one might iterate over an array in any other programming language; however, the grouping and iteration over keys are handled behind the scenes by the MapReduce framework.

The general logic of the reducer is fairly simple. For each key, we sum the pre-multiplied charge totals (line service count * average submitted charge, computed in the mapper) as well as the line service counts. After iterating over the tuples, we divide the sum of charges by the sum of the service counts. We then write the key (provider type) and the value (weighted average submitted charge) to the context.
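
Because the tuple carries running sums rather than finished averages, the same math could also be pre-aggregated on the map side with a combiner. The class below is a hypothetical addition, not part of the original program; it would be registered in the driver with job.setCombinerClass(WeightedAverageChargesCombiner.class), and its output types must match the map output types so the reducer’s logic is unchanged:

// Hypothetical combiner: sums tuples per key on the map side so less data is
// shuffled to the reducers; the reducer still performs the final division.
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeightedAverageChargesCombiner extends Reducer<Text, TotalChargeTotalServiceTuple, Text, TotalChargeTotalServiceTuple> {
	private TotalChargeTotalServiceTuple partial = new TotalChargeTotalServiceTuple();
	
	public void reduce(Text key, Iterable<TotalChargeTotalServiceTuple> values, Context context) throws IOException, InterruptedException {
		float charges = 0;
		float services = 0;
		float count = 0;
		
		for (TotalChargeTotalServiceTuple val : values) {
			charges += val.getTotalCharges();
			services += val.getTotalServices();
			count += val.getCount();
		}
		partial.setTotalCharges(charges);
		partial.setTotalServices(services);
		partial.setCount(count);
		
		// Still a tuple, so the reducer can consume combined and uncombined output alike.
		context.write(key, partial);
	}
}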

The final step in writing a MapReduce program is to write the driver class that sets up all the job-level parameters. This is what actually configures and submits the job that we are running.

// Driver: configures the job (input/output paths, mapper, reducer, and output
// types) and submits it to the cluster.
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeightedAverageCharge {
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: WeightedAverageCharge <input path> <output path>");
			System.exit(-1);
		}
		
		Job job = new Job();
		job.setJarByClass(WeightedAverageCharge.class);
		job.setJobName("Weighted Average Charges");
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setMapperClass(WeightedAverageChargesMapper.class);
		job.setReducerClass(WeightedAverageChargesReducer.class);
		// The map output value (the tuple) differs from the reducer's output value
		// (a FloatWritable), so declare both explicitly.
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(TotalChargeTotalServiceTuple.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FloatWritable.class);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

This class sets up the job-level attributes required to execute the job. Some items to note are the line that sets the path to the input file for the mapper (FileInputFormat.addInputPath(job, new Path(args[0]));) and the line that sets the output path on HDFS (the Hadoop Distributed File System), where the reducer will write its output (FileOutputFormat.setOutputPath(job, new Path(args[1]));). Finally, note the lines that set the mapper class, the reducer class, and the output key and value classes.

Finally, we can compile our classes, package them into a JAR, and run the job with the hadoop command. Setting up your environment can be a bit of a pain, but once you have everything up and running, start the Hadoop services, compile the classes, and run the program!

kelder86$ ./bin/start-all.sh
kelder86$ javac WeightedAverageCharge.java TotalChargeTotalServiceTuple.java WeightedAverageChargesMapper.java WeightedAverageChargesReducer.java
kelder86$ jar cvf WeightedAvgCharge.jar WeightedAverageCharge*class TotalChargeTotalServiceTuple.class
kelder86$ hadoop jar WeightedAvgCharge.jar WeightedAverageCharge ./input/physician_charges_10_lines_no_header.txt output12
kelder86$ hadoop fs -ls /user/kelder86/output12
-rw-r--r--   1 kelder86 supergroup          0 2014-11-25 11:55 /user/kelder86/output12/_SUCCESS
drwxr-xr-x   - kelder86 supergroup          0 2014-11-25 11:54 /user/kelder86/output12/_logs
-rw-r--r--   1 kelder86 supergroup         43 2014-11-25 11:55 /user/kelder86/output12/part-r-00000
kelder86$ hadoop fs -cat /user/kelder86/output12/part-r-00000
Internal Medicine	90.97939
Pathology	11.64