Reading DataFrame from partitioned parquet file


Solution 1

sqlContext.read.parquet can take multiple paths as input. If you want just day=5 and day=6, you can simply add two paths like:

val dataframe = sqlContext
      .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
                    "file:///your/path/data=jDD/year=2015/month=10/day=6/")

If you have folders under day=X, say country=XX, then country will automatically be added as a column in the dataframe.
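For example, with a hypothetical layout like .../day=5/country=SE/part-00000.parquet (not part of the original question), a minimal sketch of what that looks like:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/")

// "country" is discovered from the country=XX folders beneath the given path
// and shows up as an extra column
dataframe.printSchema()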

EDIT: As of Spark 1.6 one needs to provide a "basePath" option in order for Spark to generate the partition columns automatically. In Spark 1.6.x the above would have to be re-written like this to create a dataframe with the columns "data", "year", "month" and "day":

val dataframe = sqlContext
     .read
     .option("basePath", "file:///your/path/")
     .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/", 
                    "file:///your/path/data=jDD/year=2015/month=10/day=6/")

Solution 2

If you want to read data for multiple days, for example day = 5 and day = 6, and want to specify the range in the path itself, wildcards can be used:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day={5,6}/*")

Wildcards can also be used to specify a range of days:

val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=[5-10]/*")

This matches all days from 5 to 10.

Solution 3

You need to provide the mergeSchema = true option, as shown below (this is from 1.6.0):

val dataframe = sqlContext.read.option("mergeSchema", "true").parquet("file:///your/path/data=jDD")

This will read all the parquet files into a dataframe and also create the columns year, month and day in that dataframe.

Ref: https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#schema-merging
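Since the partition values become ordinary columns, you can also narrow such a read down to particular days with a filter afterwards. A minimal sketch along those lines (paths and values are placeholders, same layout as above):

val dataframe = sqlContext
  .read
  .option("mergeSchema", "true")
  .parquet("file:///your/path/data=jDD")

// Filtering on the discovered partition columns lets Spark prune the
// directories for the other days instead of scanning everything
val daysFiveAndSix = dataframe
  .filter(dataframe("year") === 2015 && dataframe("month") === 10 && dataframe("day").isin(5, 6))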

Author: WoodChopper

Updated on September 23, 2021

Comments

  • WoodChopper
    WoodChopper over 2 years

    How do I read partitioned parquet with a condition as a dataframe?

    This works fine:

    val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*")
    

    Partitions exist for day=1 to day=30. Is it possible to read something like (day = 5 to 6) or day=5,day=6?

    val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*")
    

    If I put * it gives me all 30 days of data and it is too big.

  • WoodChopper
    WoodChopper over 8 years
    First, thanks for the response. I was looking for a simpler way. If I need, say, 20 days as a subset, this way will be kind of difficult. I would be filtering often to check the data accuracy.
  • Glennie Helles Sindholt
    Glennie Helles Sindholt over 8 years
    Then why not simply do val dataframe = sqlContext.read.parquet("file:///your/path/data=jDD/year=2015/month=10/")? day is added as a column in the dataframe, which you can then filter on.
  • WoodChopper
    WoodChopper over 8 years
    Actually, it is a very large amount of data, from 2007 to 2015. On average, 5 billion rows of raw logs are processed and stored. I would be asked for particular data reports on demand.
  • Glennie Helles Sindholt
    Glennie Helles Sindholt over 8 years
    Right, so the first thing you do is a filter operation. Since Spark does lazy evaluation you should have no problems with the size of the data set. The filter will be applied before any actions and only the data you are interested in will be kept in memory (see the sketch after these comments).
  • WoodChopper
    WoodChopper over 8 years
    Well, it seems this is the only answer!
  • Eric M
    Eric M almost 8 years
    Have an upvote, Glennie! Your remark about using filter() on the read.parquet() was exactly what I needed.
  • mightymephisto
    mightymephisto about 7 years
    Schema merging is only required if the schemas are different; if they are the same then you do not need this.
  • Vijay Krishna
    Vijay Krishna almost 7 years
    Glennie, where did you read about the option on sqlContext.read? I could not find anything in the Spark docs.
  • Glennie Helles Sindholt
    Glennie Helles Sindholt almost 7 years
    It is written in the documentation. Look here. When they changed it from the old way to the new way, I remember that I stumbled across it in the release notes.
  • Shankar
    Shankar about 6 years
    @GlennieHellesSindholt: I'm trying to read Avro files using the Spark Avro reader. I gave the basePath option but the partition columns are still not available in the DataFrame. Any idea?
  • Auren Ferguson
    Auren Ferguson almost 5 years
    Is this exclusively for Scala? I'm trying it with PySpark; it works with {} notation but not []. I'm trying to read in a range.
  • samthebest
    samthebest over 4 years
    Is there a way to have wildcards in the basePath? I seem to be getting the error: java.lang.IllegalArgumentException: Option 'basePath' must be a directory
  • Vivek Sharma
    Vivek Sharma over 4 years
    Does this work for specifying a range of years and months in the same fashion, like "file:///your/path/data=mydata/year=[2015-2018]/month=[1-6]/day=[5-10]/*"?
  • ben
    ben almost 4 years
    @samthebest did you find any solution for adding wildcards in basePath?
  • hui chen
    hui chen over 2 years
    It's so strange that the second method is not implemented in pyspark. It would be really handy to have it.
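
A minimal sketch of the filter approach Glennie describes in the comments above, using the asker's path (assuming day is discovered as an integer partition column):

val monthDf = sqlContext
  .read
  .parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/")

// The filter is applied lazily, before any action, so only the
// day=5 and day=6 partitions are actually read
val subset = monthDf.filter(monthDf("day") >= 5 && monthDf("day") <= 6)
subset.count()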