In this blog we will use the Python interface to Spark to determine whether someone makes more or less than $50,000. A logistic regression model will be used to make this determination.
Please note that no prior knowledge of Python, Spark or logistic regression is required. However, it is assumed that you are a seasoned software developer.
There are various tutorials on Spark. There is the official documentation on Spark. However, if you are an experienced software professional and want to just jump in and kick the tires, there doesn't seem to be much available. Well, at least I couldn't find any.
Yes, there is Spark's Quick Start. Also, there are artifacts like the databricks User Guide. Unfortunately, they only offer a smattering of material. You really don't get a chance to jump in.
Let me now explicitly define jumping in. Jumping in involves solving an almost trivial problem that demonstrates a good deal of the concepts and software. Also, it involves skipping steps. Links will be provided so that if someone is interested in the missing steps, they can look up the details. If skipping steps bothers you, immediately stop and go read a tutorial.
As mentioned earlier, we are going to use Python on Spark to address logistic regression for our jumping in. This is our almost trivial problem that demonstrates a good deal of the concepts and software.
The first thing that you need is a working environment. You could install and configure your own environment. However, that would not be in line with jumping in. Instead I recommend using databricks Spark community edition (DSCE). If you are using DSCE, refer to "Welcome to Databricks" on how to create a cluster, create a notebook, attach the notebook to a cluster and actually use the notebook.
Next, you need to connect to the Spark environment. In the database world, you create a database connection. In the Spark world, you create a context (SQLContext, SparkContext/SparkSession). If you are using DSCE, the following three variables will be predefined for you:
- SparkContext: sc
- SparkSession: spark
- SQLContext: sqlContext
If you would like the IPython notebook associated with this blog, click here. If for some reason you don't have software to read the IPython notebook, you can download a pdf version of it by clicking here.
We are almost ready to code. First we have to talk about Row and DataFrame. A Row is just like a row in a spreadsheet or a row in a table. A DataFrame is just a collection of Rows. For the most part, you can think of a DataFrame as a table.
Row / Column
Now for the first code snippet. Please note that I took the code snippet from Encode and assemble multiple features in PySpark at StackOverFlow.Com.
Code Snippet 1
from pyspark.sql import Row
from pyspark.mllib.linalg import DenseVector

row = Row("gender", "foo", "bar")

dataFrame = sc.parallelize([
    row("0", 3.0, DenseVector([0, 2.1, 1.0])),
    row("1", 1.0, DenseVector([0, 1.1, 1.0])),
    row("1", -1.0, DenseVector([0, 3.4, 0.0])),
    row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

dataFrame.collect()
Nothing fancy. You create a row which has column names gender, foo and bar. You then create a bunch of rows with actual data. Lastly, you group the rows into a DataFrame. DenseVector was used to demonstrate that a cell in a Row can have a complex data structure. If you are curious about parallelize and toDF, check the references at the end of the blog. This will be true for the rest of the blog: if you are not sure what some magic word means, go to the reference section at the end of the blog.
If things are working, you should get an output like that shown below.
Output of Code Snippet 1
[Row(gender=u'0', foo=3.0, bar=DenseVector([0.0, 2.1, 1.0])),
Row(gender=u'1', foo=1.0, bar=DenseVector([0.0, 1.1, 1.0])),
Row(gender=u'1', foo=-1.0, bar=DenseVector([0.0, 3.4, 0.0])),
Row(gender=u'0', foo=-3.0, bar=DenseVector([0.0, 4.1, 0.0]))]
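If you think better in plain Python, here is a rough analogy (this is not Spark code). A Row behaves much like a named tuple, and a DataFrame much like a list of them. The names PlainRow and plain_frame are made up for this sketch, and plain lists stand in for DenseVector.

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark.sql.Row: a tuple with named fields.
PlainRow = namedtuple("PlainRow", ["gender", "foo", "bar"])

# Hypothetical stand-in for a DataFrame: just a list of rows.
plain_frame = [
    PlainRow("0", 3.0, [0.0, 2.1, 1.0]),
    PlainRow("1", 1.0, [0.0, 1.1, 1.0]),
    PlainRow("1", -1.0, [0.0, 3.4, 0.0]),
    PlainRow("0", -3.0, [0.0, 4.1, 0.0]),
]

# Pull out one "column" by name, the way you would select a column from a table.
foo_column = [r.foo for r in plain_frame]
print(foo_column)
```

The analogy breaks down quickly: a real DataFrame is spread across the cluster and carries a schema, which a list of tuples does not.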
Before going further, we need to take a little detour. Algorithms like numeric data, so we need to do things like convert a gender column that has Male / Female / Unknown into binary values. A StringIndexer will be used to convert the categories to numerical values, and those values are then turned into two binary columns.
- The first column indicates whether the person is male. A one means that the person is male and a zero means that the person is not male.
- The second column indicates whether the person is female. A one means that the person is female and a zero means that the person is not female.
StringIndexer
Let's actually create a StringIndexer and use it to map/fit/transform categories to numbers.
Code Snippet 2
dataFrame = sqlContext.createDataFrame(
    [(0, "a"), (1, "b"), (2, "b"), (3, "c"), (4, "c"), (5, "c"),
     (6, 'd'), (7, 'd'), (8, 'd'), (9, 'd')],
    ["id", "category"])
dataFrame.collect()

from pyspark.ml.feature import StringIndexer

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
modelStringIndexer = stringIndexer.fit(dataFrame)
transformedDataFrame = modelStringIndexer.transform(dataFrame)
transformedDataFrame.collect()
Output of Code Snippet 2
[Row(id=0, category=u'a', categoryIndex=3.0),
Row(id=1, category=u'b', categoryIndex=2.0),
Row(id=2, category=u'b', categoryIndex=2.0),
Row(id=3, category=u'c', categoryIndex=1.0),
Row(id=4, category=u'c', categoryIndex=1.0),
Row(id=5, category=u'c', categoryIndex=1.0),
Row(id=6, category=u'd', categoryIndex=0.0),
Row(id=7, category=u'd', categoryIndex=0.0),
Row(id=8, category=u'd', categoryIndex=0.0),
Row(id=9, category=u'd', categoryIndex=0.0)]
Notice how the d's got 0.0 because they are the most numerous. The c's got 1.0 because they are the second most numerous. And so on. The code snippet below will make this more clear.
Code Snippet 3
transformedDataFrame.select('category','categoryIndex').distinct().orderBy('categoryIndex').show()
Output of Code Snippet 3
+--------+-------------+
|category|categoryIndex|
+--------+-------------+
|       d|          0.0|
|       c|          1.0|
|       b|          2.0|
|       a|          3.0|
+--------+-------------+
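The frequency-based numbering can be mimicked in a few lines of plain Python. This is only a sketch of the idea, not Spark's implementation: the names counts, ordered and index_of are made up, and how ties between equally frequent categories are broken is this sketch's choice, not necessarily Spark's.

```python
from collections import Counter

categories = ["a", "b", "b", "c", "c", "c", "d", "d", "d", "d"]

# Count each category, then order from most to least frequent.
counts = Counter(categories)
ordered = sorted(counts, key=lambda c: -counts[c])

# The most frequent category gets 0.0, the next one 1.0, and so on.
index_of = {category: float(i) for i, category in enumerate(ordered)}
print(index_of)
```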
OneHotEncoder
Next, let's use a OneHotEncoder to transform the category index that we created earlier into a binary vector.
Code Snippet 4
from pyspark.ml.feature import OneHotEncoder

oneHotEncoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVector")
oneHotEncodedDataFrame = oneHotEncoder.transform(transformedDataFrame)
oneHotEncodedDataFrame.show()
Output of Code Snippet 4
+---+--------+-------------+--------------+
| id|category|categoryIndex|categoryVector|
+---+--------+-------------+--------------+
|  0|       a|          3.0|     (3,[],[])|
|  1|       b|          2.0| (3,[2],[1.0])|
|  2|       b|          2.0| (3,[2],[1.0])|
|  3|       c|          1.0| (3,[1],[1.0])|
|  4|       c|          1.0| (3,[1],[1.0])|
|  5|       c|          1.0| (3,[1],[1.0])|
|  6|       d|          0.0| (3,[0],[1.0])|
|  7|       d|          0.0| (3,[0],[1.0])|
|  8|       d|          0.0| (3,[0],[1.0])|
|  9|       d|          0.0| (3,[0],[1.0])|
+---+--------+-------------+--------------+
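The pattern in the categoryVector column can be reproduced with a small plain-Python function. This is a sketch under the assumption that the last category index is the one left out, as in the output above. The function name one_hot_drop_last is made up, and the result is written as a (size, indices, values) triple to match how the column is displayed.

```python
def one_hot_drop_last(index, num_categories):
    """Return (size, indices, values) for a category index, leaving out the last category."""
    size = num_categories - 1
    i = int(index)
    if i < size:
        # A single 1.0 at the position given by the category index.
        return (size, [i], [1.0])
    # The last category is represented by a vector with no non-zero entries.
    return (size, [], [])

for idx in [0.0, 1.0, 2.0, 3.0]:
    print(idx, one_hot_drop_last(idx, 4))
```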
SparseVector
The column categoryVector is a SparseVector. It has 3 parts. The first part is the length of the vector. The second part is the list of indices that contain values. The third part is the list of actual values. Below is a code snippet demonstrating this.
Code Snippet 5
from pyspark.mllib.linalg import SparseVector

v1 = SparseVector(5, [0,3], [10,9])
for x in v1:
    print(x)
Output of Code Snippet 5
10.0
0.0
0.0
9.0
0.0
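If you don't have Spark handy, the same expansion from the three parts to a full vector can be sketched in plain Python. The helper name to_dense is made up for this illustration. It is not part of pyspark.

```python
def to_dense(size, indices, values):
    """Expand (size, indices, values) into a plain list of floats."""
    dense = [0.0] * size              # every position starts out as zero
    for i, v in zip(indices, values):
        dense[i] = float(v)           # fill in only the positions that hold a value
    return dense

dense = to_dense(5, [0, 3], [10, 9])
print(dense)
```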
Notice how category a (categoryVector = (3,[],[])) gets an all-zero vector. The last category is not given an entry of its own because including it would make the vector entries always sum to one, and hence make them linearly dependent. The code snippet below will provide a better visual for this.
Code Snippet 6
oneHotEncodedDataFrame.select('category','categoryIndex', 'categoryVector').distinct().orderBy('categoryIndex').show()
Output of Code Snippet 6
+--------+-------------+--------------+
|category|categoryIndex|categoryVector|
+--------+-------------+--------------+
|       d|          0.0| (3,[0],[1.0])|
|       c|          1.0| (3,[1],[1.0])|
|       b|          2.0| (3,[2],[1.0])|
|       a|          3.0|     (3,[],[])|
+--------+-------------+--------------+
VectorAssembler
By this point you are probably getting impatient. Luckily, we have just one more item to cover before we get to logistic regression. That one item is the VectorAssembler. A VectorAssembler just concatenates columns together. As usual, we will demonstrate what the words mean via a code snippet.
Code Snippet 7
from pyspark.ml.feature import VectorAssembler
dataFrame_1 = spark.createDataFrame([(1, 2, 3), (4,5,6)], ["a", "b", "c"])
vectorAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
dataFrame_2 = vectorAssembler.transform(dataFrame_1)
dataFrame_2.show()
Output of Code Snippet 7
+---+---+---+-------------+
|  a|  b|  c|     features|
+---+---+---+-------------+
|  1|  2|  3|[1.0,2.0,3.0]|
|  4|  5|  6|[4.0,5.0,6.0]|
+---+---+---+-------------+
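The concatenation can also be sketched in plain Python. This is an illustration of the idea only, not how Spark does it: the function name assemble is made up, rows are plain dicts, and plain lists stand in for vector columns.

```python
def assemble(row, input_cols):
    """Concatenate the named columns of one row into a single flat list of floats."""
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # A vector-like column contributes all of its entries.
            features.extend(float(x) for x in value)
        else:
            # A numeric column contributes a single entry.
            features.append(float(value))
    return features

rows = [{"a": 1, "b": 2, "c": 3}, {"a": 4, "b": 5, "c": 6}]
assembled = [assemble(r, ["a", "b", "c"]) for r in rows]
print(assembled)
```

Handling list-valued columns matters for what comes next, because the one-hot encoded vectors and the numeric columns end up side by side in one feature vector.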
Logistic Regression
We have now learned enough Spark to look at a specific problem involving logistic regression. We are going to work through the example provided in the databricks documentation.
If you are interested in learning about logistic regression, I recommend reading "Chapter 6: The Granddaddy of Supervised Artificial Intelligence - Regression (spreadsheet)" of the book Data Smart: Using Data Science to Transform Information into Insight by John W. Foreman (2013). Personally, I like the book because it provides minimal math and an implementation of logistic regression in a spreadsheet.
Drop / Create Table
- Drop Table
%sql DROP TABLE IF EXISTS adult
- Create Table
%sql
CREATE TABLE adult (
  age DOUBLE,
  workclass STRING,
  fnlwgt DOUBLE,
  education STRING,
  education_num DOUBLE,
  marital_status STRING,
  occupation STRING,
  relationship STRING,
  race STRING,
  sex STRING,
  capital_gain DOUBLE,
  capital_loss DOUBLE,
  hours_per_week DOUBLE,
  native_country STRING,
  income STRING)
USING com.databricks.spark.csv
OPTIONS (path "/databricks-datasets/adult/adult.data", header "true")
Convert table to a DataFrame
dataset = spark.table("adult")
Get a list of columns in original dataset
cols = dataset.columns
- This step has to be done here and not later. Unfortunately, the databricks example re-uses the variable dataset.
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["workclass", "education", "marital_status", "occupation",
                      "relationship", "race", "sex", "native_country"]
stages = []  # stages in our Pipeline
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
    # Add stages. These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]
Create a StringIndexer on income
label_stringIdx = StringIndexer(inputCol = "income", outputCol = "label")
stages += [label_stringIdx]
Combine feature columns into a single vector column using VectorAssembler
numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
# A list comprehension works on both Python 2 and Python 3 (map() returns an iterator on Python 3).
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
Put data through all of the feature transformations using the stages in the pipeline
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
display(dataset)
Split data into training and test sets
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print(trainingData.count())
print(testData.count())
- Set seed for reproducibility
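To see why the seed matters, here is a plain-Python sketch of a seeded random split. It is only an illustration of the idea and makes no claim about how Spark implements randomSplit. The function name random_split is made up, and in this sketch the split sizes are only approximately 70/30.

```python
import random

def random_split(items, weights, seed):
    """Randomly assign each item to one of two lists according to the weights."""
    rng = random.Random(seed)                    # same seed, same sequence of random numbers
    cutoff = weights[0] / float(sum(weights))    # share of items that should go to the first list
    first, second = [], []
    for item in items:
        (first if rng.random() < cutoff else second).append(item)
    return first, second

train_a, test_a = random_split(list(range(1000)), [0.7, 0.3], seed=100)
train_b, test_b = random_split(list(range(1000)), [0.7, 0.3], seed=100)

print(len(train_a), len(test_a))
print(train_a == train_b)   # the same seed gives the same split
```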
Train logistic regression model and then make predictions
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
selected = predictions.select("prediction", "age", "occupation")
display(selected)
- Output
prediction  age  occupation
----------  ---  --------------
...
0           20   Prof-specialty
1           35   Prof-specialty
...
- So, a prediction of 0 means that the person earns <=50K, while a prediction of 1 means that the person earns >50K.
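For the curious, the way a fitted logistic regression model turns a feature vector into a 0 or 1 can be sketched in plain Python. Everything specific here is an assumption made for illustration: the two features, the weights, the intercept and the 0.5 threshold are invented and are not taken from the model trained above.

```python
import math

def sigmoid(z):
    """Squash any real number into the range (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(features, weights, intercept, threshold=0.5):
    """Return (prediction, probability) for one feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    probability = sigmoid(z)
    prediction = 1.0 if probability > threshold else 0.0
    return prediction, probability

# Hypothetical weights for two made-up numeric features.
weights = [0.04, 0.5]
intercept = -3.0

print(predict([20, 1], weights, intercept))   # low score, prediction 0.0
print(predict([35, 4], weights, intercept))   # higher score, prediction 1.0
```

Training is the part that finds the weights and the intercept. Prediction is just the weighted sum, the sigmoid and the threshold.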
Summary
The advantage of jumping in is that you learn by solving a problem. Also, you don't spend weeks or months learning material like that listed in the references below before you can do anything. The disadvantage is that steps are skipped, and you won't fully understand even the steps that are provided. I realize that jumping in is not for everybody. For some people, standard tutorials are the way to begin.
References
- Book: Data Smart by John W. Foreman
- john-foreman.com
- O'Reilly
- Chapter 6: The Granddaddy of Supervised Artificial Intelligence - Regression
- Cartoon from New Yorker Magazine: Does your car have any idea why my car pulled it over?
- databricks
- sklearn.linear_model.LogisticRegression
- Spark.Apache.Org
- Spark Python API Docs [pyspark]
- pyspark.ml package [ML: Machine Learning]
- pyspark.sql module
- SparkContext
- MLlib (Machine Learning)
- StackOverFlow.Com
- Wikipedia