Prevent DataFrame.partitionBy() from removing partitioned columns from schema
Solution 1
I can think of one workaround, which is rather lame, but works.
import spark.implicits._
val duplicated = df.withColumn("_type", $"type").withColumn("_category", $"category")
duplicated.write.partitionBy("_type", "_category").parquet(config.outpath)
I have the same question, so I'm posting this partly in the hope that someone (perhaps the OP, if they have since found a better solution) will offer a better answer or explanation than mine.
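For PySpark users, the same duplication trick might be sketched as a small helper. The name with_duplicated_columns and the prefix parameter are made up for illustration; the sketch relies only on DataFrame.withColumn and column lookup via df[name].

```python
def with_duplicated_columns(df, columns, prefix="_"):
    """Return df with a prefixed copy of each listed column.

    Hypothetical helper (not part of Spark): the copies are meant to be
    used as partition keys so the original columns stay in the data files.
    """
    for name in columns:
        # df[name] is the existing column; withColumn adds it under a new name.
        df = df.withColumn(prefix + name, df[name])
    return df


# Usage sketch, assuming `df` and `outpath` already exist:
# duplicated = with_duplicated_columns(df, ["type", "category"])
# duplicated.write.partitionBy("_type", "_category").parquet(outpath)
```

The cost is the same as in the Scala version: each partition value is stored twice, once in the directory name and once inside the files.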
Solution 2
In general, Ivan's answer is a fine kludge. BUT...
If you are strictly reading and writing in Spark, you can just use the basePath option when reading your data.
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#partition-discovery
By passing path/to/table to either SparkSession.read.parquet or SparkSession.read.load, Spark SQL will automatically extract the partitioning information from the paths.
Example:
val dataset = spark
  .read
  .format("parquet")
  .option("basePath", hdfsInputBasePath)
  .load(hdfsInputPath)
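To see why the base path matters, here is a rough pure-Python illustration of the idea behind partition discovery. It is not Spark's actual implementation, and the function name partition_values and the sample file path are assumptions made up for this sketch: only name=value directories that sit below the base path are turned into columns.

```python
import posixpath


def partition_values(base_path, file_path):
    """Return {column: value} parsed from `name=value` directories.

    Only directories between base_path and the file are inspected, which
    mirrors why the choice of base path matters. Values stay as strings
    here; Spark additionally infers their types.
    """
    relative = posixpath.relpath(file_path, base_path)
    directories = relative.split("/")[:-1]  # drop the file name itself
    values = {}
    for directory in directories:
        name, sep, value = directory.partition("=")
        if sep:  # skip directories that are not of the form name=value
            values[name] = value
    return values


# Hypothetical data file written by partitionBy("type", "category").
data_file = "out/type=a/category=x/part-00000.parquet"

# Base path at the table root: both partition columns are recovered.
print(partition_values("out", data_file))         # {'type': 'a', 'category': 'x'}

# Base path inside a partition: `type` sits above the base, so it is lost.
print(partition_values("out/type=a", data_file))  # {'category': 'x'}
```

Setting the basePath option plays the role of the first call: it tells Spark where the table root is even when the path being loaded points deeper into the tree.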
Solution 3
I'd like to add a bit more context here and provide PySpark code instead of Scala for those who need it. You need to be careful how you read in the partitioned dataframe if you want to keep the partitioning variables (the details matter). Let's start by writing a partitioned dataframe like this:
df.write.mode("overwrite").partitionBy("Season").parquet("partitioned_parquet/")
To read the whole dataframe back in WITH the partitioning variables included...
path = "partitioned_parquet/"
parquet = spark.read.parquet(path)
parquet.show(5)
Result:
+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows
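Note that if you include an * at the end of your path, the partitioning variables will be dropped. With the glob, each matched Season=... directory is itself treated as a root of the read, so Spark no longer sees Season as a partition directory below the base path.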
path = "partitioned_parquet/*"
parquet = spark.read.parquet(path)
parquet.show(5)
Result:
+-----+
|Value|
+-----+
|   71|
|   77|
|   83|
|   54|
|  100|
+-----+
only showing top 5 rows
Now, if you want to read in only portions of the partitioned dataframe, you need to use this method in order to keep your partitioning variables (in this case "Season").
path = "partitioned_parquet/"
dataframe = spark.read.option("basePath", path).parquet(
    path + "Season=2010/",
    path + "Season=2011/",
    path + "Season=2012/",
)
dataframe.show(5)
Result:
+-----+------+
|Value|Season|
+-----+------+
|   71|  2010|
|   77|  2010|
|   83|  2010|
|   54|  2010|
|  100|  2010|
+-----+------+
only showing top 5 rows
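If the list of partitions is longer than a handful, the paths can be generated instead of typed out. The helper below is a hypothetical sketch (partition_paths is not a Spark function) and assumes the column=value directory layout that partitionBy() writes.

```python
def partition_paths(base_path, column, values):
    """Build one `base/column=value/` path per requested partition value.

    Hypothetical helper: assumes the `column=value` directory layout that
    partitionBy() writes.
    """
    base = base_path.rstrip("/")
    return [f"{base}/{column}={value}/" for value in values]


paths = partition_paths("partitioned_parquet/", "Season", range(2010, 2013))
print(paths)

# With a SparkSession named `spark`, the paths can then be read together:
# dataframe = spark.read.option("basePath", "partitioned_parquet/").parquet(*paths)
```

The basePath option is still what keeps the Season column; the helper only saves typing.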
Hope that helps folks!
Michael
Updated on September 15, 2022
Comments
-
Michael over 1 year
I am partitioning a DataFrame as follows:
df.write.partitionBy("type", "category").parquet(config.outpath)
The code gives the expected results (i.e. data partitioned by type & category). However, the "type" and "category" columns are removed from the data / schema. Is there a way to prevent this behaviour?
-
zero323 about 8 years
Isn't that the point? All the required data is still encoded in the directory structure, so there is no data loss. If you want a some-values-per-file layout, you could try df.repartition("type", "category").write(...), but you won't get a nice directory structure.
-
Michael about 8 years
@zero323: Yes, I agree there is no data loss. However, recovering the columns used for partitioning is non-trivial for some use cases. For example, if I want to load the data in Pig, how would I recover the type and category columns?
-
zero323 about 8 years
Haven't used Pig in a while. Doesn't ParquetLoader understand the structure out of the box?
-
Michael over 7 years
@zero323: Super long delay in answering your question... No, Pig doesn't incorporate the directory structure into the Parquet schema.
-
Michael over 7 years
Actually, that doesn't look that lame to me. Seems like the best approach given the behaviour of partitionBy().
-
aasthetic almost 5 years
Your answer also taught me a new term for my engineering skills :D