In this blog post we will use the Python interface to Spark to predict whether someone makes more or less than $50,000. A logistic regression model will be used to make this prediction.
Please note that no prior knowledge of Python, Spark or logistic regression is required. However, it is assumed that you are a seasoned software developer.
There are various tutorials on Spark. There is the official documentation on Spark. However, if you are an experienced software professional and want to just jump in and kick the tires, there doesn't seem to be much available. Well, at least I couldn't find any.
Let me now explicitly define jumping in. Jumping in involves solving an almost trivial problem that demonstrates a good deal of the concepts and software. Also, it involves skipping steps. Links will be provided so that if someone is interested in the missing steps, they can look up the details. If skipping steps bothers you, immediately stop and go read a tutorial.
As mentioned earlier, we are going to use Python on Spark to address logistic regression for our jumping in. This is our almost trivial problem that demonstrates a good deal of the concepts and software.
The first thing that you need is a working environment. You could install and configure your own environment. However, that would not be in line with jumping in. Instead I recommend using Databricks Spark community edition (DSCE). If you are using DSCE, refer to "Welcome to Databricks" on how to create a cluster, create a notebook, attach the notebook to a cluster and actually use the notebook.
Next, you need to connect to the Spark environment. In the database world, you create a database connection. In the Spark world, you create a context (SQLContext, SparkContext/SparkSession). If you are using DSCE, the following three variables will be predefined for you:
- SparkContext: sc
- SparkSession: spark
- SQLContext: sqlContext
If you would like the IPython notebook associated with this blog, click here. If for some reason you don't have software to read the IPython notebook, you can download a pdf version of it by clicking here.
Row / Column
We are almost ready to code. First we have to talk about Row and DataFrame. A Row is just like a row in a spreadsheet or a row in a table. A DataFrame is just a collection of Rows. For the most part, you can think of a DataFrame as a table.
Code Snippet 1
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector
row = Row("gender", "foo", "bar")
dataFrame = sc.parallelize([
    row("0", 3.0, DenseVector([0, 2.1, 1.0])),
    row("1", 1.0, DenseVector([0, 1.1, 1.0])),
    row("1", -1.0, DenseVector([0, 3.4, 0.0])),
    row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()
dataFrame.collect()
Nothing fancy. You create a row template that has the column names gender, foo and bar. You then create a bunch of rows with actual data. Lastly, you group the rows into a DataFrame.
DenseVector was used to demonstrate that a cell in a Row can have a complex data structure. If you are curious about parallelize and toDF, check the references at the end of the blog. This will be true for the rest of the blog. If you are not sure what some magic word means, go to the reference section at the end of the blog.
If things are working, you should get an output like that shown below.
Output of Code Snippet 1
[Row(gender=u'0', foo=3.0, bar=DenseVector([0.0, 2.1, 1.0])),
Row(gender=u'1', foo=1.0, bar=DenseVector([0.0, 1.1, 1.0])),
Row(gender=u'1', foo=-1.0, bar=DenseVector([0.0, 3.4, 0.0])),
Row(gender=u'0', foo=-3.0, bar=DenseVector([0.0, 4.1, 0.0]))]
Before going further, we need to take a little detour. Algorithms like numeric data. So we need to do things like convert a column for gender that has Male / Female / Unknown to binary values. We do this by creating two columns.
- The first column will be used to indicate whether the person is or is not male. A one means that the person is male and a zero indicates that the person is not male. StringIndexer will be used to convert categories to numerical values.
- The second column will be used to indicate whether the person is or is not female. A one means that the person is female and a zero indicates that the person is not female.
Notice that we don't need a third column for Unknown. If there is a zero in both the Male and Female columns, we know that the gender is Unknown. This process of taking a single category column and decomposing it into many columns of binary values will be accomplished by the OneHotEncoder.
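To make the two-column idea concrete, here is a minimal plain-Python sketch (no Spark involved). The helper name encode_gender and the column names is_male and is_female are made up for illustration; they are not part of any Spark API.

```python
def encode_gender(gender):
    """Map a gender label to two binary columns (hypothetical helper)."""
    return {
        "is_male": 1 if gender == "Male" else 0,
        "is_female": 1 if gender == "Female" else 0,
    }

# Unknown needs no column of its own: it is the row with zeros everywhere.
for label in ["Male", "Female", "Unknown"]:
    print(label, encode_gender(label))
```

Any label other than Male or Female ends up as two zeros, which is exactly why the third column is unnecessary.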
StringIndexer
A StringIndexer converts categories to numbers. The numbers range from 0 to the number of categories minus one. The most frequent category gets a number of zero, the second most frequent category gets a number of 1 and so on. We are going to use the code snippets from "Preserve index-string correspondence spark string indexer" on Stack Overflow to demonstrate what the preceding English means.
Let's actually create a StringIndexer and use it to map/fit/transform categories to numbers.
Code Snippet 2
dataFrame = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "b"), (3, "c"), (4, "c"), (5, "c"), (6, "d"), (7, "d"), (8, "d"), (9, "d")],
    ["id", "category"])
dataFrame.collect()
from pyspark.ml.feature import StringIndexer
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
modelStringIndexer = stringIndexer.fit(dataFrame)
transformedDataFrame = modelStringIndexer.transform(dataFrame)
transformedDataFrame.collect()
Output of Code Snippet 2
[Row(id=0, category=u'a', categoryIndex=3.0),
Row(id=1, category=u'b', categoryIndex=2.0),
Row(id=2, category=u'b', categoryIndex=2.0),
Row(id=3, category=u'c', categoryIndex=1.0),
Row(id=4, category=u'c', categoryIndex=1.0),
Row(id=5, category=u'c', categoryIndex=1.0),
Row(id=6, category=u'd', categoryIndex=0.0),
Row(id=7, category=u'd', categoryIndex=0.0),
Row(id=8, category=u'd', categoryIndex=0.0),
Row(id=9, category=u'd', categoryIndex=0.0)]
Notice how the d's got 0.0 because they are the most numerous. The c's got 1.0 because they are the second most numerous, and so on. The code snippet below will make this clearer.
Code Snippet 3
transformedDataFrame.select('category','categoryIndex').distinct().orderBy('categoryIndex').show()
Output of Code Snippet 3
+--------+-------------+
|category|categoryIndex|
+--------+-------------+
| d| 0.0|
| c| 1.0|
| b| 2.0|
| a| 3.0|
+--------+-------------+
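The frequency ordering can also be reproduced in a few lines of plain Python. This is only a sketch of the idea, not Spark's implementation. The function name frequency_index is hypothetical, and the alphabetical tie-break is an assumption of this sketch that the example data never exercises.

```python
from collections import Counter

def frequency_index(categories):
    """Assign 0.0 to the most frequent category, 1.0 to the next, and so on."""
    counts = Counter(categories)
    # Sort by descending count. Ties are broken alphabetically, which is an
    # assumption of this sketch; the sample data has no ties.
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: float(i) for i, category in enumerate(ordered)}

categories = ["a", "b", "b", "c", "c", "c", "d", "d", "d", "d"]
mapping = frequency_index(categories)
print(mapping)  # {'d': 0.0, 'c': 1.0, 'b': 2.0, 'a': 3.0}
```

The resulting mapping matches the category / categoryIndex pairs in the output of Code Snippet 3.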
A OneHotEncoder converts category numbers to binary vectors with at most a single one-value per row. For a true understanding of one-hot encoding, refer to the associated Wikipedia page.
Next, let's use a OneHotEncoder to transform the category index that we created earlier into a binary vector.
Code Snippet 4
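from pyspark.ml.feature import OneHotEncoder
oneHotEncoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVector")
# Spark 2.x API. In Spark 3.0 and later, OneHotEncoder is an estimator:
# call oneHotEncoder.fit(transformedDataFrame) first, then transform() on the resulting model.
oneHotEncodedDataFrame = oneHotEncoder.transform(transformedDataFrame)
oneHotEncodedDataFrame.show()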
Output of Code Snippet 4
+---+--------+-------------+--------------+
| id|category|categoryIndex|categoryVector|
+---+--------+-------------+--------------+
| 0| a| 3.0| (3,[],[])|
| 1| b| 2.0| (3,[2],[1.0])|
| 2| b| 2.0| (3,[2],[1.0])|
| 3| c| 1.0| (3,[1],[1.0])|
| 4| c| 1.0| (3,[1],[1.0])|
| 5| c| 1.0| (3,[1],[1.0])|
| 6| d| 0.0| (3,[0],[1.0])|
| 7| d| 0.0| (3,[0],[1.0])|
| 8| d| 0.0| (3,[0],[1.0])|
| 9| d| 0.0| (3,[0],[1.0])|
+---+--------+-------------+--------------+
The column categoryVector is a SparseVector. It has 3 parts. The first part is the length of the vector. The second part is the list of indices that contain values. The third part is the list of actual values. Below is a code snippet demonstrating this.
Code Snippet 5
from pyspark.mllib.linalg import SparseVector
v1 = SparseVector(5, [0,3], [10,9])
for x in v1:
    print(x)
Output of Code Snippet 5
10.0
0.0
0.0
9.0
0.0
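If you want to see how the three parts map to the printed values without touching Spark, the expansion can be sketched in plain Python. The function name to_dense is hypothetical and exists only for this illustration.

```python
def to_dense(size, indices, values):
    """Expand a (size, indices, values) triple into a plain list of floats."""
    dense = [0.0] * size              # every position defaults to zero
    for i, v in zip(indices, values):
        dense[i] = float(v)           # only the listed indices carry a value
    return dense

print(to_dense(5, [0, 3], [10, 9]))   # [10.0, 0.0, 0.0, 9.0, 0.0]
print(to_dense(3, [], []))            # [0.0, 0.0, 0.0]
```

The second call shows that (3,[],[]) is simply a vector of three zeros.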
Notice how category a (categoryVector = (3,[],[])) does not get a position of its own. The last category is dropped because including it would make the entries of every vector sum to one, and the columns would then be linearly dependent. The code snippet below will provide a better visual for this.
Code Snippet 6
oneHotEncodedDataFrame.select('category','categoryIndex', 'categoryVector').distinct().orderBy('categoryIndex').show()
Output of Code Snippet 6
+--------+-------------+--------------+
|category|categoryIndex|categoryVector|
+--------+-------------+--------------+
| d| 0.0| (3,[0],[1.0])|
| c| 1.0| (3,[1],[1.0])|
| b| 2.0| (3,[2],[1.0])|
| a| 3.0| (3,[],[])|
+--------+-------------+--------------+
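The drop-the-last-category behavior can be mimicked in plain Python. This is a sketch of the logic only, not Spark's code. The function one_hot and its drop_last argument are names chosen for this illustration.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a (size, indices, values) triple for a category index."""
    size = num_categories - 1 if drop_last else num_categories
    i = int(index)
    if i < size:
        return (size, [i], [1.0])
    # The dropped (last) category is encoded as all zeros.
    return (size, [], [])

for category, index in [("d", 0.0), ("c", 1.0), ("b", 2.0), ("a", 3.0)]:
    print(category, one_hot(index, num_categories=4))
```

With drop_last=False every vector would contain exactly one 1.0, so the entries would always sum to one. That redundancy is the linear dependence that dropping the last category avoids.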
By this point you are probably getting impatient. Luckily, we have just one more item to cover before we get to logistic regression. That one item is the VectorAssembler. A VectorAssembler just concatenates columns into a single vector column. As usual, we will demonstrate what the words mean via a code snippet.
Code Snippet 7
from pyspark.ml.feature import VectorAssembler
dataFrame_1 = spark.createDataFrame([(1, 2, 3), (4,5,6)], ["a", "b", "c"])
vectorAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
dataFrame_2 = vectorAssembler.transform(dataFrame_1)
dataFrame_2.show()
Output of Code Snippet 7
+---+---+---+-------------+
| a| b| c| features|
+---+---+---+-------------+
| 1| 2| 3|[1.0,2.0,3.0]|
| 4| 5| 6|[4.0,5.0,6.0]|
+---+---+---+-------------+
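For intuition, the same concatenation can be sketched in plain Python. The function name assemble is hypothetical, and rows are modeled as dictionaries purely for this illustration.

```python
def assemble(row, input_cols):
    """Concatenate the named columns of a row into one flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # Vector-like columns are flattened into the output.
            features.extend(float(x) for x in value)
        else:
            features.append(float(value))
    return features

rows = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]
for row in rows:
    print(assemble(row, ["a", "b", "c"]))
```

The flattening branch matters later on: a one-hot encoded column is itself a vector, and it gets spliced into the features vector alongside the plain numeric columns.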
We have now learned enough Spark to look at a specific problem involving logistic regression. We are going to work through the example provided in the Databricks documentation.
If you are interested in learning about logistic regression, I recommend reading "Chapter 6: The Granddaddy of Supervised Artificial Intelligence - Regression (spreadsheet)" of the book Data Smart: Using Data Science to Transform Information into Insight by John W. Foreman (2013). Personally, I like the book because it provides minimal math and an implementation of logistic regression in a spreadsheet.
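If you just want the gist before reading the chapter, logistic regression boils down to squashing a weighted sum of the features into a probability between 0 and 1. Below is a minimal plain-Python sketch. The weights, intercept, feature values and the two labels are invented for illustration; they are not taken from the Databricks example or from a fitted model.

```python
import math

def predict_probability(features, weights, intercept):
    """Logistic model: squash a weighted sum of features into (0, 1)."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

# Hypothetical weights and features, chosen only for illustration.
weights = [0.8, -0.4]
intercept = -0.2
p = predict_probability([1.0, 2.0], weights, intercept)

# A probability of 0.5 or more is read as the positive class.
label = ">50K" if p >= 0.5 else "<=50K"
print(round(p, 3), label)
```

Training a logistic regression model amounts to finding the weights and intercept that make these probabilities agree with the known outcomes as closely as possible.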
Drop / Create Table
- Drop Table
%sql
DROP TABLE IF EXISTS adult
- Create Table
%sql
CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)
USING com.databricks.spark.csv
OPTIONS (path "/databricks-datasets/adult/adult.data", header "true")
Convert table to a DataFrame
dataset = spark.table("adult")
Get a list of columns in original dataset
cols = dataset.columns
- This step has to be done here and not later. Unfortunately, the Databricks example reuses the variable dataset, so the original column list would no longer be available afterwards.