How to save a partitioned parquet file in Spark 2.1?


Solution 1

I found a solution! According to Cloudera, it is a mapred-site.xml configuration problem (see the link below). Also, instead of writing the DataFrame as testDf.write.partitionBy("id", "key").parquet("/path/to/file"),

I did it as follows: testDf.write.partitionBy("id", "key").parquet("hdfs://<namenode>:<port>/path/to/file"), where <namenode> and <port> are the HDFS master node's hostname and port, respectively.
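As a variation (not part of the original answer), the namenode does not have to be hard-coded into every path: pointing Spark's Hadoop configuration at the cluster's default filesystem usually has the same effect. A minimal sketch, assuming a hypothetical namenode at namenode-host:8020 and a hypothetical app name:

// Sketch only (assumption, not from the answer): set the default filesystem
// once instead of repeating hdfs://<namenode>:<port> in every path.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("partitioned-parquet-write")                              // hypothetical name
  .config("spark.hadoop.fs.defaultFS", "hdfs://namenode-host:8020")  // hypothetical host/port
  .getOrCreate()

import spark.implicits._

case class SimpleTest(id: String, value1: Int, value2: Float, key: Int)

val testDf = Seq(
  SimpleTest("test", 12, 13.5f, 1),
  SimpleTest("simple", 12, 13.5f, 1)
).toDS

// With fs.defaultFS set, a scheme-less path resolves against HDFS
// rather than the driver's local filesystem.
testDf.write.partitionBy("id", "key").parquet("/path/to/file")

Either way (a full hdfs:// URI or fs.defaultFS), the point is the same: the output path must resolve against HDFS, not the local filesystem.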

Special thanks to @jacek-laskowski for his valuable contribution.

References:

https://community.cloudera.com/t5/Batch-SQL-Apache-Hive/MKDirs-failed-to-create-file/m-p/36363#M1090

Writing to HDFS in Spark/Scala

Solution 2

Interesting since...well..."it works for me".

Since you describe your dataset with the SimpleTest case class, in Spark 2.1 you're an import spark.implicits._ away from having a typed Dataset.

In my case, spark is your sql, i.e. the SparkSession.

In other words, you don't have to create testDataP and testDf (using sql.createDataFrame).

import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")

In another terminal (after saving to the /tmp/testDf directory):

$ tree /tmp/testDf/
/tmp/testDf/
├── _SUCCESS
├── id=simple
│   ├── key=1
│   │   └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│   ├── key=2
│   │   └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│   └── key=3
│       └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
    ├── key=1
    │   └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    ├── key=2
    │   └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
    └── key=3
        └── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet

8 directories, 7 files
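To verify the round trip, a small follow-up sketch (paths and the filter value are just the ones from this example) reads the data back and filters on a partition column:

import org.apache.spark.sql.functions.col

// Spark rebuilds the `id` and `key` columns from the directory names
// (id=.../key=...) when reading a partitioned layout.
val readBack = spark.read.parquet("/tmp/testDf")

// A filter on a partition column is answered by directory pruning,
// so only the id=test subtree is scanned here.
readBack.filter(col("id") === "test").show()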
Author: Daniel Lopez

Updated on July 05, 2022

Comments

  • Daniel Lopez, almost 2 years ago

    I am trying to test how to write data in HDFS 2.7 using Spark 2.1. My data is a simple sequence of dummy values and the output should be partitioned by the attributes: id and key.

     // Simple case class to cast the data
     case class SimpleTest(id:String, value1:Int, value2:Float, key:Int)
    
     // Actual data to be stored
     val testData = Seq(
        SimpleTest("test", 12, 13.5.toFloat, 1),
        SimpleTest("test", 12, 13.5.toFloat, 2),
        SimpleTest("test", 12, 13.5.toFloat, 3),
        SimpleTest("simple", 12, 13.5.toFloat, 1),
        SimpleTest("simple", 12, 13.5.toFloat, 2),
        SimpleTest("simple", 12, 13.5.toFloat, 3)
     )
    
     // Spark's workflow to distribute, partition and store
     // sc and sql are the SparkContext and SparkSession, respectively
     val testDataP = sc.parallelize(testData, 6)
     val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
     testDf.write.partitionBy("id", "key").parquet("/path/to/file")
    

    I am expecting to get the following tree structure in HDFS:

    - /path/to/file
       |- /id=test/key=1/part-01.parquet
       |- /id=test/key=2/part-02.parquet
       |- /id=test/key=3/part-03.parquet
       |- /id=simple/key=1/part-04.parquet
       |- /id=simple/key=2/part-05.parquet
       |- /id=simple/key=3/part-06.parquet
    

    But when I run the previous code I get the following output:

    /path/to/file/id=/key=24/
     |-/part-01.parquet
     |-/part-02.parquet
     |-/part-03.parquet
     |-/part-04.parquet
     |-/part-05.parquet
     |-/part-06.parquet
    

    I do not know whether there is something wrong in the code or whether Spark is doing something else.

    I'm executing spark-submit as follows:

    spark-submit --name APP --master local --driver-memory 30G --executor-memory 30G --executor-cores 8 --num-executors 8 --conf spark.io.compression.codec=lzf --conf spark.akka.frameSize=1024 --conf spark.driver.maxResultSize=1g --conf spark.sql.orc.compression.codec=uncompressed --conf spark.sql.parquet.filterPushdown=true --class myClass myFatJar.jar