PCA Analysis in PySpark


Spark >= 1.5.0

Although PySpark 1.5 introduces distributed data structures (pyspark.mllib.linalg.distributed), the API is rather limited and there is no implementation of the computePrincipalComponents method.

It is possible to use either pyspark.ml.feature.PCA or pyspark.mllib.feature.PCA, though. In the first case the expected input is a data frame with a vector column:

from pyspark.ml.feature import PCA as PCAml
from pyspark.ml.linalg import Vectors  # Pre 2.0 pyspark.mllib.linalg

df = sqlContext.createDataFrame([
   (Vectors.dense([1, 2, 0]),),
   (Vectors.dense([2, 0, 1]),),
   (Vectors.dense([0, 1, 0]),)], ("features", ))

pca = PCAml(k=2, inputCol="features", outputCol="pca")
model = pca.fit(df)
transformed = model.transform(df)
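
To inspect the result you can look at the transformed frame; in Spark 2.0 or later the fitted PCAModel also exposes the loading matrix and the explained variance:

transformed.show(truncate=False)   # projected 2-dimensional vectors in the "pca" column
print(model.pc)                    # 3x2 matrix of principal components (loadings)
print(model.explainedVariance)     # fraction of variance explained by each component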

In Spark 2.0 or later you should use pyspark.ml.linalg.Vector in place of pyspark.mllib.linalg.Vector.

For the mllib version you'll need an RDD of Vectors:

from pyspark.mllib.feature import PCA as PCAmllib
from pyspark.mllib.linalg import Vectors

rdd = sc.parallelize([
    Vectors.dense([1, 2, 0]),
    Vectors.dense([2, 0, 1]),
    Vectors.dense([0, 1, 0])])

model = PCAmllib(2).fit(rdd)
transformed = model.transform(rdd)

Spark < 1.5.0

PySpark <= 1.4.1 doesn't support distributed data structures yet, so there is no built-in method to compute PCA. If the input matrix is relatively thin you can compute the covariance matrix in a distributed manner, collect the results and perform the eigendecomposition locally on the driver.

The order of operations is more or less like the one below. Distributed steps are followed by the name of the operation, local ones by "*" and an optional method. A minimal code sketch follows the list.

  1. Create an RDD[Vector] where each element is a single row from the input matrix. You can use a numpy.ndarray for each row (parallelize)
  2. Compute column-wise statistics (reduce)
  3. Use results from 2. to center the matrix (map)
  4. Compute outer product for each row (map outer)
  5. Sum results to obtain covariance matrix (reduce +)
  6. Collect and compute eigendecomposition * (numpy.linalg.eigh)
  7. Choose top-n eigenvectors *
  8. Project the data (map)
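
One way to put these steps together, sketched with NumPy on a tiny example matrix; the step numbers in the comments refer to the list above, and names like n_components are purely illustrative:

import numpy as np

n_components = 2                                      # number of components to keep (illustrative)
rows = sc.parallelize([                               # 1. RDD of rows (parallelize)
    np.array([1., 2., 0.]),
    np.array([2., 0., 1.]),
    np.array([0., 1., 0.])])

n = rows.count()
mean = rows.reduce(lambda a, b: a + b) / n            # 2. column-wise means (reduce)
centered = rows.map(lambda row: row - mean)           # 3. center the matrix (map)
cov = centered.map(lambda row: np.outer(row, row)) \
              .reduce(lambda a, b: a + b) / n         # 4.-5. covariance matrix (map outer, reduce +)

eigenvalues, eigenvectors = np.linalg.eigh(cov)       # 6. local eigendecomposition (*)
order = np.argsort(eigenvalues)[::-1][:n_components]  # 7. top-n eigenvectors (*)
components = eigenvectors[:, order]

projected = centered.map(lambda row: row.dot(components))  # 8. project the data (map)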

Regarding scikit-learn: you can use NumPy (it is already used by MLlib), SciPy and scikit-learn locally on the driver or on a worker in the same way as usual.
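
For example, if the data (or a sample of it) is small enough to collect to the driver, a rough sketch of handing it to scikit-learn, assuming the rdd of Vectors defined above:

import numpy as np
from sklearn.decomposition import PCA as SKPCA

local_data = np.array([v.toArray() for v in rdd.collect()])  # collect a small / sampled dataset to the driver
skl_model = SKPCA(n_components=2)
local_projected = skl_model.fit_transform(local_data)        # projected rows as a local NumPy array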

Comments

  • lapolonio
    lapolonio almost 2 years

    Looking at http://spark.apache.org/docs/latest/mllib-dimensionality-reduction.html. The examples seem to only contain Java and Scala.

    Does Spark MLlib support PCA analysis for Python? If so, please point me to an example. If not, how do I combine Spark with scikit-learn?

  • Mehdi LAMRANI
    Mehdi LAMRANI over 6 years
    What about Spark > 2? The syntax seems to have changed.
  • iratelilkid
    iratelilkid about 3 years
    @MehdiLAMRANI it worked for me. I am using databricks
  • iratelilkid
    iratelilkid about 3 years
    I have a question for @zero323: how do I apply this to the actual data frame? Any help would be appreciated, thanks.
  • 3nomis
    3nomis over 2 years
    I still haven't understood whether the PCA library automatically standardizes the data before performing PCA. Does anyone know?