Best way to get the max value in a Spark dataframe column

Solution 1

>df1.show()
+-----+--------------------+--------+----------+-----------+
|floor|           timestamp|     uid|         x|          y|
+-----+--------------------+--------+----------+-----------+
|    1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418|
|    1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393|
|    1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585|
|    1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073|

>row1 = df1.agg({"x": "max"}).collect()[0]
>print(row1)
Row(max(x)=110.33613)
>print(row1["max(x)"])
110.33613

The answer is almost the same as Method 3 in the question, but it seems the asDict() call in Method 3 can be removed.
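
For example, the max can be read straight off the Row returned by Method 3's groupby().max(), with no asDict() (a minimal sketch using the df1 shown above):

# Same aggregation as Method 3, but indexing the Row directly instead of calling asDict()
row1 = df1.groupby().max('x').collect()[0]
print(row1['max(x)'])  # 110.33613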

Solution 2

The max value for a particular column of a DataFrame can be obtained with:

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]
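
A minimal end-to-end sketch of this approach; the DataFrame and column name here are only illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])

# agg() with a dict returns a one-row, one-column DataFrame; [0][0] extracts the scalar
your_max_value = df.agg({"A": "max"}).collect()[0][0]
print(your_max_value)  # 3.0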

Solution 3

Remark: Spark is intended to work on Big Data - distributed computing. The example DataFrame here is very small, so the ordering observed on real-life data may differ from the ordering on this small example.

Slowest: Method_1, because .describe("A") calculates min, max, mean, stddev, and count (5 calculations over the whole column).

Medium: Method_4, because the .rdd conversion (DataFrame to RDD) slows down the process.

Faster: Method_3 ~ Method_2 ~ Method_5, because the logic is very similar, so Spark's Catalyst optimizer follows very similar logic with a minimal number of operations (get the max of a particular column, collect a single-value DataFrame); .asDict() adds a little extra time to Methods 2 and 3 compared with 5.

import pandas as pd
import numpy as np  # only needed for the bigger/realistic DataFrame below
import time

time_dict = {}

dfff = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
#--  For bigger/realistic dataframe just uncomment the following 3 lines
#lst = list(np.random.normal(0.0, 100.0, 100000))
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst})
#dfff = spark.createDataFrame(pdf)

tic1 = int(round(time.time() * 1000))
# Method 1: Use describe()
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A'])
tac1 = int(round(time.time() * 1000))
time_dict['m1']= tac1 - tic1
print (max_val)

tic2 = int(round(time.time() * 1000))
# Method 2: Use SQL
dfff.registerTempTable("df_table")
max_val = spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval']
tac2 = int(round(time.time() * 1000))
time_dict['m2']= tac2 - tic2
print (max_val)

tic3 = int(round(time.time() * 1000))
# Method 3: Use groupby()
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)']
tac3 = int(round(time.time() * 1000))
time_dict['m3']= tac3 - tic3
print (max_val)

tic4 = int(round(time.time() * 1000))
# Method 4: Convert to RDD
max_val = dfff.select("A").rdd.max()[0]
tac4 = int(round(time.time() * 1000))
time_dict['m4']= tac4 - tic4
print (max_val)

tic5 = int(round(time.time() * 1000))
# Method 5: Use agg()
max_val = dfff.agg({"A": "max"}).collect()[0][0]
tac5 = int(round(time.time() * 1000))
time_dict['m5']= tac5 - tic5
print (max_val)

print(time_dict)

Result on an edge-node of a cluster in milliseconds (ms):

small DF (ms): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}

bigger DF (ms): {'m1': 10260, 'm2': 452, 'm3': 465, 'm4': 916, 'm5': 373}

Solution 4

Another way of doing it:

import pyspark.sql.functions as f

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX

On my data, I got these benchmarks:

df.select(f.max(f.col("A")).alias("MAX")).limit(1).collect()[0].MAX
CPU times: user 2.31 ms, sys: 3.31 ms, total: 5.62 ms
Wall time: 3.7 s

df.select("A").rdd.max()[0]
CPU times: user 23.2 ms, sys: 13.9 ms, total: 37.1 ms
Wall time: 10.3 s

df.agg({"A": "max"}).collect()[0][0]
CPU times: user 0 ns, sys: 4.77 ms, total: 4.77 ms
Wall time: 3.75 s

All of them give the same answer.
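
As noted in a comment below, limit(1).collect()[0] can be replaced by first(), since the aggregation already returns a single row; a shorter equivalent (a sketch, not benchmarked here) is:

import pyspark.sql.functions as f

# The aggregation produces exactly one row, so first() is enough
max_val = df.select(f.max(f.col("A")).alias("MAX")).first()["MAX"]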

Solution 5

The below example shows how to get the max value in a Spark dataframe column.

from pyspark.sql.functions import max

df = sql_context.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
df.show()
+---+---+
|  A|  B|
+---+---+
|1.0|4.0|
|2.0|5.0|
|3.0|6.0|
+---+---+

result = df.select([max("A")])
result.show()
+------+
|max(A)|
+------+
|   3.0|
+------+

print(result.collect()[0]['max(A)'])
3.0

Similarly, min, mean, etc. can be calculated as shown below:

from pyspark.sql.functions import mean, min, max

result = df.select([mean("A"), min("A"), max("A")])
result.show()
+------+------+------+
|avg(A)|min(A)|max(A)|
+------+------+------+
|   2.0|   1.0|   3.0|
+------+------+------+
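
If cleaner column names are preferred, the aggregates can be aliased (a small sketch, not part of the original answer):

from pyspark.sql.functions import mean, min, max

result = df.select(mean("A").alias("avg_A"), min("A").alias("min_A"), max("A").alias("max_A"))
row = result.collect()[0]
print(row["avg_A"], row["min_A"], row["max_A"])  # 2.0 1.0 3.0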

Comments

  • xenocyon
    xenocyon over 2 years

    I'm trying to figure out the best way to get the largest value in a Spark dataframe column.

    Consider the following example:

    df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"])
    df.show()
    

    Which creates:

    +---+---+
    |  A|  B|
    +---+---+
    |1.0|4.0|
    |2.0|5.0|
    |3.0|6.0|
    +---+---+
    

    My goal is to find the largest value in column A (by inspection, this is 3.0). Using PySpark, here are four approaches I can think of:

    # Method 1: Use describe()
    float(df.describe("A").filter("summary = 'max'").select("A").first().asDict()['A'])
    
    # Method 2: Use SQL
    df.registerTempTable("df_table")
    spark.sql("SELECT MAX(A) as maxval FROM df_table").first().asDict()['maxval']
    
    # Method 3: Use groupby()
    df.groupby().max('A').first().asDict()['max(A)']
    
    # Method 4: Convert to RDD
    df.select("A").rdd.max()[0]
    

    Each of the above gives the right answer, but in the absence of a Spark profiling tool I can't tell which is best.

    Any ideas from either intuition or empiricism on which of the above methods is most efficient in terms of Spark runtime or resource usage, or whether there is a more direct method than the ones above?

    • zero323
      zero323 over 8 years
      Methods 2 and 3 are equivalent and use identical physical and optimized logical plans. Method 4 applies reduce with max on rdd. It can be slower than operating directly on a DataFrame. Method 1 is more or less equivalent to 2 and 3.
    • desertnaut
      desertnaut over 8 years
      @zero323 What about df.select(max("A")).collect()[0].asDict()['max(A)']? Looks equivalent to Method 2 while being more compact, and also more intuitive than Method 3.
    • Danylo Zherebetskyy
      Danylo Zherebetskyy about 6 years
      The slowest is Method 4, because you do a DataFrame-to-RDD conversion of the whole column and then extract the max value.
  • jibiel
    jibiel over 6 years
    can someone explain why collect()[0] is needed?
  • Jason Wolosonovich
    Jason Wolosonovich about 6 years
    @jibiel collect() returns a list (in this case with a single item), so you need to access the first (only) item in the list
  • Vyom Shrivastava
    Vyom Shrivastava over 5 years
    Make sure you have the correct imports. You need to import the following: from pyspark.sql.functions import max. The max we use here is the PySpark SQL function, not the Python built-in max. It is better if you use an alias for it: from pyspark.sql.functions import max as mx
  • Aliaxander
    Aliaxander about 5 years
    @Burt head() can be used instead of collect()[0].
  • Burt
    Burt about 5 years
    @Aliaxander It's been a bit long. Don't have the code and Spark installed anymore.
  • omnisius
    omnisius almost 5 years
    I prefer your solution to the accepted solution. Adding the two "[0]" indexes returns just the value.
  • Chris Koester
    Chris Koester over 4 years
    While .collect()[0] works, it's probably safer to use .first()[0]. By definition, collect() will "Return all the elements of the dataset as an array at the driver program.", which is a single machine. If you get the syntax wrong you could end up using an excessive amount of memory.
  • MegaIng
    MegaIng almost 4 years
    Please add a bit of context and explanation around your solution.
  • Ulf Aslak
    Ulf Aslak over 3 years
    Agree. I'm new to pyspark (old to Python) and this is more intuitive.
  • r_hudson
    r_hudson about 3 years
    Extending on this answer: if you have NaNs, the following will work (see the sketch after these comments): df.select('A').dropna().select([max('A')])
  • Chris H.
    Chris H. over 2 years
    "df.limit(1).collect()[0]" can be replaced by "df.first()"