Partition Athena query by S3 created date


Solution 1

There is no way to make Athena use things like S3 object metadata for query planning. The only way to make Athena skip reading objects is to organize the objects in a way that makes it possible to set up a partitioned table, and then query with filters on the partition keys.

It sounds like you have an idea of how partitioning in Athena works, and I assume there is a reason that you are not using it. However, for the benefit of others with similar problems coming across this question, I'll start by explaining what you can do if you can change the way the objects are organized. I'll give an alternative suggestion at the end; you may want to jump straight to that.

I would suggest you organize the JSON objects using prefixes that contain some part of the timestamps of the objects. Exactly how much depends on the way you query the data: you want it neither too granular nor too coarse. Making it too granular will make Athena spend more time listing files on S3; making it too coarse will make it read too many files. If the most common time period of queries is a month, that is a good granularity; if the most common period is a couple of days, then day is probably better.

For example, if day is the best granularity for your dataset you could organize the objects using keys like this:

s3://some-bucket/data/2019-03-07/object0.json
s3://some-bucket/data/2019-03-07/object1.json
s3://some-bucket/data/2019-03-08/object0.json
s3://some-bucket/data/2019-03-08/object1.json
s3://some-bucket/data/2019-03-08/object2.json

You can also use a Hive-style partitioning scheme, which is what other tools like Glue, Spark, and Hive expect, so unless you have reasons not to, using it can save you grief in the future:

s3://some-bucket/data/created_date=2019-03-07/object0.json
s3://some-bucket/data/created_date=2019-03-07/object1.json
s3://some-bucket/data/created_date=2019-03-08/object0.json

I chose the name created_date here; I don't know what would be a good name for your data. You could use just date, but remember to always quote it (and to quote it in different ways in DML and DDL…) since it's a reserved word.
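
For illustration (a hypothetical sketch, only relevant if you do name the partition key date), the quoting differs like this: backticks in DDL statements, double quotes in queries:

-- DDL (CREATE/ALTER): escape the reserved word with backticks
ALTER TABLE my_data ADD
  PARTITION (`date` = '2019-03-07') LOCATION 's3://some-bucket/data/date=2019-03-07/'

-- DML (SELECT): escape it with double quotes instead
SELECT COUNT(*)
FROM my_data
WHERE "date" = DATE '2019-03-07'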

Then you create a partitioned table:

CREATE EXTERNAL TABLE my_data (
  column0 string,
  column1 int
)
PARTITIONED BY (created_date date)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://some-bucket/data/'
TBLPROPERTIES ('has_encrypted_data'='false')

Some guides will then tell you to run MSCK REPAIR TABLE to load the partitions for the table. If you use Hive-style partitioning (i.e. …/created_date=2019-03-08/…) you can do this, but it will take a long time and I wouldn't recommend it. You can do a much better job of it by manually adding the partitions, which you do like this:

ALTER TABLE my_data ADD
  PARTITION (created_date = '2019-03-07') LOCATION 's3://some-bucket/data/created_date=2019-03-07/'
  PARTITION (created_date = '2019-03-08') LOCATION 's3://some-bucket/data/created_date=2019-03-08/'
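
For completeness, the MSCK alternative mentioned above is just the following statement; it works with Hive-style prefixes, but it lists everything under the table's location on every run, which is why it gets slow as the number of partitions grows:

MSCK REPAIR TABLE my_data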

Finally, when you query the table, make sure to include a condition on the created_date column, to give Athena the information it needs to read only the objects that are relevant for the query:

SELECT COUNT(*)
FROM my_data
WHERE created_date >= DATE '2019-03-07'

You can verify that the query is cheaper by observing the difference in the amount of data scanned when you change the filter, for example from created_date >= DATE '2019-03-07' to created_date = DATE '2019-03-07'.
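
As a quick sanity check (a sketch against the table above), you can run both variants and compare the "Data scanned" figure that Athena reports for each:

-- scans every partition from 2019-03-07 onwards
SELECT COUNT(*) FROM my_data WHERE created_date >= DATE '2019-03-07'

-- scans only the 2019-03-07 partition, so data scanned should drop
SELECT COUNT(*) FROM my_data WHERE created_date = DATE '2019-03-07'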


If you are not able to change the way the objects are organized on S3, there is a poorly documented feature that makes it possible to create a partitioned table even when you can't change the data objects. What you do is you create the same prefixes as I suggest above, but instead of moving the JSON objects into this structure you put a file called symlink.txt in each partition's prefix:

s3://some-bucket/data/created_date=2019-03-07/symlink.txt
s3://some-bucket/data/created_date=2019-03-08/symlink.txt

In each symlink.txt you list the full S3 URIs of the files that you want to include in that partition, one per line. For example, in the first file you could put:

s3://data-bucket/data/object0.json
s3://data-bucket/data/object1.json

and the second file:

s3://data-bucket/data/object2.json
s3://data-bucket/data/object3.json
s3://data-bucket/data/object4.json

Then you create a table that looks very similar to the table above, but with one small difference:

CREATE EXTERNAL TABLE my_data (
  column0 string,
  column1 int
)
PARTITIONED BY (created_date date)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://some-bucket/data/'
TBLPROPERTIES ('has_encrypted_data'='false')

Notice the value of the INPUTFORMAT property.

You add partitions just like you do for any partitioned table:

ALTER TABLE my_data ADD
  PARTITION (created_date = '2019-03-07') LOCATION 's3://some-bucket/data/created_date=2019-03-07/'
  PARTITION (created_date = '2019-03-08') LOCATION 's3://some-bucket/data/created_date=2019-03-08/'

The only Athena-related documentation of this feature that I have come across is the S3 Inventory docs for integrating with Athena.

Solution 2

I started working with Theo's answer and it was very close (thank you, Theo, for the excellent and very detailed response), but according to the documentation, when adding multiple partitions you only need to specify "ADD" once near the beginning of the query.

I tried specifying "ADD" on each line per Theo's example but received an error. It works when only specified once, though. Below is the format I used, which was successful:

ALTER TABLE db.table_name ADD IF NOT EXISTS
 PARTITION (event_date = '2019-03-01') LOCATION 's3://bucket-name/2019-03-01/'
 PARTITION (event_date = '2019-03-02') LOCATION 's3://bucket-name/2019-03-02/'
 PARTITION (event_date = '2019-03-03') LOCATION 's3://bucket-name/2019-03-03/'
 ...
Author: waquner (updated on June 13, 2022)

Comments

  • waquner, almost 2 years

    I have an S3 bucket with ~70 million JSONs (~15 TB) and an Athena table to query by timestamp and some other keys defined in the JSON.

    It is guaranteed that the timestamp in the JSON is more or less equal to the S3 created date of the JSON (or at least equal enough for the purpose of my query).

    Can I somehow improve query performance (and cost) by adding the created date as something like a "partition", which I understand seems only to be possible for prefixes/folders?

    Edit: I currently simulate that by using the S3 Inventory CSV to pre-filter by createdDate, then downloading all the JSONs and doing the rest of the filtering, but I'd like to do that completely inside Athena, if possible.

  • Ben Swinburne, about 4 years
    You said "You can use just date" in your answer above, but I tried creating the partition named date and it wouldn't run the query. I tried with single and double quotes and backticks, but it wouldn't run. I aliased the index date as dt, but then all my files in S3 are prefixed with date, not dt. Presumably the file prefix in S3 would need to be dt in this example to be used?
  • Ben Swinburne, about 4 years
    Also, you've got daily partitions there but suggest not using MSCK REPAIR TABLE. How would these indexes typically get updated in an automated way? Obviously running an ALTER TABLE or even REPAIR TABLE manually each day is impractical.
  • Theo, about 4 years
    Sorry about the bad syntax in my answer; I've fixed it.
  • Theo, about 4 years
    Using "date" as a column name is tricky because, as I noted in the answer, the quoting is different in different contexts, avoid it if you can.
  • Theo, about 4 years
    @BenSwinburne I have two suggestions for how to automate partition creation: if you only partition on time (e.g. "created_date"), you can run a Lambda function on the last day of each month (set up a schedule with EventBridge) that adds the next month's partitions (there doesn't have to be any data on S3; partitions are just metadata). If you partition on more than time, use an S3 trigger that runs a Lambda function for each new object and checks whether a new partition needs to be added (you can also put the events in a queue to avoid running on every new object).
  • Theo, about 4 years
    When using Lambda I would recommend using the Glue API directly to create partitions, using the BatchCreatePartition or CreatePartition calls. It's very different from executing SQL in Athena, and you need to specify much more, but it's faster.
  • Ben Swinburne, about 4 years
    Great, thanks for the feedback. I ditched the date= prefix but had to reimport all my data; I'm using dt= now. I've got daily (dt=yyyy-mm-dd/) partitions as I didn't really see the value in splitting into /y=/m=/d=/h=, but that may bite me later. I will run a scheduled Lambda to prep for the next month; that's easy enough. I have a CTAS table which gets queried much more often throughout the day on top of the daily partitioned one, so that is immediately out of date. Any suggestions on how to keep that up to date (shortest 5 min intervals)? Are you on the og-aws Slack or anything, by any chance?
    Great, thanks for the feedback. I ditched the date= prefix but had to reimport all my data, using dt= now. I've got daily (dt=yyyy-mm-dd/) partitions as i didn't really see the value in splitting into /y=/m=/d=/h=, but that may bite me later. I will run a scheduled lamda to prep for the next month that's easy enough. I have a CTAS table which gets queried much more often throughout the day on top of the daily partitioned one so that is immediately out of date. Any suggestions on how to keep that up to date (shortest 5 min intervals)? Are you on the og-aws slack or anything per chance?