How to convert type Row into Vector to feed to the KMeans

Solution 1

ML

The documentation's example shows what is missing here: the DataFrame-based KMeans in pyspark.ml expects a DataFrame with a single Vector column (named features by default), not separate latitude and longitude columns.

To modify your current data's structure you can use a VectorAssembler. In your case it could be something like:

from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col

vectorAssembler = VectorAssembler(inputCols=["latitude", "longitude"],
                                  outputCol="features")

# Your columns hold strings rather than doubles, so cast them first.
expr = [col(c).cast("Double").alias(c) 
        for c in vectorAssembler.getInputCols()]

df2 = df2.select(*expr)
df = vectorAssembler.transform(df2)

You should also consider rescaling your features, for example with the MinMaxScaler class. KMeans is distance-based, so a feature with a much larger range can otherwise dominate the clustering.
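To make the effect of rescaling concrete, here is a plain-Python sketch (no Spark involved) of the per-column arithmetic behind min-max rescaling to [0, 1]. The helper name min_max_scale is hypothetical and the two rows are the sample values from the question. It only illustrates the formula and is not Spark's implementation.

```python
def min_max_scale(rows):
    """Rescale each column of `rows` linearly so that its values span [0, 1]."""
    columns = list(zip(*rows))
    lows = [min(col) for col in columns]
    highs = [max(col) for col in columns]
    scaled = []
    for row in rows:
        scaled.append([
            # A constant column has no range; map it to the midpoint 0.5.
            (x - lo) / (hi - lo) if hi > lo else 0.5
            for x, lo, hi in zip(row, lows, highs)
        ])
    return scaled


# Sample rows from the question: (latitude, longitude).
rows = [(60.1643075, 24.9460844), (60.4686748, 22.2774728)]
scaled = min_max_scale(rows)
```

With only two rows, every value lands on one of the endpoints of the range; on a real dataset the values spread across the interval.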

MLlib

To do this with MLlib, first use a map function on the underlying RDD to convert all of your string values to floats and combine each row into a DenseVector.

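from pyspark.mllib.linalg import Vectors

# In Spark 2.x and later a DataFrame has no map method, so go through .rdd.
rdd = df2.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))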

You can then train MLlib's KMeans model on the rdd variable.
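The conversion that the lambda performs on each row can be tried without a cluster. The sketch below uses plain tuples of strings as stand-ins for Row objects (an assumption for illustration; a real Row is a tuple subclass and also iterates over its field values in order). The helper name row_to_floats is hypothetical.

```python
def row_to_floats(row):
    """Convert every field of a row (an iterable of numeric strings) to float."""
    return [float(c) for c in row]


# Stand-ins for the Row objects of df2, using the values from the question.
string_rows = [("60.1643075", "24.9460844"), ("60.4686748", "22.2774728")]
points = [row_to_floats(r) for r in string_rows]
```

Note that float raises an exception for a missing (None) or non-numeric field, so rows with such values would need to be filtered out or cleaned before the map.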

Solution 2

I got PySpark 2.3.1 to perform KMeans on a DataFrame as follows:

  1. Write a list of the columns you want to include in the clustering analysis:
feat_cols = ['latitude', 'longitude']
  2. Cast all of those columns to numeric values:
from pyspark.sql.functions import col
expr = [col(c).cast("Double").alias(c) for c in feat_cols]
df2 = df2.select(*expr)
  3. Create your features vector with VectorAssembler:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
df3 = assembler.transform(df2).select('features')
  4. Normalize your features. Normalization is not always required, but it rarely hurts:
from pyspark.ml.feature import StandardScaler
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withStd=True,
    withMean=False)
scalerModel = scaler.fit(df3)
df4 = scalerModel.transform(df3).drop('features')\
                     .withColumnRenamed('scaledFeatures', 'features')
  5. Turn your DataFrame object df4 into an RDD of dense vectors:
from pyspark.mllib.linalg import Vectors
data5 = df4.rdd.map(lambda row: Vectors.dense([x for x in row['features']]))
  6. Use the resulting RDD as input for KMeans training:
from pyspark.mllib.clustering import KMeans
model = KMeans.train(data5, k=3, maxIterations=10)
  7. Example: classify a point p in your vector space:
prediction = model.predict(p)
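Two details of the steps above are easy to miss: a scaler with withStd=True and withMean=False only divides each feature by its standard deviation, and a point passed to predict has to be scaled the same way as the training data. The following plain-Python sketch (no Spark) illustrates both. The helper names, the sample rows and the cluster centers are all made up for illustration, and the sample standard deviation (n - 1 denominator) is assumed to match what the scaler computes.

```python
import statistics


def column_stds(rows):
    """Sample standard deviation (n - 1 denominator) of each column."""
    return [statistics.stdev(col) for col in zip(*rows)]


def scale(point, stds):
    """Divide each coordinate by its column's standard deviation, no centering."""
    return [x / s for x, s in zip(point, stds)]


def nearest_center(point, centers):
    """Index of the center with the smallest squared Euclidean distance."""
    def sq_dist(center):
        return sum((a - b) ** 2 for a, b in zip(point, center))
    return min(range(len(centers)), key=lambda i: sq_dist(centers[i]))


# Hypothetical training rows and cluster centers (centers are in scaled space).
rows = [(0.0, 10.0), (2.0, 30.0), (4.0, 50.0)]
centers = [[0.0, 0.5], [2.0, 2.5]]

stds = column_stds(rows)
p_scaled = scale((4.0, 50.0), stds)          # scale the new point first
cluster = nearest_center(p_scaled, centers)  # then look up its cluster index
```

If the unscaled point were compared with the centers directly, the distances would be measured in mismatched units and the assignment would not be meaningful.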
Author by

chessosapiens

Updated on June 17, 2022

Comments

  • chessosapiens
    chessosapiens almost 2 years

    When I try to feed df2 to KMeans, I get the following error:

    clusters = KMeans.train(df2, 10, maxIterations=30,
                            runs=10, initializationMode="random")
    

    The error I get:

    Cannot convert type <class 'pyspark.sql.types.Row'> into Vector
    

    df2 is a DataFrame created as follows:

    df = sqlContext.read.json("data/ALS3.json")
    df2 = df.select('latitude','longitude')
    
    df2.show()
    
    
    +----------+----------+
    |  latitude| longitude|
    +----------+----------+
    |60.1643075|24.9460844|
    |60.4686748|22.2774728|
    +----------+----------+
    

    How can I convert these two columns to a Vector and feed them to KMeans?