KMeans clustering in PySpark
Solution 1
Since, judging from another recent question of yours, you seem to be taking your very first steps with Spark clustering (you are even importing sqrt & array without ever using them, probably because the docs example does so), let me offer advice at a more general level rather than on the specific question you are asking here (hopefully also saving you from subsequently opening 3-4 more questions while trying to get your cluster assignments back into your dataframe)...
Since
- you already have your data in a dataframe
- you want to attach the cluster membership back to your initial dataframe
you have no reason to revert to an RDD and use the RDD-based MLlib package (in maintenance mode since Spark 2.0); you will do your job much more easily, elegantly, and efficiently using the (now recommended) ML package, which works directly with dataframes.
Step 0 - make some toy data resembling yours:
spark.version
# u'2.2.0'
df = spark.createDataFrame([[0, 33.3, -17.5],
[1, 40.4, -20.5],
[2, 28., -23.9],
[3, 29.5, -19.0],
[4, 32.8, -18.84]
],
["other","lat", "long"])
df.show()
# +-----+----+------+
# |other| lat| long|
# +-----+----+------+
# | 0|33.3| -17.5|
# | 1|40.4| -20.5|
# | 2|28.0| -23.9|
# | 3|29.5| -19.0|
# | 4|32.8|-18.84|
# +-----+----+------+
Step 1 - assemble your features
In contrast to most ML packages out there, Spark ML requires your input features to be gathered in a single column of your dataframe, usually named features; it provides a dedicated transformer for doing this, VectorAssembler:
from pyspark.ml.feature import VectorAssembler
vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+
# |other| lat| long| features|
# +-----+----+------+-------------+
# | 0|33.3| -17.5| [33.3,-17.5]|
# | 1|40.4| -20.5| [40.4,-20.5]|
# | 2|28.0| -23.9| [28.0,-23.9]|
# | 3|29.5| -19.0| [29.5,-19.0]|
# | 4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+
As you may have guessed, the argument inputCols tells VectorAssembler which columns of our dataframe are to be used as features.
Step 2 - fit your KMeans model
from pyspark.ml.clustering import KMeans
kmeans = KMeans(k=2, seed=1) # 2 clusters here
model = kmeans.fit(new_df.select('features'))
select('features') here passes only the features column to the algorithm; KMeans reads its input from the column named by its featuresCol parameter, which defaults to features. Remember that, after Step 1 above, your original lat & long columns are no longer used directly.
Step 3 - transform your initial dataframe to include cluster assignments
transformed = model.transform(new_df)
transformed.show()
# +-----+----+------+-------------+----------+
# |other| lat| long| features|prediction|
# +-----+----+------+-------------+----------+
# | 0|33.3| -17.5| [33.3,-17.5]| 0|
# | 1|40.4| -20.5| [40.4,-20.5]| 1|
# | 2|28.0| -23.9| [28.0,-23.9]| 0|
# | 3|29.5| -19.0| [29.5,-19.0]| 0|
# | 4|32.8|-18.84|[32.8,-18.84]| 0|
# +-----+----+------+-------------+----------+
The last column of the transformed dataframe, prediction, shows the cluster assignment - in my toy case, I have ended up with 4 records in cluster #0 and 1 record in cluster #1.
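To make the prediction column concrete, here is a minimal plain-Python sketch (no Spark involved) of the two steps k-means alternates between: assign each point to its nearest center, then recompute each center as the mean of its points. The starting centers are an assumption, taken from the MLlib output shown in Solution 2 and reordered to match the cluster numbering above; Spark's own initialisation and cluster numbering may differ.

```python
# Plain-Python illustration only; this is not how Spark implements KMeans.
points = [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]

# Assumed starting centers (hypothetical values, see the note above)
centers = [(30.9, -19.81), (40.4, -20.5)]


def squared_distance(p, q):
    """Squared Euclidean distance between two points of equal dimension."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def assign(points, centers):
    """Return, for each point, the index of its nearest center."""
    return [
        min(range(len(centers)), key=lambda i: squared_distance(p, centers[i]))
        for p in points
    ]


def recompute(points, labels, k):
    """Return the mean of the points in each cluster (assumes no cluster is empty)."""
    new_centers = []
    for i in range(k):
        members = [p for p, label in zip(points, labels) if label == i]
        new_centers.append(tuple(sum(coord) / len(members) for coord in zip(*members)))
    return new_centers


labels = assign(points, centers)
new_centers = recompute(points, labels, len(centers))

print(labels)       # [0, 1, 0, 0, 0]
print(new_centers)  # equal to `centers` up to floating-point rounding
```

Because the recomputed centers coincide with the assumed ones, this assignment is stable: another iteration would change nothing.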
You can further manipulate the transformed dataframe with select statements, or even drop the features column (which has now fulfilled its function and may be no longer necessary)...
Hopefully you are much closer now to what you actually wanted to achieve in the first place. For extracting cluster statistics etc., another recent answer of mine might be helpful...
Solution 2
Despite my other general answer, and in case you, for whatever reason, must stick with MLlib & RDDs, here is what causes your error, using the same toy df.
When you select columns from a dataframe to convert to RDD, as you do, the result is an RDD of Rows:
df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]
which is not suitable as an input to MLlib KMeans. You'll need a map operation for this to work:
df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]
So, your code should be like this:
from pyspark.mllib.clustering import KMeans, KMeansModel
rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]
user3245256
Updated on July 09, 2022
Comments
-
user3245256 almost 2 years
I have a Spark dataframe 'mydataframe' with many columns. I am trying to run kmeans on only two columns: lat and long (latitude & longitude), using them as simple values. I want to extract 7 clusters based on just those 2 columns and then attach the cluster assignment to my original dataframe. I've tried:
from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel
# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd # needs to be an RDD
data_rdd.cache()
# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")
But I am getting an error after a while:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5191.0 failed 4 times, most recent failure: Lost task 1.3 in stage 5191.0 (TID 260738, 10.19.211.69, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last)
I've tried to detach and re-attach the cluster. Same result. What am I doing wrong?
-
user3245256 over 6 years
Dear desertnaut, a huge thank you for taking the time to write the best stackoverflow answer I've ever read. I'll be sure to keep it as an excellent source going forward. Yes, you guessed correctly - I would have asked more questions! :) I had no idea I was using some old, deprecated library and I am very glad you showed me the 'right path'. I understood everything in your excellent explanation. One tiny question (more Spark-related than kMeans-related): is it OK, from a storage and memory perspective, to produce more and more new dataframes (df, then new_df), even if df is huge?
-
desertnaut over 6 years
@user3245256 standard practice is to assign your transformed data to new dataframes as you go. In any case, experiment and see...
-
Yamur over 5 years
Great addition. One thing: collect() returns a list, and you can also pass a dataframe to the kmeans training model.
-
desertnaut about 5 years
We use collect only for the final results; if we could use it here, there would be no reason to bother with Spark whatsoever - we would be far better off with scikit-learn or similar...
-
thentangler about 3 years
As @desertnaut mentioned, converting to rdd for your ML operations is highly inefficient. That being said, alas, even the KMeans method in the pyspark.ml.clustering library still uses the collect function when getting your model outputs. This renders the Spark capability useless when applying KMeans on very large sets of data: all your worker nodes will be idle and only your driver node will be working overtime.