Encode and assemble multiple features in PySpark

Spark >= 2.3, >= 3.0

Since Spark 2.3, OneHotEncoder has been deprecated in favor of OneHotEncoderEstimator. If you use a recent release, please adjust the encoder code accordingly:

from pyspark.ml.feature import OneHotEncoderEstimator

encoder = OneHotEncoderEstimator(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

In Spark 3.0 this variant has been renamed to OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(
    inputCols=["gender_numeric"],  
    outputCols=["gender_vector"]
)

Additionally StringIndexer has been extended to support multiple input columns:

StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"])
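Taken together, on Spark 3.0+ the whole indexing, encoding and assembling flow can be expressed with the multi-column variants. A minimal sketch, assuming a DataFrame df with a string column gender and numeric (or vector) columns foo and bar; the column names are only illustrative:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

pipeline = Pipeline(stages=[
    # Index the string column(s) to numeric indices
    StringIndexer(inputCols=["gender"], outputCols=["gender_numeric"]),
    # One-hot encode the indexed column(s)
    OneHotEncoder(inputCols=["gender_numeric"], outputCols=["gender_vector"]),
    # Combine everything into a single feature vector
    VectorAssembler(inputCols=["gender_vector", "foo", "bar"], outputCol="features"),
])

final_df = pipeline.fit(df).transform(df)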

Spark < 2.3

Well, you can write a UDF, but why would you? There are already quite a few tools designed to handle this category of tasks:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

row = Row("gender", "foo", "bar")

df = sc.parallelize([
  row("0", 3.0, DenseVector([0, 2.1, 1.0])),
  row("1", 1.0, DenseVector([0, 1.1, 1.0])),
  row("1", -1.0, DenseVector([0, 3.4, 0.0])),
  row("0", -3.0, DenseVector([0, 4.1, 0.0]))
]).toDF()

First of all, StringIndexer:

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="gender", outputCol="gender_numeric").fit(df)
indexed_df = indexer.transform(df)
indexed_df.drop("bar").show()

## +------+----+--------------+
## |gender| foo|gender_numeric|
## +------+----+--------------+
## |     0| 3.0|           0.0|
## |     1| 1.0|           1.0|
## |     1|-1.0|           1.0|
## |     0|-3.0|           0.0|
## +------+----+--------------+
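As a side note, the fitted StringIndexerModel keeps the original labels in index order (indexer.labels), and IndexToString can map indices (or later model predictions) back to the original strings. A small sketch based on the indexer fitted above:

from pyspark.ml.feature import IndexToString

# indexer.labels is e.g. ['0', '1'] - the original values in index order
converter = IndexToString(
    inputCol="gender_numeric", outputCol="gender_original",
    labels=indexer.labels)
converter.transform(indexed_df).drop("bar").show()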

Next OneHotEncoder:

from pyspark.ml.feature import OneHotEncoder

encoder = OneHotEncoder(inputCol="gender_numeric", outputCol="gender_vector")
encoded_df = encoder.transform(indexed_df)
encoded_df.drop("bar").show()

## +------+----+--------------+-------------+
## |gender| foo|gender_numeric|gender_vector|
## +------+----+--------------+-------------+
## |     0| 3.0|           0.0|(1,[0],[1.0])|
## |     1| 1.0|           1.0|    (1,[],[])|
## |     1|-1.0|           1.0|    (1,[],[])|
## |     0|-3.0|           0.0|(1,[0],[1.0])|
## +------+----+--------------+-------------+
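Note that OneHotEncoder drops the last category by default (dropLast=True), which is why gender 1.0 is encoded as the empty vector (1,[],[]) above. If you prefer one explicit slot per category you can disable that:

# Keep a slot for every category instead of dropping the last one
encoder_full = OneHotEncoder(
    inputCol="gender_numeric", outputCol="gender_vector", dropLast=False)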

VectorAssembler:

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["gender_vector", "bar", "foo"], outputCol="features")

final_df = assembler.transform(encoded_df)

If bar contained categorical variables you could use VectorIndexer to set the required metadata:

from pyspark.ml.feature import VectorIndexer

vector_indexer = VectorIndexer(inputCol="bar", outputCol="bar_indexed")

encoded_df_with_indexed_bar = (vector_indexer
    .fit(encoded_df)
    .transform(encoded_df))

but that is not the case here.

Finally you can wrap all of that using pipelines:

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[indexer, encoder, vector_indexer, assembler])
model = pipeline.fit(df)
transformed = model.transform(df)

Arguably this is a much more robust and clean approach than writing everything from scratch. There are some caveats, especially when you need consistent encoding between different datasets. You can read more in the official documentation for StringIndexer and VectorIndexer.
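For example, to keep the encoding consistent you would fit the pipeline once on a reference dataset and reuse (or persist) the resulting PipelineModel instead of refitting it on every dataset. A minimal sketch, assuming a second DataFrame new_df with the same schema and Spark 2.0+ ML persistence; the path is just an example:

model = pipeline.fit(df)                   # fit once on the reference data
transformed_new = model.transform(new_df)  # reuses the same label-to-index mappings

# Optionally persist the fitted model for later reuse
model.write().overwrite().save("/tmp/feature_pipeline_model")

from pyspark.ml import PipelineModel
same_model = PipelineModel.load("/tmp/feature_pipeline_model")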

Another way to get comparable output is RFormula, which:

RFormula produces a vector column of features and a double or string column of label. Like when formulas are used in R for linear regression, string input columns will be one-hot encoded, and numeric columns will be cast to doubles. If the label column is of type string, it will be first transformed to double with StringIndexer. If the label column does not exist in the DataFrame, the output label column will be created from the specified response variable in the formula.

from pyspark.ml.feature import RFormula

rf = RFormula(formula="~ gender + bar + foo - 1")
final_df_rf = rf.fit(df).transform(df)

As you can see it is much more concise, but it is harder to compose and doesn't allow much customization. Nevertheless, the result for a simple pipeline like this one will be identical:

final_df_rf.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+


final_df.select("features").show(4, False)

## +----------------------+
## |features              |
## +----------------------+
## |[1.0,0.0,2.1,1.0,3.0] |
## |[0.0,0.0,1.1,1.0,1.0] |
## |(5,[2,4],[3.4,-1.0])  |
## |[1.0,0.0,4.1,0.0,-3.0]|
## +----------------------+

Regarding your questions:

make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose)

It is just a UDF like any other. Make sure you use supported types, and beyond that everything should work just fine.
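A minimal sketch of such a UDF, assuming Spark 2.0+ and an illustrative category list (in practice it would come from your config):

from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

gender_categories = ["1", "2"]  # illustrative; normally taken from the config

def dummy_code_gender(value):
    # One 0/1 slot per known category
    v = str(value).lower()
    return Vectors.dense([1.0 if v == cat else 0.0 for cat in gender_categories])

dummy_code_gender_udf = udf(dummy_code_gender, VectorUDT())

user_data.withColumn("feature_array", dummy_code_gender_udf("gender"))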

take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import StructType, StructField

# Build a single-column DataFrame of vectors from the mapped RDD
schema = StructType([StructField("features", VectorUDT(), True)])
row = Row("features")
result.map(lambda x: row(DenseVector(x))).toDF(schema)

Note:

For Spark 1.x replace pyspark.ml.linalg with pyspark.mllib.linalg.
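If you want the vectors attached to the original user_data DataFrame rather than in a standalone one, one option is to zip the two RDDs back together. This is only a sketch; it assumes result was produced by mapping user_data.rdd directly, so both sides keep the same partitioning and per-partition ordering:

from pyspark.sql import Row
from pyspark.ml.linalg import DenseVector

def attach_features(pair):
    original_row, features = pair
    values = original_row.asDict()
    values["feature_array"] = DenseVector(features)
    return Row(**values)

user_data_with_features = user_data.rdd.zip(result).map(attach_features).toDF()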


Comments

  • moustachio
    moustachio over 1 year

    I have a Python class that I'm using to load and process some data in Spark. Among various things I need to do, I'm generating a list of dummy variables derived from various columns in a Spark dataframe. My problem is that I'm not sure how to properly define a User Defined Function to accomplish what I need.

    I do currently have a method that, when mapped over the underlying dataframe RDD, solves half the problem (remember that this is a method in a larger data_processor class):

    def build_feature_arr(self,table):
        # this dict has keys for all the columns for which I need dummy coding
        categories = {'gender':['1','2'], ..}
    
        # there are actually two different dataframes that I need to do this for; this just specifies which one I'm looking at, and grabs the relevant features from a config file
        if table == 'users':
            iter_over = self.config.dyadic_features_to_include
        elif table == 'activty':
            iter_over = self.config.user_features_to_include
    
        def _build_feature_arr(row):
            result = []
            row = row.asDict()
            for col in iter_over:
                column_value = str(row[col]).lower()
                cats = categories[col]
                result += [1 if column_value and cat==column_value else 0 for cat in cats]
            return result
        return _build_feature_arr
    

    Essentially what this does is, for the specified dataframe, takes the categorical variable values for the specified columns, and returns a list of the values of these new dummy variables. That means the following code:

    data = data_processor(init_args)
    result = data.user_data.rdd.map(self.build_feature_arr('users'))
    

    returns something like:

    In [39]: result.take(10)
    Out[39]:
    [[1, 0, 0, 0, 1, 0],
     [1, 0, 0, 1, 0, 0],
     [1, 0, 0, 0, 0, 0],
     [1, 0, 1, 0, 0, 0],
     [1, 0, 0, 1, 0, 0],
     [1, 0, 0, 1, 0, 0],
     [0, 1, 1, 0, 0, 0],
     [1, 0, 1, 1, 0, 0],
     [1, 0, 0, 1, 0, 0],
     [1, 0, 0, 0, 0, 1]]
    

    This is exactly what I want in terms of generating the list of dummy variables I want, but here's my question: How can I either (a) make a UDF with similar functionality that I can use in a Spark SQL query (or some other way, I suppose), or (b) take the RDD resulting from the map described above and add it as a new column to the user_data dataframe?

    Either way, what I need to do is generate a new dataframe containing the columns from user_data, along with a new column (let's call it feature_array) containing the output of the function above (or something functionally equivalent).

  • zero323
    zero323 over 7 years
    @DavidArenburg In this particular context it is because OP wants to obtain dummy variables (like model.matrix in R). Most likely to train some type of linear model. R-ish explanation - string indexer kind of creates a factor-like column from strings, one hot calls model.matrix :)
  • EnriqueH
    EnriqueH over 6 years
    thanks @zero323! Just one note: from Spark 2.0+, from pyspark.mllib.linalg import DenseVector should be replaced with from pyspark.ml.linalg import DenseVector, otherwise you might get a type error in the VectorIndexer stage
  • mathopt
    mathopt over 6 years
    I have a question... if I run a randomforest_Classifier on this data I would get the random forest leaves in terms of numbers (because of indexing). How can I tie it back to the original descriptions (i.e. English text) in a neat way? For example, the random forest classifier does not have metadata and it becomes a difficult task. I have a vague idea that I have to use something like IndexToString() but I am not sure how to use it
  • Michelle Owen
    Michelle Owen about 6 years
    @zero323 can you explain how to read the gender_vector after oneHotEncoding in your example?
  • zero323
    zero323 about 6 years
    @MichelleOwen What do you mean by read? If it is about mapping / interpretation then it is all about metadata. This stackoverflow.com/a/35305314/1560062 and this github.com/awesome-spark/spark-gotchas/blob/… should shed some light.
  • Michelle Owen
    Michelle Owen about 6 years
    @zero323 thanks. I already figured it out from another post; the gender_vector is in sparse form.
  • gannawag
    gannawag over 5 years
    What is the best way to do this if I have hundreds of columns to encode? Is there anything faster than just looping over the column names?
  • Mario
    Mario about 2 years
    @zero323 May I kindly draw your attention to a similar question for finding a quick solution.