Category Archives: Data Analysis

The Mechanics of a Decision Tree

It has long been a goal of mine to learn more about machine learning techniques. Machine learning encapsulates a host of interesting solutions to common problems. If you work in tech and are interested in developing a deeper knowledge of computer science topics, statistics/probability, and general programming skills, then a machine learning side-project can be a fun way to achieve your goals.

One of the machine learning methods used in predictive statistics is decision tree learning. A decision tree predicts an outcome for an observation based on the set of values present in that observation. A decision tree is much like the little pinball-style board you may have seen on “The Price is Right”; however, while the path of the disk on that show is entirely random, the path formed by a decision tree is not: it is shaped by an application of statistical probability. A decision tree “learns” the path to each outcome from the set of data it is built from.

In order to understand how this works, one needs to understand two concepts from information theory: entropy and information gain.

Entropy is essentially what it sounds like: randomness. When we apply the concept of entropy to information, or data, we are talking about how predictable the data is. More specifically, entropy quantifies the uncertainty involved in predicting the value of a random variable, in our case the outcome variable in a set of data. A set of data has low entropy if one outcome is much more likely than the others, and high entropy if the outcomes are close to evenly split.

For example, imagine that we have a set of 100 students and we are interested in selecting a student who majors in political science. Imagine that 50% of the students major in the sciences and 50% major in the humanities, and that 50% of the humanities students major in political science. The percentage of all students who major in political science is therefore 25%, so the chance that we will select a political science major from the total student body is fairly low, and the entropy of the entire set of students is fairly high. However, if we limit the set to just the students who study humanities, we increase our chance of selecting a political science major significantly (to 50%). We thereby reduce the entropy of the set by limiting it to the students who major in the humanities. If you were to repeat this process over and over again, you might begin to see how a decision tree eventually leads us to a prediction, or classification, of an observation, based on the different “splits” made to reduce entropy.

The second concept to understand is information gain. If entropy measures how uncertain we are about the value of a random variable, then information gain is the reduction in that uncertainty following some action taken on the data. You can think of information gain as a quantification of the decrease in entropy caused by splitting the data on some attribute. For example, when we reduced the set of students to just the students who majored in the humanities, we increased the odds that we would select a student who majored in political science. We therefore “gained information” by reducing the entropy of the set of data. A decision tree uses information gain to select the “node”, or attribute, used to make each split in the tree.

To form a decision tree, these two concepts are recursively applied to the set of data. A decision tree recursively reduces the entropy in any given set of observations until it has eventually decreased the entropy to the point where one can be guaranteed to select the proper classification based on the values in the observation. This guarantee (i.e. perfectly “fitting” the data) leads to one of the key problems of decision trees, over-fitting, which can be addressed by reducing the specificity of the tree through a practice called pruning.

For more information on decision trees, particularly the algorithm that we will explore here (called ID3), you can refer to the following link:

http://www.doc.ic.ac.uk/~sgc/teaching/pre2012/v231/lecture11.html

The page, written by Simon Colton of the University of London, does a great job of explaining the ID3 algorithm.

There are several different implementations of decision tree algorithms. Searching will turn up implementations in R and in Python’s scikit-learn. While these implementations are of very high quality, using them does not necessarily teach us how a decision tree works internally. For example, the calculations used to determine entropy, information gain, and each node in the tree all happen behind the scenes. To learn more about the algorithm, I wrote a Python class that allows me to iteratively build a decision tree. The class does not implement the recursive element of the algorithm (I hope to update it at a later point to include this functionality), however, it does allow us to think through the process of building a decision tree, including the entropy and information gain calculations.

I’ve transcribed the algorithm from Simon Colton’s website below:

1) Choose the attribute of the full set which has the highest information gain, and set this as the root node
2) For each value that the attribute can take, draw a branch from the root node
3) For each branch, calculate the set of observations for that branch (i.e. limiting the set to just the observations that satisfy the criterion of the branch)
3A) If the set is empty, choose the default category, which will be the majority of observations from the total set
3B) If the set only contains observations from a single category, then set a leaf node and make the value of that leaf node the category
3C) Otherwise, remove the attribute from the set of attributes that are candidates to be a new node. Then determine which attribute scores the highest on information gain with respect to the set calculated in step 3. The new node will then start the cycle again from step 2.
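
To make the recursion concrete, here is a minimal sketch of how the full ID3 loop might look in Python. This is not the interactive class used below; the list-of-dicts data format and the helper functions entropy() and information_gain() (sketched later in this post) are assumptions made purely for illustration.

# A hypothetical sketch of the ID3 recursion described above (not the interactive class used below).
# Assumes: rows is a list of dicts, attributes is a list of field names, target is the outcome field,
# and entropy()/information_gain() are defined as in the later snippets.
from collections import Counter

def id3(rows, attributes, target, default=None):
    if not rows:                                    # 3A: empty set -> default (majority) category
        return default
    outcomes = [row[target] for row in rows]
    majority = Counter(outcomes).most_common(1)[0][0]
    if len(set(outcomes)) == 1 or not attributes:   # 3B: single category (or nothing left to split on) -> leaf
        return majority
    # 1 / 3C: choose the attribute with the highest information gain as the node
    best = max(attributes, key=lambda a: information_gain(rows, a, target))
    remaining = [a for a in attributes if a != best]
    tree = {best: {}}
    for value in set(row[best] for row in rows):    # 2: draw a branch for each value of the attribute
        subset = [row for row in rows if row[best] == value]
        tree[best][value] = id3(subset, remaining, target, default=majority)
    return tree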

While a full blown implementation of this algorithm would implement the recursion to fully build out the tree, I’ve opted to take a simpler approach and simply implement several methods that allow me to step through the algorithm in “interactive mode”, so to speak.

I’ve decided to use a dataset from UCI’s machine learning repository. The dataset contains data on 470 surgeries performed on patients suffering from lung cancer. The outcome variable is whether or not the patient died within one year of receiving the surgery (‘Risk1Y’), and the predictor variables are various attributes of the patient and their illness, including age, diagnosis, tumor size, the outcomes of various pulmonary function tests, whether the patient smokes, whether they experienced a cough before surgery, and more. The full dataset can be found here:

https://archive.ics.uci.edu/ml/datasets/Thoracic+Surgery+Data

The first step in the algorithm is to calculate the entropy of the entire set of data with respect to the outcome variable (‘Risk1Y’). Since we are using a binary classification (the patient either died or lived), we can use the formula to calculate entropy for a binary classification:

Entropy(S) = -p_{+}\log_2(p_{+}) - p_{-}\log_2(p_{-})

Where p_{+} is the proportion of outcomes for which the value is positive (i.e. the classification is true) and p_{-} is the proportion of outcomes for which the value is negative (i.e. the classification is false).
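
As a rough illustration of this formula (not the internals of the class used below, which aren’t shown here), a binary entropy function in Python might look like the following, assuming the data is held as a list of dicts and the outcome values are 'T' and 'F':

import math

def entropy(rows, target):
    """Binary entropy of the outcome field `target` over a list-of-dicts dataset."""
    total = len(rows)
    if total == 0:
        return 0.0
    positives = sum(1 for row in rows if row[target] == 'T')   # assumes T/F outcome labels
    p_pos = positives / float(total)
    p_neg = 1.0 - p_pos
    return -sum(p * math.log(p, 2) for p in (p_pos, p_neg) if p > 0)

Plugging in the counts produced below (70 positive and 400 negative outcomes out of 470) reproduces the 0.6072 figure.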

To do this using the Python class:

learningTree = InteractiveDecisionTree("/Users/kelder86/decision_tree/thoracic_surgery.csv", "Risk1Y")
learningTree.setFieldNames(['Diagnosis', 'FVC', 'FEV1', 'PerformanceStatus', 'Pain', 'Haemoptyisis', 'Dyspnoea', 'Cough', 'Weakness', 'SizeOfTumor', 'Diabetes', 'MI', 'PAD', 'Smokes', 'Asthma', 'Age', 'Risk1Y'])
learningTree.loadFile()
learningTree.printExcludedFields()
learningTree.printSetConditions()
learningTree.setCurrentSet()
learningTree.calculateSetEntropy()
learningTree.printEntropy()

This gives us the following output:

Positive outcomes in given set: 70.0
Negative outcomes in given set: 400.0
Entropy of given set is: 0.607171654871

You can see that the formula for entropy in this case would be:

Entropy(S) = -(70/470)\log_2(70/470) - (400/470)\log_2(400/470) = 0.607171654871

Now we need to cycle through each of the attributes, except for the outcome variable, and determine which one will reduce entropy by the greatest amount; in other words, which attribute gives us the greatest information gain.

To calculate information gain for a binary classification problem, we use the following calculation:

Gain(S, A) = Entropy(S) - \sum_{v \in Values(A)} \frac{|S_{v}|}{|S|} Entropy(S_{v})
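
Continuing the sketch from above, information gain can be computed by weighting the entropy of each subset S_v by its share of the parent set. Again, this is illustrative code assuming the list-of-dicts format, not the class’s own implementation:

def information_gain(rows, attribute, target):
    """Gain(S, A): the parent set's entropy minus the weighted entropy of each subset S_v."""
    total = float(len(rows))
    values = set(row[attribute] for row in rows)
    weighted_entropy = 0.0
    for value in values:
        subset = [row for row in rows if row[attribute] == value]
        weighted_entropy += (len(subset) / total) * entropy(subset, target)
    return entropy(rows, target) - weighted_entropy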

To do this with the class, we do the following:

learningTree.getHighestGain()
learningTree.printMaxGain()

This produces the following output:

Attribute with maximum gain is: {'name': 'Diagnosis', 'gain': 0.024284175235364205}

Remember, at this point we are processing the entire set of data (all 470 records) and we are not filtering the data in any way. The output above suggests that the diagnosis field will reduce the entropy of the entire set of data by the greatest amount out of all of the available attributes in the file.

Let’s get a little more output to see how this works. Below is the additional output from the getHighestGain() method for the first attribute processed:

Total records in set: 470.0
Field is: Diagnosis
Total number with value DGN1: 1
Total number with value DGN1 and outcome F: 1
Entropy of the set with value: DGN1: 0.0
Total number with value DGN2: 52
Total number with value DGN2 and outcome F: 40
Total number with value DGN2 and outcome T: 12
Entropy of the set with value: DGN2: 0.779349837292
Total number with value DGN3: 349
Total number with value DGN3 and outcome F: 306
Total number with value DGN3 and outcome T: 43
Entropy of the set with value: DGN3: 0.538515706679
Total number with value DGN4: 47
Total number with value DGN4 and outcome F: 40
Total number with value DGN4 and outcome T: 7
Entropy of the set with value: DGN4: 0.607171654871
Total number with value DGN5: 15
Total number with value DGN5 and outcome F: 8
Total number with value DGN5 and outcome T: 7
Entropy of the set with value: DGN5: 0.996791631982
Total number with value DGN6: 4
Total number with value DGN6 and outcome F: 4
Entropy of the set with value: DGN6: 0.0
Total number with value DGN8: 2
Total number with value DGN8 and outcome F: 1
Total number with value DGN8 and outcome T: 1
Entropy of the set with value: DGN8: 1.0
Gain for this attribute: 0.0242841752354

We can see from this (knowing the calculation for gain) that the final formula for information gain for this attribute would be:

Gain(S, A) = 0.607171654871 - (1/470)(0) -  (52/470)(0.779349837292) - (349/470)(0.538515706679) - (47/470)(0.607171654871) - (15/470)(0.996791631982) - (4/470)(0) - (2/470)(1.0) = 0.0242841752354
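
As a quick sanity check, plugging the subset sizes and entropies from the output above into that formula reproduces the reported gain:

# Subset sizes and entropies for each Diagnosis value, taken from the output above
subsets = [(1, 0.0), (52, 0.779349837292), (349, 0.538515706679), (47, 0.607171654871),
           (15, 0.996791631982), (4, 0.0), (2, 1.0)]
parent_entropy = 0.607171654871
gain = parent_entropy - sum((n / 470.0) * e for n, e in subsets)
print(gain)   # ~0.024284, matching the gain reported for Diagnosis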

Now we find the attribute with the maximum gain:

Attribute is: Diagnosis
Gain for this attribute: 0.0242841752354
Attribute is: FVC
Gain for this attribute: 0.0203096548951
Attribute is: FEV1
Gain for this attribute: 0.00305944453271
Attribute is: PerformanceStatus
Gain for this attribute: 0.00652051279898
Attribute is: Pain
Gain for this attribute: 0.00212629826488
Attribute is: Haemoptyisis
Gain for this attribute: 0.00289481096538
Attribute is: Dyspnoea
Gain for this attribute: 0.0067109933434
Attribute is: Cough
Gain for this attribute: 0.00603793299662
Attribute is: Weakness
Gain for this attribute: 0.00495240811225
Attribute is: SizeOfTumor
Gain for this attribute: 0.0209567260041
Attribute is: Diabetes
Gain for this attribute: 0.00720382164832
Attribute is: MI
Gain for this attribute: 0.000992338695742
Attribute is: PAD
Gain for this attribute: 0.000868693043524
Attribute is: Smokes
Gain for this attribute: 0.00600290579821
Attribute is: Asthma
Gain for this attribute: 0.000992338695742
Attribute is: Age
Gain for this attribute: 0.0036530835234
Attribute with maximum gain is: {'name': 'Diagnosis', 'gain': 0.024284175235364205}

Out of all of the available attributes, the maximum gain is for the Diagnosis variable. We therefore set this attribute as the root node and we get all of the possible values for this attribute.

learningTree.setNodeBranches('Diagnosis')
learningTree.printBranches()

['DGN8', 'DGN1', 'DGN3', 'DGN2', 'DGN5', 'DGN4', 'DGN6']

These are the possible values of diagnosis. Now in a fully recursive implementation, the process would then cycle through each one of these branches, setting a node for each branch, and performing all of the calculations until the tree is exhausted. In our case we are going to manually traverse just one more level down for explanation.

We first need to calculate the new set for the branch. We do this by filtering the total set down to the observations that match the branch’s value, and by excluding the attributes we have already used on the way down. For example, to calculate S_{v} for the DGN3 branch:

learningTree.setSetConditions([{ 'name' : 'Diagnosis', 'value' : 'DGN3' }])
learningTree.setExcludedFields(['Risk1Y', 'Diagnosis'])
learningTree.processBranch('DGN3')
learningTree.printEntropy()
learningTree.printMaxGain()

The conditions will filter the set to only contain observations where 'Diagnosis' is equal to 'DGN3'. As we traverse down the tree we continue to add these conditions to represent the path that we took down to the leaf node. In the case below, the next node following the 'Diagnosis' node down branch 'DGN3' is the 'SizeOfTumor' attribute.

Calculating nodes for branch: DGN3
Fields currently excluded as potential nodes: ['Risk1Y', 'Diagnosis']
Conditions placed on current set:
[{'name': 'Diagnosis', 'value': 'DGN3'}]
Attribute is: FVC
Gain for this attribute: 0.0178882581378
Attribute is: FEV1
Gain for this attribute: 0.00440246814813
Attribute is: PerformanceStatus
Gain for this attribute: 0.00889217478759
Attribute is: Pain
Gain for this attribute: 0.0119114583755
Attribute is: Haemoptyisis
Gain for this attribute: 0.00515530395553
Attribute is: Dyspnoea
Gain for this attribute: 0.00720521042675
Attribute is: Cough
Gain for this attribute: 0.00429029313854
Attribute is: Weakness
Gain for this attribute: 0.0032716018855
Attribute is: SizeOfTumor
Gain for this attribute: 0.0258570293306
Attribute is: Diabetes
Gain for this attribute: 0.00582417248134
Attribute is: MI
Gain for this attribute: 0.00109042213585
Attribute is: PAD
Gain for this attribute: 0.00273871968586
Attribute is: Smokes
Gain for this attribute: 0.00443986103654
Attribute is: Asthma
Gain for this attribute: 0.00109042213585
Attribute is: Age
Gain for this attribute: 0.0032465590173
Positive outcomes in given set: 43.0
Negative outcomes in given set: 306.0
Entropy of given set is: 0.538515706679
Attribute with maximum gain is: {'name': 'SizeOfTumor', 'gain': 0.025857029330573714}
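
The internals of setSetConditions() and processBranch() aren’t shown here, but conceptually the conditions just filter the working set before the entropy and gain calculations run again. A hypothetical version of that filtering might look like this:

def apply_conditions(rows, conditions):
    """Keep only the rows that match every {'name': ..., 'value': ...} condition."""
    for condition in conditions:
        rows = [row for row in rows if row[condition['name']] == condition['value']]
    return rows

# e.g. restrict the full dataset (a list of dicts) to the DGN3 branch before recomputing gains
dgn3_rows = apply_conditions(rows, [{'name': 'Diagnosis', 'value': 'DGN3'}])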

I haven’t yet implemented the recursive elements, but to illustrate, I’ve manually followed this process down two branches of the tree and visualized the tree using D3. See the chart below for an example of how this tree might eventually start to construct itself:

I’ll fill the tree out once I can get the recursion working properly!

Hope you enjoyed this post! Until next time…

Weighted Average Calculation in MapReduce

Hadoop is already a major player in the technology landscape for healthcare data analysis, and it will likely become an even bigger one as new data producers are integrated into the consumer marketplace. A perfect example of a “new data producer” is a wearable device that streams large quantities of data, typically from an embedded accelerometer and gyroscope. Big data technology is expected to play a big role in analyzing this type of data, and organizations are researching potential applications of big data technology when it is combined with wearable devices. Intel and the Michael J. Fox Foundation for Parkinson’s Research made headlines when they announced their efforts to combine big data technologies and wearable devices in order to analyze Parkinson’s symptoms. The NIH is also funding research into wearable devices through a $536,000 grant awarded to UMass professor Benjamin Marlin, whose lab specializes in machine learning and the analysis of big data. As such, it seems reasonable that IT professionals should be learning about Hadoop and the MapReduce framework, as many interesting projects will likely revolve around this technology in the future.

Hadoop allows us to process and analyze data stored in files without first loading it into database structures. Any professional programmer will tell you that this is not new (files have long been sources of input to programs), but Hadoop combines this file-based approach with parallelism and distributed processing, allowing workloads to grow without processing time ballooning. Using this alternative processing paradigm, Hadoop avoids loading data into databases and passing data around between database structures. It also allows us to scale up our processing capacity simply by adding machines to the Hadoop “cluster”. Both of these benefits can buy a data processing shop time when processing large quantities of data.

But the downside to Hadoop is its relative complexity. In an IT landscape that has been dominated by relational databases, SQL and PL/SQL have become the standard skill set for data processing. While PL/SQL is a robust programming language with many features (e.g. object orientation), SQL and PL/SQL are very different skills from Java programming and from the skills specific to the Hadoop framework. This has produced a skills gap within the industry, allowing Hadoop specialists to command very high salaries, but it has also led many companies to “spin their wheels” trying to implement complex Hadoop architectures and processes. This post, which explores how to implement a relatively simple SQL query in Hadoop, may help highlight the differences in skill sets required to work with Hadoop compared to traditional SQL-based database technologies.

In another post, I wrote about a simple SQL query for calculating weighted average charges by provider type from Medicare’s provider utilization and payment data. The dataset contains real Medicare billings and is clean and well-prepared. However, it is already aggregated to protect patient confidentiality, so in order to calculate accurate averages we need to calculate weighted averages.

For example, the SQL query we used in the last post was:

SELECT	PROVIDER_TYPE, 
	SUM(LINE_SERVICE_COUNT), 
	SUM(LINE_SERVICE_COUNT * AVERAGE_ALLOWED_AMOUNT) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_ALLOWED_AMOUNT,
	SUM(LINE_SERVICE_COUNT * AVERAGE_SUBMITTED_CHARGE) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_SUBMITTED_CHARGE,
	SUM(LINE_SERVICE_COUNT * AVERAGE_PAYMENT_AMOUNT) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_PAYMENT_AMOUNT
FROM UTILIZATION_AND_CHARGES
WHERE HCPCS_CODE = 93454
GROUP BY PROVIDER_TYPE

We will spend the remainder of this post performing the same calculation using Hadoop’s MapReduce framework.

It’s important to understand some basic concepts about MapReduce, which form the foundation of the MapReduce way of thinking. “Hadoop: The Definitive Guide” has a great section that clearly articulates the inner workings of the MapReduce framework; chapter two of that book is invaluable for gaining a basic understanding of the process.

Essentially, each program consists of a map phase and a reduce phase. Each phase accepts input and writes output, and the programmer determines the types of the input and output data. Typically, the program starts by reading data from a file. This file becomes input to the map phase, which processes the file line by line. The goal of the map phase is to generate a series of key-value pairs that will be used in the final calculation by the reduce phase of the program. Suppose, for example, that you want to perform some type of aggregate calculation. In that case, the key would be the value you would put in the GROUP BY clause of a typical SQL statement, and the values would be the data points being fed into the aggregate function.

Let’s use an example from the provider charges and utilization file to illustrate how this key-value concept works before we actually write a MapReduce program. Here are the first few records of the Medicare provider utilization and charges file, printing just the 14th, 19th, and 23rd columns: the provider type, the line service count, and the average submitted charge for each provider/HCPCS code combination.

Internal Medicine 115 135.25
Internal Medicine 93 198.59
Internal Medicine 111 38.75
Internal Medicine 544 70.95
Internal Medicine 75 101.74
Internal Medicine 95 71.06
Internal Medicine 191 105.01
Pathology 226 11.64

We have two unique provider types in this example (“Internal Medicine” and “Pathology”), and these will serve as the keys in the output of our map function. In our case, we want to calculate a weighted average, which is the sum of the products of line service count and average submitted charge for each provider, divided by the sum of line service counts across all providers of the same type. Therefore, for each record in the input, we output the provider type as the key, along with a “tuple” containing the line service count and the average submitted charge, which allows us to do the calculations in the reduce phase.

To illustrate, the map function will first produce the following output for the records above:

(Internal Medicine, (115, 135.25))
(Internal Medicine, (93, 198.59))
(Internal Medicine, (111, 38.75))
(Internal Medicine, (544, 70.95))
(Internal Medicine, (75, 101.74))
(Internal Medicine, (95, 71.06))
(Internal Medicine, (191, 105.01))
(Pathology, (226, 11.64))

The framework then groups the map output by key before handing it to the reduce function. The input to the reduce phase thus becomes:

(Internal Medicine, [(115, 135.25), (93, 198.59), (111, 38.75), (544, 70.95), (75, 101.74), (95, 71.06), (191, 105.01)])
(Pathology, [(226, 11.64)])

The reduce function then iterates over the array for each key, allowing you to do your calculations for each key value. The output of the reduce process then consists of the key, along with whatever calculated value you’ve chosen to print. In our specific case, this calculation consists of the weighted average charges calculation.
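
Before turning to the Java, here is a pure-Python analogue of the whole pipeline. This is a conceptual sketch only; the grouping step stands in for what Hadoop’s shuffle does for you, and the sample records are the ones listed above:

from collections import defaultdict

records = [("Internal Medicine", 115, 135.25), ("Internal Medicine", 93, 198.59),
           ("Internal Medicine", 111, 38.75), ("Internal Medicine", 544, 70.95),
           ("Internal Medicine", 75, 101.74), ("Internal Medicine", 95, 71.06),
           ("Internal Medicine", 191, 105.01), ("Pathology", 226, 11.64)]

# "map": emit one (provider_type, (line_service_count, average_submitted_charge)) pair per record
mapped = [(ptype, (services, charge)) for ptype, services, charge in records]

# "shuffle": group the values by key, as the framework does between the map and reduce phases
grouped = defaultdict(list)
for key, value in mapped:
    grouped[key].append(value)

# "reduce": weighted average = sum(services * charge) / sum(services) for each key
for key, values in grouped.items():
    weighted_avg = sum(s * c for s, c in values) / sum(s for s, _ in values)
    print(key, round(weighted_avg, 5))   # Internal Medicine ~90.97939, Pathology 11.64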

Let’s see what this looks like in practice…

To begin, we create a class that will serve as our mapper. In our case, the class is “WeightedAverageChargesMapper”:

// Java Document
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeightedAverageChargesMapper extends Mapper<Object, Text, Text, TotalChargeTotalServiceTuple> {
	private Text grouper = new Text();
	private TotalChargeTotalServiceTuple outTuple = new TotalChargeTotalServiceTuple();
	
	public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
		// Split the tab-delimited line into its individual fields
		String[] fields = value.toString().split("\t");
		// The provider type field becomes the output key
		String grouperVal = fields[13];
		// Line service count times average submitted charge becomes the total charges; the count itself is the weight
		outTuple.setTotalCharges(Float.parseFloat(fields[18])*Float.parseFloat(fields[21]));
		outTuple.setTotalServices(Float.parseFloat(fields[18]));
		outTuple.setCount(1);
		grouper.set(grouperVal);
		// Emit (provider type, tuple) so the reducer can aggregate by provider type
		context.write(grouper, outTuple);
	}
}

This code takes care of the map phase of our program. We process the input text file line by line, splitting each line on the tab character (\t). We assign the value at index 13 of the resulting array (the 14th field in the file, the provider type) to grouperVal, which we then assign to grouper, a Text object. We then populate the output variable outTuple, an instance of a custom class created for this purpose (more details on that below); outTuple becomes the value portion of the key-value pairs that result from the map phase. Remember, the mapping process produces key-value pairs of the form (Internal Medicine, (191, 105.01)): the key is held in grouper and the value is held in outTuple. It’s important to note the type parameters in the extends clause above; the last two must match the output key and value classes that the map method writes to the context. The last line of the method writes the resulting key-value pair to the context of the program execution, which allows our map output to be consumed by the reducer.

We mentioned that in order to write a tuple as the value in the map phase’s key-value output, we needed to create a custom class. The code for this custom class is below:

// Java Document
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.io.DataOutput;
import java.io.DataInput;


public class TotalChargeTotalServiceTuple implements Writable {
	private float totalCharges;
	private float totalServices;
	private float count = 0;
	
	public float getTotalCharges() {
		return totalCharges;
	}
	
	public void setTotalCharges(float totalCharges) {
		this.totalCharges = totalCharges;
	}
	
	public float getTotalServices() {
		return totalServices;
	}
	
	public void setTotalServices(float totalServices) {
		this.totalServices = totalServices;
	}
	
	public float getCount() {
		return count;
	}	
	
	public void setCount(float count) {
		this.count = count;
	}
	
	public void readFields(DataInput in) throws IOException {
		totalCharges = in.readFloat();
		totalServices = in.readFloat();
		count = in.readFloat();
	}
	
	public void write(DataOutput out) throws IOException {
		out.writeFloat(totalCharges);
		out.writeFloat(totalServices);
		out.writeFloat(count);
	}
	
	public String toString() {
		return totalCharges + "\t" + totalServices + "\t" + count;
	}
}

This is a fairly simple class with methods for getting and setting the charges values from the provider charges and utilization file. The important methods to note here are readFields and write, which are required by the Writable interface and are used internally to serialize and de-serialize the object when it is written as output from the map phase and read back as input to the reduce phase.

Now that we have a mapper, we need to write a reducer. The reducer reads the grouped output of the map phase and, for each key, iterates over that key’s tuples one at a time, allowing us to calculate a value for each key and write the reducer’s output to the context.

// Java Document
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class WeightedAverageChargesReducer extends Reducer<Text, TotalChargeTotalServiceTuple, Text, FloatWritable> {
	private float weightedAverage;
	private FloatWritable result = new FloatWritable();
	
	public void reduce(Text key, Iterable<TotalChargeTotalServiceTuple> values, Context context) throws IOException, InterruptedException {
		float sum = 0;          // number of input records seen for this key (not used in the final calculation)
		float sumCharges = 0;   // running total of (line service count * average submitted charge)
		float sumServices = 0;  // running total of line service counts (the weights)
		
		for (TotalChargeTotalServiceTuple val : values) {
			sum += val.getCount();
			sumCharges += val.getTotalCharges();
			sumServices += val.getTotalServices();
		}
		// Weighted average charge for this provider type
		weightedAverage = (sumCharges / sumServices);
		result.set(weightedAverage);
		
		context.write(key, result);
	}
}

Note that the first two type parameters on the Reducer class represent the key-value pair written to the context by the mapper, and the second two are the types that the reducer itself writes to the context; the types used for these parameters must match what the mapper produces and what the reducer emits. Also note that the reducer processes one key at a time, iterating over all of the values grouped under that key. This is similar to how one might iterate over an array in any other programming language; however, the actual grouping and iteration is all done behind the scenes by the MapReduce framework.

The general logic of the reducer is fairly simple. For each key, we sum up the pre-multiplied charges (average submitted charge times line service count) as well as the line service counts themselves. After iterating over each tuple in the group, we divide the sum of charges by the sum of line service counts. We then write the key (provider type) and the value (weighted average charge) to the context.

The final step in writing a MapReduce program is to write the class that sets up all of the job-level parameters. This is what actually configures and submits the job that we are running.

// Java Document
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WeightedAverageCharge {
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage: MaxSubmittedCharge <input path> <output path>");
			System.exit(-1);
		}
		
		Job job = new Job();
		job.setJarByClass(WeightedAverageCharge.class);
		job.setJobName("Weighted Average Charges");
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		job.setMapperClass(WeightedAverageChargesMapper.class);
		job.setReducerClass(WeightedAverageChargesReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(TotalChargeTotalServiceTuple.class);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

This class sets up the job-level attributes required to execute the job. Some interesting items to note are the line that sets the path to the file that will be processed as input to the mapper (FileInputFormat.addInputPath(job, new Path(args[0]));) and the line that assigns the output path on HDFS (the Hadoop Distributed File System), where the reducer will write its output (FileOutputFormat.setOutputPath(job, new Path(args[1]));). Finally, note the lines that set the mapper class, the reducer class, the output key class, and the output value class; because setMapOutputValueClass is never called, the output value class set here also serves as the map output value class, which is why it is the tuple class rather than FloatWritable.

Finally, we can compile all of our classes, package them into a jar, and run the job with the hadoop command. Setting up your environment can be a bit of a pain, but once you have everything up and running, start the Hadoop services, compile the classes, and run the program!

kelder86$ ./bin/start-all.sh
kelder86$ javac WeightedAverageCharge.java TotalChargeTotalServiceTuple.java WeightedAverageChargesMapper.java WeightedAverageChargesReducer.java
kelder86$ jar cvf WeightedAvgCharge.jar WeightedAverageCharge*class TotalChargeTotalServiceTuple.class
kelder86$ hadoop jar WeightedAvgCharge.jar WeightedAverageCharge ./input/physician_charges_10_lines_no_header.txt output12
kelder86$ hadoop fs -ls /user/kelder86/output12
-rw-r--r--   1 kelder86 supergroup          0 2014-11-25 11:55 /user/kelder86/output12/_SUCCESS
drwxr-xr-x   - kelder86 supergroup          0 2014-11-25 11:54 /user/kelder86/output12/_logs
-rw-r--r--   1 kelder86 supergroup         43 2014-11-25 11:55 /user/kelder86/output12/part-r-00000
kelder86$ hadoop fs -cat /user/kelder86/output12/part-r-00000
Internal Medicine	90.97939
Pathology	11.64

Getting Averages from the CMS Provider Utilization and Charges Data

I’ve mentioned that one of my goals is to find time to work on a side project that utilizes CMS’s open data to create a data visualization application.  I’m moving a bit slowly but, as they say, better late than never!

This post describes a method for getting accurate average values from CMS’s provider utilization and payment data. The key to calculating an accurate average is to use the weighted average formula to weight each provider’s observed values by the provider’s “line service count” for the procedure.

I’ve organized my project into six requirements.  Essentially, the requirements describe different charts and visualizations that a user might want to see when deciding on which hospital or physician to visit for a particular procedure.  Because of the structure of the CMS data, many of these requirements need to use the weighted average formula. In this post, I’ll work on the “Average Charges by Provider Type” requirement.

Here is how I’ve phrased that requirement:

“As a patient, I want to look up procedures and view charts of the potential costs of the procedures, as well as how much costs vary within certain segments. I want to filter the data by HCPCS code, state, city, zip code, payer type, and/or place of service. I also want to be able to group the results by the above attributes. I want to be able to select which measures to visualize.”

Here is a rough wireframe that displays a potential design for the user interface:

A wireframe for the Average Charges by Provider view / requirement

My initial thought was that I could just use an avg() aggregate query to get average submitted charges, average allowed amount, and average cost to patient grouped by provider type, state, zip code, and other attributes. For example, I thought that maybe something like the below query could work:

SELECT	PROVIDER_TYPE,
	AVG(AVERAGE_ALLOWED_AMOUNT),
	AVG(AVERAGE_SUBMITTED_CHARGE),
	AVG(AVERAGE_PAYMENT_AMOUNT)
FROM UTILIZATION_AND_CHARGES
WHERE HCPCS_CODE = 93454
GROUP BY PROVIDER_TYPE;

This would work just fine if we knew that every provider represented the same proportion of the total procedures performed for HCPCS_CODE 93454 (a cardiac catheterization procedure).  But this is not the case: some providers account for a greater proportion of the total number of cardiac catheterization procedures, while others account for a smaller proportion.  We would be taking an average of averages, where each average is computed from a number of observations that is not consistent across providers.
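
A tiny, made-up example shows why the plain average of averages misleads: two providers with very different volumes can pull the true per-service average far away from the simple mean of their averages.

# Hypothetical: one provider bills 10 services at an average of $100,
# another bills 990 services at an average of $200.
counts = [10, 990]
averages = [100.0, 200.0]

plain_average = sum(averages) / len(averages)                                 # 150.0
weighted_average = sum(c * a for c, a in zip(counts, averages)) / sum(counts)
print(plain_average, weighted_average)                                        # 150.0 vs. 199.0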

Since the number of observations for each provider within a provider-type subset might vary, we need to take a weighted average. The formula for a weighted average is simple and is shown below:

\frac{w_1 a_1 + w_2 a_2 + \cdots + w_n a_n}{w_1 + w_2 + \cdots + w_n}

where w_i is the number of observations in each group (the weight) and a_i is the average value for that group.

So in our case, we need to find the weighted average of the allowed amount, submitted charge, and payment amount by provider type (or whatever GROUP BY variable we are using). We use each provider/procedure’s line service count as the weight, and the provider’s reported average for that procedure (e.g. average allowed amount) as the value. Fortunately, both of these are already present in the summarized data provided by CMS.

To illustrate the structure of the CMS data, here are the first 10 records returned for HCPCS_CODE 93454:

FIRST_NAME LAST_NAME PROVIDER_TYPE LINE_SERVICE_COUNT AVERAGE_ALLOWED_AMOUNT
MARK STELLINGWORTH Cardiology 32.0 233.9396875000
JOHN FRANK Cardiology 42.0 219.4111904800
BRIAN CAMBI Cardiology 29.0 201.0451724100
LONNIE HARRISON Cardiology 20.0 155.7900000000
DANIEL DONOHUE Interventional Pain Management 14.0 263.5814285700
RICARDO CORDIDO Cardiology 22.0 168.6618181800
JAIME BENREY Cardiology 19.0 127.4600000000
JONATHAN BOLLES Internal Medicine 11.0 254.3900000000
HENOCK ZABHER Emergency Medicine 13.0 204.8707692300
RANDALL BREAM Cardiology 15.0 208.4993333300

So in this case, if we were to find a weighted average allowed amount among all cardiologists in just these first 10 records, the equation would be:

\frac{32*233.94+42*219.41+29*201.05+20*155.79+22*168.66+19*127.46+15*208.5}{32+42+29+20+22+19+15}
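
Evaluating that fraction (with the rounded figures shown) comes out to roughly $195 per service; a quick check in Python:

counts  = [32, 42, 29, 20, 22, 19, 15]
amounts = [233.94, 219.41, 201.05, 155.79, 168.66, 127.46, 208.50]
weighted_average = sum(c * a for c, a in zip(counts, amounts)) / sum(counts)
print(weighted_average)   # ~195.01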

So how do we write the query for this? It’s easy to get sidetracked and to start playing around with analytic functions. I myself went down this route at first using the excellent tutorials written by Quassnoi on Explain Extended. The exercise was interesting but the query turned out to be 73 lines long.

Sometimes all it takes is stepping back and rethinking the query. The following query yields the same results and performs just as well as my 73 line long query:

SELECT	PROVIDER_TYPE, 
	SUM(LINE_SERVICE_COUNT), 
	SUM(LINE_SERVICE_COUNT * AVERAGE_ALLOWED_AMOUNT) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_ALLOWED_AMOUNT,
	SUM(LINE_SERVICE_COUNT * AVERAGE_SUBMITTED_CHARGE) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_SUBMITTED_CHARGE,
	SUM(LINE_SERVICE_COUNT * AVERAGE_PAYMENT_AMOUNT) / SUM(LINE_SERVICE_COUNT) AS AVERAGE_PAYMENT_AMOUNT
FROM UTILIZATION_AND_CHARGES
WHERE HCPCS_CODE = 93454
GROUP BY PROVIDER_TYPE

Now that we have this query we can put it into a web service that will relay the results back to a calling object so that the results can be displayed in a bar chart. This query takes about 60 seconds to execute, however, so it might be worthwhile spending some upfront time pre-calculating these values for all of the HCPCS codes in the dataset so that the end user experience is better. We could even consider using a “big data” technology to pre-calculate these values. For example, a Hadoop scripting language like Pig could be useful for this purpose.

Until next time…