Using Spark ML's OneHotEncoder on multiple columns


Spark >= 3.0

In Spark 3.0 OneHotEncoderEstimator has been renamed to OneHotEncoder:

import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}

val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))
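As in Spark 2.3, the encoder still has to be fitted before it can transform anything. The following is a minimal sketch of that step, assuming Spark 3.x on the classpath and a local session; the toy DataFrame `indexed` and its column names are made up for illustration and stand in for `df_indexed`.

```scala
import org.apache.spark.ml.feature.{OneHotEncoder, OneHotEncoderModel}
import org.apache.spark.sql.SparkSession

// Local session for the sketch; in spark-shell this returns the existing one.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("one-hot-sketch")
  .getOrCreate()

// Hypothetical, already indexed data (category indices stored as doubles).
val indexed = spark.createDataFrame(Seq(
  (0.0, 1.0),
  (1.0, 0.0),
  (2.0, 1.0),
  (0.0, 2.0)
)).toDF("domain_index", "size_index")

val indexColumns = Array("domain_index", "size_index")

val encoder = new OneHotEncoder()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns.map(name => s"${name}_vec"))

// fit scans the data to learn the number of categories per column.
val encoderModel: OneHotEncoderModel = encoder.fit(indexed)

// transform appends one sparse vector column per input column.
val encoded = encoderModel.transform(indexed)
encoded.show(truncate = false)
```

The fitted model can be reused on other DataFrames that have the same index columns.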

Spark >= 2.3

Spark 2.3 introduced two new classes, OneHotEncoderEstimator and OneHotEncoderModel. They require fitting even when used outside a Pipeline, and they operate on multiple columns at the same time.

import org.apache.spark.ml.feature.{OneHotEncoderEstimator, OneHotEncoderModel}

val encoder = new OneHotEncoderEstimator()
  .setInputCols(indexColumns)
  .setOutputCols(indexColumns map (name => s"${name}_vec"))


encoder.fit(df_indexed).transform(df_indexed)

Spark < 2.3

Even if the transformers you use don't require fitting, you have to call the fit method to create a PipelineModel, which can then be used to transform data.

one_hot_pipeline.fit(df_indexed).transform(df_indexed)

On a side note you can combine indexing and encoding into a single Pipeline:

val pipeline = new Pipeline()
  .setStages(index_transformers ++ one_hot_encoders)

val model = pipeline.fit(df)
model.transform(df)
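For readers on a newer release, here is a self-contained sketch of the same idea. It assumes Spark 3.x, where OneHotEncoder is an estimator with setInputCols, so it is not the pre-2.3 code from the question; the DataFrame `sample` and its values are hypothetical.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("index-and-encode-sketch")
  .getOrCreate()

// Hypothetical sample data with two string columns.
val sample = spark.createDataFrame(Seq(
  ("a.com", "phone"),
  ("b.com", "tablet"),
  ("a.com", "tablet")
)).toDF("domain", "form_factor")

val stringColumns = Array("domain", "form_factor")

// One StringIndexer per string column, as in the question.
val indexers: Array[PipelineStage] = stringColumns.map { name =>
  new StringIndexer()
    .setInputCol(name)
    .setOutputCol(s"${name}_index")
}

// A single encoder stage handles all index columns (Spark 3.x API).
val encoder = new OneHotEncoder()
  .setInputCols(stringColumns.map(name => s"${name}_index"))
  .setOutputCols(stringColumns.map(name => s"${name}_index_vec"))

// Indexers run first, so the encoder is fitted on the indexed columns.
val pipeline = new Pipeline().setStages(indexers :+ encoder)
val encodedSample = pipeline.fit(sample).transform(sample)
encodedSample.show(truncate = false)
```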

Edit:

The error you see means that one of your columns contains an empty String. The indexer accepts it, but it cannot be used for encoding. Depending on your requirements you can drop these rows or use a dummy label. Unfortunately you cannot use NULLs until SPARK-11569 is resolved.
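Both workarounds can be applied before indexing. This is only a sketch: the DataFrame `raw`, its contents, and the placeholder value `__empty__` are assumptions chosen for illustration, and any label that does not clash with real values would do.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, when}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("empty-label-sketch")
  .getOrCreate()

// Hypothetical data with empty strings in both columns.
val raw = spark.createDataFrame(Seq(
  ("a.com", "300x250"),
  ("", "728x90"),
  ("b.com", "")
)).toDF("domain", "size")

val stringColumns = Array("domain", "size")

// Option 1: drop every row that has an empty string in any of the columns.
val dropped = stringColumns.foldLeft(raw) { (acc, name) =>
  acc.filter(col(name) =!= "")
}

// Option 2: replace empty strings with a dummy label (hypothetical value).
val placeholder = "__empty__"
val relabeled = stringColumns.foldLeft(raw) { (acc, name) =>
  acc.withColumn(name, when(col(name) === "", placeholder).otherwise(col(name)))
}

relabeled.show()
```

Dropping loses rows, while relabeling keeps them and turns "empty" into its own category, so the choice depends on whether an empty value is meaningful in your data.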

Author: Michael Discenza

Data Scientist/Data Engineer at RUN Ads

Updated on June 19, 2022

Comments

  • Michael Discenza
    Michael Discenza almost 2 years

    I've been able to create a pipeline that lets me index multiple string columns at once, but I am stuck on encoding them. Unlike the indexer, the encoder is not an estimator, so following the OneHotEncoder example in the docs I never call fit.

    import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, OneHotEncoder}
    import org.apache.spark.ml.Pipeline
    
    val data = sqlContext.read.parquet("s3n://map2-test/forecaster/intermediate_data")
    
    val df = data.select("win","bid_price","domain","size", "form_factor").na.drop()
    
    
    //indexing columns
    val stringColumns = Array("domain","size", "form_factor")
    val index_transformers: Array[org.apache.spark.ml.PipelineStage] = stringColumns.map(
      cname => new StringIndexer()
        .setInputCol(cname)
        .setOutputCol(s"${cname}_index")
    )
    
    // Add the rest of your pipeline like VectorAssembler and algorithm
    val index_pipeline = new Pipeline().setStages(index_transformers)
    val index_model = index_pipeline.fit(df)
    val df_indexed = index_model.transform(df)
    
    
    //encoding columns
    val indexColumns  = df_indexed.columns.filter(x => x contains "index")
    val one_hot_encoders: Array[org.apache.spark.ml.PipelineStage] = indexColumns.map(
        cname => new OneHotEncoder()
         .setInputCol(cname)
         .setOutputCol(s"${cname}_vec")
    )
    
    
    
    val one_hot_pipeline = new Pipeline().setStages(one_hot_encoders)
    val df_encoded = one_hot_pipeline.transform(df_indexed)
    

    The OneHotEncoder object doesn't have a fit method, so putting it in the same pipeline as the indexers does not work: it throws an error when I call fit on the pipeline. I also cannot call transform on the pipeline that I built from the array of pipeline stages, one_hot_encoders.

    I have not found a good solution for using the OneHotEncoder without individually creating the transformer and calling transform on it for each of the columns I want to encode.