Prevent DataFrame.partitionBy() from removing partitioned columns from schema


Solution 1

I can think of one workaround, which is rather lame, but works.

import spark.implicits._

val duplicated = df.withColumn("_type", $"type").withColumn("_category", $"category")
duplicated.write.partitionBy("_type", "_category").parquet(config.outpath)
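To sanity-check the workaround, you can read the output back and inspect the schema. A minimal sketch, assuming the same df and config.outpath as above (the column types shown are illustrative):

// Read the partitioned output back. "_type" and "_category" are recovered
// from the directory structure, while the original "type" and "category"
// columns are still stored inside the Parquet files.
val roundTrip = spark.read.parquet(config.outpath)
roundTrip.printSchema()
// root
//  |-- type: string (nullable = true)
//  |-- category: string (nullable = true)
//  |-- ...                                 <- any other columns in df
//  |-- _type: string (nullable = true)
//  |-- _category: string (nullable = true)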

I have the same question myself, so I'm answering in the hope that someone (perhaps the OP, if they have since found a better solution) can offer a better answer or explanation.

Solution 2

In general, Ivan's answer is a fine kludge. BUT...

If you are strictly reading and writing in Spark, you can just use the basePath option when reading your data.

https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery

By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths.

Example:

val dataset = spark
  .read
  .format("parquet")
  .option("basePath", hdfsInputBasePath)
  .load(hdfsInputPath)
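The same option also works when loading a single partition directory: as long as basePath points at the table root, Spark still reconstructs the partition column from the path. A rough sketch (the paths and the type=foo partition are hypothetical, borrowed from the question's schema):

val onePartition = spark
  .read
  .format("parquet")
  .option("basePath", "/data/events")   // root of the partitioned table
  .load("/data/events/type=foo")        // a single partition directory
// "type" is rebuilt from the directory name and appears in the schema,
// even though it is not stored inside the Parquet files themselves.
onePartition.printSchema()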

Solution 3

I'd like to add a bit more context here and provide PySpark code instead of Scala for those who need it. You need to be careful how you read in a partitioned dataframe if you want to keep the partitioning variables (the details matter). Let's start by writing a partitioned dataframe like this:

df.write.mode("overwrite").partitionBy("Season").parquet("partitioned_parquet/")
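On disk this produces one subdirectory per Season value, with the Season column itself removed from the Parquet files. The layout looks roughly like this (directory and file names are illustrative):

partitioned_parquet/
    Season=2010/
        part-00000-....snappy.parquet
    Season=2011/
        part-00000-....snappy.parquet
    Season=2012/
        part-00000-....snappy.parquet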

To read the whole dataframe back in WITH the partitioning variables included...

path = "partitioned_parquet/"
parquet = spark.read.parquet(path)
parquet.show(5)

Result:

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows

Note that if you include an * at the end of your path, the partitioning variables will be dropped: the glob makes Spark treat each partition directory as its own base path, so the Season=... directory names are never interpreted as a partition column.

path = "partitioned_parquet/*"
parquet = spark.read.parquet(path)
parquet.show(5)

Result:

+-----+
|Value|
+-----+
|   71|
|   77|
|   83|
|   54|
|  100|
+-----+
only showing top 5 rows

Now, if you want to read in only portions of the partitioned dataframe, you need to use this method in order to keep your partitioning variables (in this case "Season").

path = "partitioned_parquet/"
dataframe = spark.read.option("basePath", path).parquet(path + 'Season=2010/',
                                                        path + 'Season=2011/',
                                                        path + 'Season=2012/')
dataframe.show(5)

Result:

+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows

Hope that helps folks!


Comments

  • Michael
    Michael over 1 year

    I am partitioning a DataFrame as follows:

    df.write.partitionBy("type", "category").parquet(config.outpath)
    

    The code gives the expected results (i.e. data partitioned by type & category). However, the "type" and "category" columns are removed from the data / schema. Is there a way to prevent this behaviour?

    • zero323
      zero323 about 8 years
      Isn't that the point? All the required data is still encoded in the directory structure, so there is no data loss. If you want the values kept inside the files themselves, you could try df.repartition("type", "category").write(...) (see the sketch after these comments), but you won't get the nice directory structure.
    • Michael
      Michael about 8 years
      @zero323: yes, I agree there is no data loss. However, recovering the columns used for partitioning is non-trivial for some use cases. For example, if I want to load the data in Pig, how would I recover the type and category columns?
    • zero323
      zero323 about 8 years
      Haven't used Pig in a while. Doesn't ParquetLoader understand the structure out of the box?
    • Michael
      Michael over 7 years
      @zero323: super long delay getting back to your question... No, Pig doesn't incorporate the directory structure into the Parquet schema.
  • Michael
    Michael over 7 years
    Actually doesn't look that lame to me. Seems like the best approach given the behaviour of partitionBy().
  • aasthetic
    aasthetic almost 5 years
    your answer also taught me a new term for my engineering skills :D
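For reference, here is a minimal sketch of the repartition alternative zero323 mentions above, assuming the question's df and config.outpath (note that in Scala, repartition takes Column references rather than name strings):

import spark.implicits._

// Cluster rows by type/category so rows with the same values land in the
// same files, then write WITHOUT partitionBy: both columns stay in the
// Parquet schema, but you lose the type=.../category=... directory layout.
df.repartition($"type", $"category")
  .write
  .parquet(config.outpath)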