Thursday, December 22, 2016

Jumping into Spark (JIS): Python / Spark / Logistic Regression (Update 3)

In this blog post we will use the Python interface to Spark to predict whether someone makes more or less than $50,000. A logistic regression model will be used to make this prediction.

Please note that no prior knowledge of Python, Spark or logistic regression is required. However, it is assumed that you are a seasoned software developer. 

There are various tutorials on Spark. There is the official documentation on Spark. However, if you are an experienced software professional and want to just jump in and kick the tires, there doesn't seem to be much available. Well, at least I couldn't find any.

Yes, there is Spark's Quick Start. There are also artifacts like the databricks User Guide. Unfortunately, each covers only a smattering of topics. You really don't get a chance to jump in.

Let me now explicitly define jumping in. Jumping in involves solving an almost trivial problem that demonstrates a good deal of the concepts and software. Also, it involves skipping steps. Links will be provided so that if someone is interested in the missing steps, they can look up the details. If skipping steps bothers you, immediately stop and go read a tutorial.

As mentioned earlier, we are going to use Python on Spark to address logistic regression for our jumping in. This is our almost trivial problem that demonstrates a good deal of the concepts and software.

The first thing that you need is a working environment. You could install and configure your own environment. However, that would not be in line with jumping in. Instead I recommend using databricks Spark community edition (DSCE). If you are using DSCE, refer to "Welcome to Databricks" on how to create a cluster, create a notebook, attach the notebook to a cluster and actually use the notebook.

Next, you need to connect to the Spark environment. In the database world, you create a database connection. In the Spark world, you create a context (SQLContext/SparkContext/SparkSession). If you are using DSCE, the following three variables will be predefined for you:
  1. SparkContext: sc
  2. SparkSession: spark
  3. SQLContext: sqlContext
      If you would like the IPython notebook associated with this blog, click here. If for some reason you don't have software to read the IPython notebook, you can download a pdf version of it by clicking here.


      Row / Column


      We are almost ready to code. First we have to talk about Row and DataFrame. A Row is just like a row in a spreadsheet or a row in a table. A DataFrame is just a collection of Rows. For the most part, you can think of a DataFrame as a table.

      Now for the first code snippet. Please note that I took the code snippet from Encode and assemble multiple features in PySpark at StackOverFlow.Com.

      Code Snippet 1
      
      
      from pyspark.sql import Row
      from pyspark.mllib.linalg import DenseVector
      
      row = Row("gender", "foo", "bar")
      
      dataFrame = sc.parallelize([
        row("0",  3.0, DenseVector([0, 2.1, 1.0])),
        row("1",  1.0, DenseVector([0, 1.1, 1.0])),
        row("1", -1.0, DenseVector([0, 3.4, 0.0])),
        row("0", -3.0, DenseVector([0, 4.1, 0.0]))
      ]).toDF()
      
      dataFrame.collect()

      Nothing fancy. You create a Row which has the column names gender, foo and bar. You then create a bunch of rows with actual data. Lastly, you group the rows into a DataFrame. DenseVector was used to demonstrate that a cell in a Row can hold a complex data structure. If you are curious about parallelize and toDF, check the references at the end of the blog. The same holds for the rest of the blog: if you are not sure what some magic word means, go to the reference section at the end.

      If things are working, you should get an output like that shown below.

      Output of Code Snippet 1

      [Row(gender=u'0', foo=3.0, bar=DenseVector([0.0, 2.1, 1.0])),
       Row(gender=u'1', foo=1.0, bar=DenseVector([0.0, 1.1, 1.0])),
       Row(gender=u'1', foo=-1.0, bar=DenseVector([0.0, 3.4, 0.0])),
       Row(gender=u'0', foo=-3.0, bar=DenseVector([0.0, 4.1, 0.0]))]

      Before going further, we need to take a little detour. Algorithms like numeric data. So we need to do things like convert a gender column that holds Male / Female / Unknown into binary values. We do this by creating two columns.
      1. The first column indicates whether the person is male. A one means that the person is male and a zero means that the person is not male.
      2. The second column indicates whether the person is female. A one means that the person is female and a zero means that the person is not female.
      Notice that we don't need a third column for Unknown. If there is a zero in both the Male and Female columns, we know that the gender is Unknown. Two Spark tools do this work. StringIndexer converts the categories to numerical values, and OneHotEncoder takes the resulting single category column and decomposes it into many columns of binary values.
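The two-column idea can be sketched in plain Python before we bring in Spark. This is only an illustration: encode_gender is a hypothetical helper, not a Spark function, and the sample values are made up.

```python
# Hypothetical helper (not part of Spark): turn one gender value into
# the two indicator columns described above.
def encode_gender(gender):
    """Return (is_male, is_female); Unknown becomes (0, 0)."""
    is_male = 1 if gender == "Male" else 0
    is_female = 1 if gender == "Female" else 0
    return (is_male, is_female)

# Made-up sample values, for illustration only.
people = ["Male", "Female", "Unknown", "Female"]
encoded = [encode_gender(g) for g in people]
print(encoded)
```

Unknown needs no column of its own because it is the only value that produces a zero in both positions.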

       

      StringIndexer


      A StringIndexer converts categories to numbers. The numbers have a range from 0 to number of categories minus one. The most frequent category gets a number of zero, the second most frequent category gets a number of 1 and so on. We are going to use the code snippets from Preserve index-string correspondence spark string indexer from StackOverFlow.Com to demonstrate what the preceding English means.

      Let's actually create a StringIndexer and use it to map/fit/transform categories to numbers.

      Code Snippet 2

      dataFrame = sqlContext.createDataFrame(
          [(0, "a"), (1, "b"), (2, "b"), (3, "c"), (4, "c"), (5, "c"), (6,'d'), (7,'d'), (8,'d'), (9,'d')],
          ["id", "category"])
      
      dataFrame.collect()
      
      from pyspark.ml.feature import StringIndexer
      stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
      modelStringIndexer = stringIndexer.fit(dataFrame)
      transformedDataFrame = modelStringIndexer.transform(dataFrame)
      transformedDataFrame.collect()
      

      Output of Code Snippet 2

      [Row(id=0, category=u'a', categoryIndex=3.0),
       Row(id=1, category=u'b', categoryIndex=2.0),
       Row(id=2, category=u'b', categoryIndex=2.0),
       Row(id=3, category=u'c', categoryIndex=1.0),
       Row(id=4, category=u'c', categoryIndex=1.0),
       Row(id=5, category=u'c', categoryIndex=1.0),
       Row(id=6, category=u'd', categoryIndex=0.0),
       Row(id=7, category=u'd', categoryIndex=0.0),
       Row(id=8, category=u'd', categoryIndex=0.0),
       Row(id=9, category=u'd', categoryIndex=0.0)]
      

      Notice that the d's got 0.0 because they are the most numerous. The c's got 1.0 because they are the second most numerous. And so on. The code snippet below will make this more clear.

      Code Snippet 3

      transformedDataFrame.select('category','categoryIndex').distinct().orderBy('categoryIndex').show()
      

      Output of Code Snippet 3

      +--------+-------------+
      |category|categoryIndex|
      +--------+-------------+
      |       d|          0.0|
      |       c|          1.0|
      |       b|          2.0|
      |       a|          3.0|
      +--------+-------------+
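To see that there is no magic here, the same mapping can be reproduced in plain Python with collections.Counter. This is a rough sketch of the frequency ordering described above, not Spark's actual implementation, and it makes no attempt to match how Spark breaks ties between equally frequent categories.

```python
from collections import Counter

# The category column from Code Snippet 2.
categories = ["a", "b", "b", "c", "c", "c", "d", "d", "d", "d"]

# Count each category, then rank from most to least frequent.
counts = Counter(categories)
ranked = [category for category, _ in counts.most_common()]

# Map each category to its rank as a float, mirroring categoryIndex.
index_of = {category: float(rank) for rank, category in enumerate(ranked)}
indexed = [index_of[c] for c in categories]

print(index_of)
print(indexed)
```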

      OneHotEncoder


      A OneHotEncoder converts category numbers to binary vectors with at most a single one-value per row. For a true understanding of one-hot encoding, refer to the associated Wikipedia page.

      Next, let's use a OneHotEncoder to transform the category index that we created earlier into a binary vector.

      Code Snippet 4

      from pyspark.ml.feature import OneHotEncoder
      oneHotEncoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVector")
      oneHotEncodedDataFrame = oneHotEncoder.transform(transformedDataFrame)
      oneHotEncodedDataFrame.show()
      

      Output of Code Snippet 4

      +---+--------+-------------+--------------+
      | id|category|categoryIndex|categoryVector|
      +---+--------+-------------+--------------+
      |  0|       a|          3.0|     (3,[],[])|
      |  1|       b|          2.0| (3,[2],[1.0])|
      |  2|       b|          2.0| (3,[2],[1.0])|
      |  3|       c|          1.0| (3,[1],[1.0])|
      |  4|       c|          1.0| (3,[1],[1.0])|
      |  5|       c|          1.0| (3,[1],[1.0])|
      |  6|       d|          0.0| (3,[0],[1.0])|
      |  7|       d|          0.0| (3,[0],[1.0])|
      |  8|       d|          0.0| (3,[0],[1.0])|
      |  9|       d|          0.0| (3,[0],[1.0])|
      +---+--------+-------------+--------------+

      SparseVector

      The column categoryVector is a SparseVector. It has 3 parts. The first part is the length of the vector. The second part is the list of indices which contain values. The third part is the list of actual values. Below is a code snippet demonstrating this.

      Code Snippet 5

      from pyspark.mllib.linalg import SparseVector
      v1 = SparseVector(5, [0,3], [10,9])
      for x in v1:
        print(x)
      

      Output of Code Snippet 5

      10.0
      0.0
      0.0
      9.0
      0.0
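If the three-part notation still feels opaque, here is a plain-Python sketch that expands it by hand. The function to_dense is a hypothetical helper written for this post, not part of Spark.

```python
# Hypothetical helper (not part of Spark): expand the three parts of a
# sparse vector (size, indices, values) into an ordinary list.
def to_dense(size, indices, values):
    dense = [0.0] * size              # every slot starts as zero
    for i, v in zip(indices, values):
        dense[i] = float(v)           # fill only the listed positions
    return dense

print(to_dense(5, [0, 3], [10, 9]))   # the vector from Code Snippet 5
print(to_dense(3, [2], [1.0]))        # category b from Code Snippet 4
print(to_dense(3, [], []))            # category a from Code Snippet 4
```

Read this way, (3,[2],[1.0]) is a vector of length 3 with a one in the last slot, and (3,[],[]) is a vector of length 3 that is all zeros.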
      

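      Notice that category a (categoryVector = (3,[],[])) does not get a slot of its own. By default, OneHotEncoder drops the last category, because including it would make the vector entries sum to one in every row and hence make the columns linearly dependent. The code snippet below will provide a better visual for this.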

      Code Snippet 6

      oneHotEncodedDataFrame.select('category','categoryIndex', 'categoryVector').distinct().orderBy('categoryIndex').show()
      

      Output of Code Snippet 6

      +--------+-------------+--------------+
      |category|categoryIndex|categoryVector|
      +--------+-------------+--------------+
      |       d|          0.0| (3,[0],[1.0])|
      |       c|          1.0| (3,[1],[1.0])|
      |       b|          2.0| (3,[2],[1.0])|
      |       a|          3.0|     (3,[],[])|
      +--------+-------------+--------------+
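The linear dependence is easier to see in plain Python. The sketch below is a hypothetical stand-in for the encoder, not Spark's code. The drop_last flag is an assumption that imitates the encoder's default of dropping the last category.

```python
# Hypothetical stand-in for the encoder (not Spark's implementation).
def one_hot(index, num_categories, drop_last=True):
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:                  # the dropped category stays all zeros
        vector[index] = 1.0
    return vector

# Index order from Code Snippet 6: d=0, c=1, b=2, a=3.
dropped = [one_hot(i, 4) for i in range(4)]
full = [one_hot(i, 4, drop_last=False) for i in range(4)]

print(dropped)
print([sum(row) for row in full])
```

With all four slots kept, every row sums to exactly one, so any one column can be computed from the other three. Dropping the last slot removes that redundancy without losing information.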
      

      VectorAssembler

      By this point you are probably getting impatient. Luckily, we have just one more item to cover before we get to logistic regression. That one item is the VectorAssembler. A VectorAssembler just concatenates columns together into a single vector column. As usual, we will demonstrate what the words mean via a code snippet.

      Code Snippet 7

      from pyspark.ml.feature import VectorAssembler
      dataFrame_1 = spark.createDataFrame([(1, 2, 3), (4,5,6)], ["a", "b", "c"])
      vectorAssembler = VectorAssembler(inputCols=["a", "b", "c"], outputCol="features")
      dataFrame_2 = vectorAssembler.transform(dataFrame_1)
      dataFrame_2.show()

      Output of Code Snippet 7

      +---+---+---+-------------+
      |  a|  b|  c|     features|
      +---+---+---+-------------+
      |  1|  2|  3|[1.0,2.0,3.0]|
      |  4|  5|  6|[4.0,5.0,6.0]|
      +---+---+---+-------------+
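In the logistic regression example later on, the assembler is fed a mix of one-hot vectors and plain numeric columns. A plain-Python sketch of that concatenation might look like the following. The function assemble and the mixed row are made up for illustration and are not Spark code.

```python
# Hypothetical helper (not part of Spark): concatenate the named
# columns of one row into a flat list of floats.
def assemble(row, input_cols):
    features = []
    for col in input_cols:
        value = row[col]
        if isinstance(value, (list, tuple)):
            # A vector-valued cell contributes all of its entries.
            features.extend(float(x) for x in value)
        else:
            # A scalar cell contributes a single entry.
            features.append(float(value))
    return features

# Same data as the first row of Code Snippet 7.
simple = assemble({"a": 1, "b": 2, "c": 3}, ["a", "b", "c"])
print(simple)

# Made-up row mixing a one-hot vector with a numeric column.
mixed_row = {"categoryVector": [0.0, 1.0, 0.0], "age": 39}
mixed = assemble(mixed_row, ["categoryVector", "age"])
print(mixed)
```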

      Logistic Regression


      We have now learned enough Spark to look at a specific problem involving logistic regression. We are going to work through the example provided in the databricks documentation.

      If you are interested in learning about logistic regression, I recommend reading "Chapter 6: The Granddaddy of Supervised Artificial Intelligence - Regression" (and its spreadsheet) in the book Data Smart: Using Data Science to Transform Information into Insight by John W. Foreman (2013). Personally, I like the book because it provides minimal math and an implementation of logistic regression in a spreadsheet.
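For the impatient, here is a minimal plain-Python sketch of what a fitted logistic regression model computes when it makes a prediction: a weighted sum of the features is squashed into a probability, which is then compared against a cut-off. The weights, intercept and feature values are made up, and the 0.5 cut-off is an assumption for illustration. This is not Spark's implementation.

```python
import math

def sigmoid(z):
    """Squash any real number into the range (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))

def predict(features, weights, intercept, threshold=0.5):
    """Return (probability, label) for a single feature vector."""
    z = intercept + sum(w * x for w, x in zip(weights, features))
    probability = sigmoid(z)
    label = 1.0 if probability > threshold else 0.0
    return probability, label

# Made-up numbers, for illustration only.
probability, label = predict([1.0, 1.0], weights=[2.0, -1.0], intercept=0.5)
print(probability, label)
```

Training is the process of finding the weights and intercept. The rest of this blog leaves that part to Spark.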

      Drop / Create Table
      1. Drop Table
        %sql
        DROP TABLE IF EXISTS adult

      2. Create Table
        %sql

        CREATE TABLE adult (
          age               DOUBLE,
          workclass         STRING,
          fnlwgt            DOUBLE,
          education         STRING,
          education_num     DOUBLE,
          marital_status    STRING,
          occupation        STRING,
          relationship      STRING,
          race              STRING,
          sex               STRING,
          capital_gain      DOUBLE,
          capital_loss      DOUBLE,
          hours_per_week    DOUBLE,
          native_country    STRING,
          income            STRING)
        USING com.databricks.spark.csv
        OPTIONS (path "/databricks-datasets/adult/adult.data", header "true")

        Convert table to a DataFrame

        dataset = spark.table("adult")

        Get a list of columns in original dataset

        cols = dataset.columns

        Note: This step has to be done here and not later, because the databricks example reuses the variable dataset.

        Perform One Hot Encoding on columns of interest

        from pyspark.ml import Pipeline
        from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
        
        categoricalColumns = ["workclass", "education", "marital_status", "occupation", "relationship", "race", "sex", "native_country"]
        stages = [] # stages in our Pipeline
        for categoricalCol in categoricalColumns:
          stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
          encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
          # Add stages.  These are not run here, but will run all at once later on.
          stages += [stringIndexer, encoder]
        
        
        Create a StringIndexer on income
        
        
        label_stringIdx = StringIndexer(inputCol = "income", outputCol = "label")
        stages += [label_stringIdx]
        
        
        Combine feature columns into a single vector column using VectorAssembler
        
        
        numericCols = ["age", "fnlwgt", "education_num", "capital_gain", "capital_loss", "hours_per_week"]
        # A list comprehension works on both Python 2 and Python 3 (map() returns an iterator on Python 3)
        assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
        assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
        stages += [assembler]

        Put data through all of the feature transformations using the stages in the pipeline

        pipeline = Pipeline(stages=stages)
        pipelineModel = pipeline.fit(dataset)
        dataset = pipelineModel.transform(dataset)
        
        
        Keep relevant columns
        
        selectedcols = ["label", "features"] + cols
        dataset = dataset.select(selectedcols)
        display(dataset)
        
        
        Split data into training and test sets
        
        
        # Set seed for reproducibility
        (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
        print(trainingData.count())
        print(testData.count())

        Train logistic regression model and then make predictions

        from pyspark.ml.classification import LogisticRegression
        lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)
        lrModel = lr.fit(trainingData)
        predictions = lrModel.transform(testData)
        selected = predictions.select("prediction", "age", "occupation")
        display(selected)
        
        Output

          prediction  age occupation
          ----------  --- --------------
          ...
          0           20  Prof-specialty
          1           35  Prof-specialty
          ...

        So, a prediction of 0 means that the person earns <=50K, while a prediction of 1 means that the person earns >50K. This follows from the StringIndexer on income, which gives index 0 to the most frequent value.

          Summary

          In summary, we jumped in by using Python on Spark to address logistic regression. We did not read any tutorials. We did skip steps. However, links are provided in the reference section to fill in the steps if the reader so desires.

          The advantage of jumping in is that you learn by solving a problem. Also, you don't spend weeks or months learning material like that listed in the references below before you can do anything. The disadvantage is that steps are skipped, so you won't fully understand even the steps that are provided. Jumping in is not for everybody. For some people, standard tutorials are the way to begin.

           

          References

          1. Book: Data Smart by John W. Foreman
            1. john-foreman.com
            2. O'Reilly
            3. Chapter 6: The Granddaddy of Supervised Artificial Intelligence - Regression
              1. Text
              2. Spreadsheet
          2. Cartoon from New Yorker Magazine: Does your car have any idea why my car pulled it over?
          3. databricks
          4. sklearn.linear_model.LogisticRegression
          5. Spark.Apache.Org
          6. StackOverFlow.Com
          7. Wikipedia