Spark 1.6: filtering DataFrames generated by describe()


Solution 1

I tried this on a toy dataset I had, containing some health/disease data:

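import org.apache.spark.sql.Row
// Go through the underlying RDD instead of DataFrame.where(); r.get(1) is the first data column.
val stddev_tobacco = rawData.describe().rdd.map {
  case r: Row => (r.getAs[String]("summary"), r.get(1))
}.filter(_._1 == "stddev").map(_._2).collect()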
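Because describe() produces only a handful of rows (count, mean, stddev, min, max), the same lookup can also be done entirely on the driver: collect the rows once and pick the statistic in plain code instead of filtering the DataFrame. Below is a minimal sketch in Python, assuming the rows come from something like `stats_df.collect()` on the describe() result of a numeric column. `stats_df` and `describe_to_dict` are hypothetical names, and this approach has not been checked against the Spark 1.6 exception itself. Since describe() returns every statistic as a string, the values are converted with `float()`.

```python
from typing import Dict, Iterable, Sequence


def describe_to_dict(rows: Iterable[Sequence], column_index: int = 1) -> Dict[str, float]:
    """Map each summary label (count, mean, stddev, ...) to one column's value.

    Hypothetical helper: `rows` is assumed to be the collected output of
    DataFrame.describe(); column 0 holds the `summary` label.
    """
    stats = {}
    for row in rows:
        label, raw = row[0], row[column_index]
        # describe() returns strings; a missing statistic (None) becomes NaN.
        stats[label] = float(raw) if raw is not None else float("nan")
    return stats


# Sample rows copied from the statsDF.show() output in the question.
rows = [
    ("count", "53173"),
    ("mean", "104.76128862392568"),
    ("stddev", "3577.8184333911513"),
    ("min", "1"),
    ("max", "558407"),
]

stats = describe_to_dict(rows)
print(stats["mean"], stats["stddev"])
```

PySpark `Row` objects are tuples, so indexing by position works on collected rows the same way it does on the plain tuples above; with real data, `describe_to_dict(stats_df.collect())["stddev"]` would give the standard deviation as a float.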

Solution 2

Alternatively, skip describe() and compute the statistics directly by selecting aggregate functions from the DataFrame (PySpark):

from pyspark.sql.functions import mean, min, max
df.select([mean('uniform'), min('uniform'), max('uniform')]).show()
+------------------+-------------------+------------------+
|      AVG(uniform)|       MIN(uniform)|      MAX(uniform)|
+------------------+-------------------+------------------+
|0.5215336029384192|0.19657711634539565|0.9970412477032209|
+------------------+-------------------+------------------+

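You can also register the describe() output as a temporary table and query it with SQL (the %sql cell below assumes a notebook environment such as Zeppelin or Databricks):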

val t = x.describe()
t.registerTempTable("dt")

%sql 
select * from dt

Solution 3

Another option is selectExpr(), which also goes through the query optimizer; for example, to obtain the minimum:

myDataFrame.selectExpr('MIN(count)').head()[0]

Solution 4

import spark.implicits._  // enables the $"column" syntax; spark is the SparkSession
myDataFrame.describe().filter($"summary" === "stddev").show()

This worked quite nicely on Spark 2.3.0

Author: Rami

Updated on July 25, 2022

Comments

  • Rami

    The problem arises when I call the describe() function on a DataFrame:

    val statsDF = myDataFrame.describe()
    

    Calling describe() yields the following output:

    statsDF: org.apache.spark.sql.DataFrame = [summary: string, count: string]
    

    I can display statsDF normally by calling statsDF.show():

    +-------+------------------+
    |summary|             count|
    +-------+------------------+
    |  count|             53173|
    |   mean|104.76128862392568|
    | stddev|3577.8184333911513|
    |    min|                 1|
    |    max|            558407|
    +-------+------------------+
    

    Now I would like to get the standard deviation and the mean from statsDF, but when I try to collect the values with something like:

    val temp = statsDF.where($"summary" === "stddev").collect()
    

    I get a "Task not serializable" exception.

    I get the same exception when I call:

    statsDF.where($"summary" === "stddev").show()
    

    Is it simply not possible to filter DataFrames generated by the describe() function?