Reading DataFrame from partitioned parquet file
Solution 1
sqlContext.read.parquet can take multiple paths as input. If you want just day=5 and day=6, you can simply add two paths like:
val dataframe = sqlContext
  .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
    "file:///your/path/data=jDD/year=2015/month=10/day=6/")
If you have folders under day=X, say country=XX, then country will automatically be added as a column in the dataframe.
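As a minimal sketch, assuming a hypothetical country=XX level under each day directory (the layout and country values are illustrative, not from the question):

// Hypothetical layout: .../day=5/country=SE/part-*.parquet, .../day=5/country=DK/...
// Because country sits below the paths passed to parquet(), it is
// discovered automatically and added as a column:
val withCountry = sqlContext
  .read.parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
    "file:///your/path/data=jDD/year=2015/month=10/day=6/")
withCountry.groupBy("country").count().show()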
EDIT: As of Spark 1.6 one needs to provide a "basePath" option in order for Spark to generate the partition columns automatically. In Spark 1.6.x the above would have to be re-written like this to create a dataframe with the columns "data", "year", "month" and "day":
val dataframe = sqlContext
  .read
  .option("basePath", "file:///your/path/")
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=5/",
    "file:///your/path/data=jDD/year=2015/month=10/day=6/")
Solution 2
If you want to read data for multiple days, for example day=5 and day=6, and want to mention the range in the path itself, wildcards can be used:
val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day={5,6}/*")
Wildcards can also be used to specify a range of days:
val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day=[5-10]/*")
This matches all days from 5 to 10.
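The curly braces also take an explicit, non-contiguous list of values; this is standard Hadoop glob syntax, and the chosen days below are illustrative:

// Reads only the day=5, day=12 and day=25 directories:
val dataframe = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/day={5,12,25}/*")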
Solution 3
You need to provide the mergeSchema = true option, as shown below (this is from Spark 1.6.0):
val dataframe = sqlContext
  .read
  .option("mergeSchema", "true")
  .parquet("file:///your/path/data=jDD")
This will read all the Parquet files into a dataframe and also create the columns year, month and day in that dataframe.
Ref: https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#schema-merging
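As a usage sketch building on the read above, filtering on the generated partition columns lets Spark prune partitions, so only the matching directories are scanned (the predicate is illustrative):

// Only the day=5 and day=6 directories under month=10 should
// actually be read, thanks to partition pruning:
val subset = dataframe.filter("year = 2015 AND month = 10 AND day IN (5, 6)")
subset.count()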
WoodChopper
Updated on September 23, 2021
Comments
- WoodChopper over 2 years: How do I read a partitioned Parquet file with a condition as a dataframe? This works fine:
val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=25/*")
Partitions exist for day=1 to day=30. Is it possible to read something like (day = 5 to 6) or day=5,day=6, e.g.
val dataframe = sqlContext.read.parquet("file:///home/msoproj/dev_data/dev_output/aln/partitions/data=jDD/year=2015/month=10/day=??/*")
If I put * it gives me all 30 days of data, and that is too big.
- WoodChopper over 8 years: First, thanks for the response. I was looking for a simpler way; with, say, 20 days as a subset, this approach would be difficult. I would be filtering often to check data accuracy.
- Glennie Helles Sindholt over 8 years: Then why not simply do val dataframe = sqlContext.read.parquet("file:///your/path/data=jDD/year=2015/month=10/")? day is added as a column in the dataframe, which you can then filter on.
- WoodChopper over 8 years: Actually, it is very large data. The data runs from 2007 to 2015, and on average 5 billion rows of raw logs are processed and stored. I would be asked for particular data reports on demand.
- Glennie Helles Sindholt over 8 years: Right, so the first thing you do is a filter operation. Since Spark does lazy evaluation, you should have no problems with the size of the data set: the filter is applied before any action, and only the data you are interested in is kept in memory (see the sketch after these comments).
- WoodChopper over 8 years: Well, it seems this is the only answer!
- Eric M almost 8 years: Have an upvote, Glennie! Your remark about using filter() on read.parquet() was exactly what I needed.
- mightymephisto about 7 years: Schema merging is only required if the schemas are different; if they are the same, you do not need it.
- Vijay Krishna almost 7 years: Glennie, where did you read about the option on sqlContext.read? I could not find anything in the Spark docs.
- Glennie Helles Sindholt almost 7 years: It is written in the documentation. Look here. When they changed it from the old way to the new way, I remember that I stumbled across it in the release notes.
- Shankar about 6 years: @GlennieHellesSindholt: I'm trying to read Avro files using the Spark Avro reader. I gave the basePath option, but the partition columns are still not available in the DataFrame. Any idea?
- Auren Ferguson almost 5 years: Is this exclusively for Scala? I'm trying it with PySpark: it works with {} notation but not with []. I'm trying to read in a range.
- samthebest over 4 years: Is there a way to have wildcards in the basePath? I seem to be getting the error: java.lang.IllegalArgumentException: Option 'basePath' must be a directory
- Vivek Sharma over 4 years: Does this work for specifying a range of years and months in the same fashion, like "file:///your/path/data=mydata/year=[2015-2018]/month=[1-6]/day=[5-10]/*"?
- ben almost 4 years: @samthebest Did you get any solution for adding wildcards in basePath?
- hui chen over 2 years: It's strange that the second method is not implemented in PySpark. It would be really handy to have it.
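Below is a minimal sketch of the filter approach Glennie Helles Sindholt describes in the comments above, using the example paths from the answers; the exact predicate is an assumption:

// Read one whole month and rely on lazy evaluation: the filter on the
// partition column `day` is pushed down, so only the day=5 and day=6
// directories are scanned (partition pruning).
val month = sqlContext
  .read
  .parquet("file:///your/path/data=jDD/year=2015/month=10/")
val fiveAndSix = month.filter(month("day") >= 5 && month("day") <= 6)
fiveAndSix.show()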