Apache Spark: Get number of records per partition

Solution 1

You can get the number of records per partition like this:

import spark.implicits._  // needed for .toDF on an RDD (already imported in spark-shell)

df
  .rdd
  .mapPartitionsWithIndex { case (i, rows) => Iterator((i, rows.size)) }  // (partition index, record count)
  .toDF("partition_number", "number_of_records")
  .show

But this also launches a Spark job by itself (because the file must be read by Spark to get the number of records).

Spark could also read Hive table statistics, but I don't know how to display that metadata.
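
If the data sits in a Hive table, one way to surface those statistics is ANALYZE TABLE followed by DESCRIBE EXTENDED. This is only a sketch: the table name my_table is made up, and these are table-level statistics (total size and row count), not per-Spark-partition counts:

// Sketch: displaying Hive table statistics (table name "my_table" is hypothetical)
spark.sql("ANALYZE TABLE my_table COMPUTE STATISTICS")    // writes size/row-count statistics to the metastore
spark.sql("DESCRIBE EXTENDED my_table").show(100, false)  // the "Statistics" row shows the stored values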

Solution 2

I'd use the built-in spark_partition_id function. It should be as efficient as it gets:

import org.apache.spark.sql.functions.spark_partition_id

df.groupBy(spark_partition_id).count
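
Here is a minimal end-to-end sketch of this approach, assuming a SparkSession named spark; the example DataFrame is made up:

import org.apache.spark.sql.functions.spark_partition_id

// Hypothetical example data: 1,000 rows spread over 4 partitions
val df = spark.range(0, 1000).repartition(4)

df.groupBy(spark_partition_id().alias("partition_id"))
  .count()
  .orderBy("partition_id")
  .show()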

Solution 3

For future PySpark users:

from pyspark.sql.functions import spark_partition_id
rawDf.withColumn("partitionId", spark_partition_id()).groupBy("partitionId").count().show()

Solution 4

Spark 1.5 solution:

(sparkPartitionId() exists in org.apache.spark.sql.functions)

import org.apache.spark.sql.functions._ 

df.withColumn("partitionId", sparkPartitionId()).groupBy("partitionId").count.show

As mentioned by @Raphael Roth, mapPartitionsWithIndex is the best approach; it will work with all versions of Spark since it is an RDD-based approach.
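
If the counts are needed on the driver itself (for example, to log them when the job runs in yarn-cluster mode), the same RDD-based result can be collected back. A minimal sketch, assuming an existing DataFrame df:

// One (partitionIndex, recordCount) pair per partition; the result is small,
// so collecting it to the driver is cheap.
val countsPerPartition = df.rdd
  .mapPartitionsWithIndex { case (index, rows) => Iterator((index, rows.size)) }
  .collect()

countsPerPartition.foreach { case (index, count) =>
  println(s"partition $index has $count records")
}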

Solution 5

Spark/Scala:

val numPartitions = 20000
val a = sc.parallelize(0 until 1e6.toInt, numPartitions)
val l = a.glom().map(_.length).collect()  // get length of each partition
println((l.min, l.max, l.sum / l.length, l.length))  // check if skewed

PySpark:

num_partitions = 20000
a = sc.parallelize(range(int(1e6)), num_partitions)
l = a.glom().map(len).collect()  # get length of each partition
print(min(l), max(l), sum(l)/len(l), len(l))  # check if skewed

The same is possible for a DataFrame, not just for an RDD; just insert DF.rdd.glom... into the code above, as sketched below.
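
A Scala sketch of that DataFrame variant, assuming an existing DataFrame df:

// Same skew check, starting from the DataFrame's underlying RDD
val partitionSizes = df.rdd.glom().map(_.length).collect()
println((partitionSizes.min, partitionSizes.max,
  partitionSizes.sum / partitionSizes.length, partitionSizes.length))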

Credits: Mike Dusenberry @ https://issues.apache.org/jira/browse/SPARK-17817


Comments

  • nilesh1212: I want to check how we can get information about each partition, such as the total number of records in each partition, on the driver side when the Spark job is submitted with deploy mode as YARN cluster, in order to log or print it on the console.