KMeans clustering in PySpark

Solution 1

Since, judging from another recent question of yours, you seem to be taking your very first steps with Spark clustering (you are even importing sqrt & array without ever using them, probably because that is how the docs example does it), let me offer advice at a more general level rather than on the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions while trying to get your cluster assignments back into your dataframe)...

Since

  1. you have your data already in a dataframe

  2. you want to attach the cluster membership back into your initial dataframe

you have no reason to revert to an RDD and use the (soon to be deprecated) MLlib package; you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.

Step 0 - make some toy data resembling yours:

spark.version
# u'2.2.0'

df = spark.createDataFrame([[0, 33.3, -17.5],
                              [1, 40.4, -20.5],
                              [2, 28., -23.9],
                              [3, 29.5, -19.0],
                              [4, 32.8, -18.84]
                             ],
                              ["other","lat", "long"])

df.show()
# +-----+----+------+
# |other| lat|  long|
# +-----+----+------+
# |    0|33.3| -17.5|
# |    1|40.4| -20.5| 
# |    2|28.0| -23.9|
# |    3|29.5| -19.0|
# |    4|32.8|-18.84|
# +-----+----+------+

Step 1 - assemble your features

In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features; and it provides a specific method for doing this, VectorAssembler:

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+ 
# |other| lat|  long|     features|
# +-----+----+------+-------------+
# |    0|33.3| -17.5| [33.3,-17.5]|
# |    1|40.4| -20.5| [40.4,-20.5]|
# |    2|28.0| -23.9| [28.0,-23.9]| 
# |    3|29.5| -19.0| [29.5,-19.0]|
# |    4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+ 

As you may have guessed, the argument inputCols tells VectorAssembler which columns of our dataframe are to be used as features.

Step 2 - fit your KMeans model

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

select('features') here tells the algorithm which column of the dataframe to use for clustering - remember that, after Step 1 above, your original lat & long columns are no longer used directly.

Step 3 - transform your initial dataframe to include cluster assignments

transformed = model.transform(new_df)
transformed.show()    
# +-----+----+------+-------------+----------+ 
# |other| lat|  long|     features|prediction|
# +-----+----+------+-------------+----------+
# |    0|33.3| -17.5| [33.3,-17.5]|         0| 
# |    1|40.4| -20.5| [40.4,-20.5]|         1|
# |    2|28.0| -23.9| [28.0,-23.9]|         0|
# |    3|29.5| -19.0| [29.5,-19.0]|         0|
# |    4|32.8|-18.84|[32.8,-18.84]|         0|
# +-----+----+------+-------------+----------+

The last column of the transformed dataframe, prediction, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.
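
To make the meaning of the prediction column concrete, here is a minimal plain-Python sketch (no Spark involved). It assumes only the rows shown in the table above; the names rows, members, centers, and nearest are mine, chosen for illustration. It recomputes each cluster center as the mean of its members and checks that every row is closest to the center of the cluster it was assigned to.

```python
from collections import defaultdict

# (lat, long, prediction) copied from the transformed.show() output above
rows = [
    (33.3, -17.5, 0),
    (40.4, -20.5, 1),
    (28.0, -23.9, 0),
    (29.5, -19.0, 0),
    (32.8, -18.84, 0),
]

# Group the points by their cluster label
members = defaultdict(list)
for lat, lon, label in rows:
    members[label].append((lat, lon))

# A k-means center is the coordinate-wise mean of the cluster's members
centers = {
    label: tuple(sum(coord) / len(points) for coord in zip(*points))
    for label, points in members.items()
}

def nearest(point):
    """Return the label of the center closest to point (squared Euclidean distance)."""
    def sq_dist(center):
        return sum((p - c) ** 2 for p, c in zip(point, center))
    return min(centers, key=lambda label: sq_dist(centers[label]))

# Re-derive each row's label from the centers alone
reassigned = [nearest((lat, lon)) for lat, lon, _ in rows]
```

For the toy data this gives a center of roughly (30.9, -19.81) for cluster #0 and (40.4, -20.5) for cluster #1, and reassigned reproduces the prediction column.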

You can further manipulate the transformed dataframe with select statements, or even drop the features column (which has now fulfilled its function and may be no longer necessary)...
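
A minimal sketch of that cleanup, assuming the transformed dataframe from Step 3. The helper name tidy_clusters and the new column name cluster are my own choices for illustration; drop and withColumnRenamed are standard DataFrame methods.

```python
def tidy_clusters(transformed):
    """Drop the assembled vector column and give the label column a friendlier name."""
    return (
        transformed
        .drop("features")                            # only needed for fitting/predicting
        .withColumnRenamed("prediction", "cluster")  # hypothetical, more descriptive name
    )

# Usage, with the dataframe from Step 3:
# result = tidy_clusters(transformed)
# result.show()
```

Both calls return a new dataframe, so transformed itself is left unchanged.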

Hopefully you are much closer now to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful...

Solution 2

Despite my other, more general answer, and in case you must, for whatever reason, stick with MLlib & RDDs, here is what causes your error, demonstrated with the same toy df.

When you select columns from a dataframe to convert to RDD, as you do, the result is an RDD of Rows:

df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]

which is not suitable as an input to MLlib KMeans. You'll need a map operation for this to work:

df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]
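
Why would a Row be rejected when it looks so much like a tuple? The following plain-Python analogy is a guess at the mechanism, not something stated above: it assumes the vector conversion accepts only exact built-in sequence types. A namedtuple stands in for Spark's Row (both are tuple subclasses with named fields), and is_plain_sequence is a hypothetical check written only for this illustration.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row: a tuple subclass with named fields
Row = namedtuple("Row", ["lat", "long"])

rows = [Row(33.3, -17.5), Row(40.4, -20.5)]

def is_plain_sequence(x):
    # Hypothetical exact-type check (an assumption, for illustration only):
    # subclasses of tuple, such as Row, do not pass it
    return type(x) in (list, tuple)

# Same shape as the rdd.map(lambda x: (x[0], x[1])) step above
plain = [(r[0], r[1]) for r in rows]

row_passes = is_plain_sequence(rows[0])                   # subclass of tuple
plain_passes = all(is_plain_sequence(p) for p in plain)   # built-in tuples
```

Under that assumption, row_passes is False even though isinstance(rows[0], tuple) is True, while the mapped tuples pass; this is why an explicit map to plain tuples (or lists) gets around the problem.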

So, your code should be like this:

from pyspark.mllib.clustering import KMeans

rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]
Author by

user3245256

Updated on July 09, 2022

Comments

  • user3245256
    user3245256 almost 2 years

    I have a Spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two columns: lat and long (latitude & longitude), using them as simple values. I want to extract 7 clusters based on just those 2 columns, and then I want to attach the cluster assignment to my original dataframe. I've tried:

    from numpy import array
    from math import sqrt
    from pyspark.mllib.clustering import KMeans, KMeansModel
    
    # Prepare a data frame with just 2 columns:
    data = mydataframe.select('lat', 'long')
    data_rdd = data.rdd  # needs to be an RDD
    data_rdd.cache()
    
    # Build the model (cluster the data)
    clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
    

    But I am getting an error after a while:

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)

    I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?

  • user3245256
    user3245256 over 6 years
    Dear desertnaut, a huge thank you for taking the time to write the best Stack Overflow answer I've ever read. I'll be sure to keep it as an excellent source going forward. Yes, you guessed correctly - I would have asked more questions! :) I had no idea I was using an old, deprecated library, and I am very glad you showed me the 'right path'. I understood everything in your excellent explanation. One tiny question (more Spark-related than KMeans-related): is it OK, from a storage and memory perspective, to keep producing new dataframes (df, then new_df) even if df is huge?
  • desertnaut
    desertnaut over 6 years
    @user3245256 standard practice is to assign your transformed data to new dataframes as you go. In any case, experiment and see...
  • Yamur
    Yamur over 5 years
    Great addition. One thing: collect() returns a list, and you can also pass a dataframe to the KMeans training model.
  • desertnaut
    desertnaut about 5 years
    We use collect only for the final results; if we could use it here, there would be no reason to bother with Spark whatsoever - we would be far better off with scikit-learn or similar...
  • thentangler
    thentangler about 3 years
    As @desertnaut mentioned, converting to an RDD for your ML operations is highly inefficient. That being said, alas, even the KMeans method in the pyspark.ml.clustering library still uses the collect function when getting your model outputs. This renders Spark's distributed capability useless when applying KMeans to very large data sets: all your worker nodes will be idle and only your driver node will be working overtime.