Filtering DynamicFrame with AWS Glue or PySpark

AWS Glue loads the entire dataset from your JDBC source into a temporary S3 folder and applies the filtering afterwards. If your data were in S3 instead of Oracle and partitioned by some keys (e.g. /year/month/day), you could use the push-down predicate feature to load only a subset of the data:

val partitionPredicate = s"to_date(concat(year, '-', month, '-', day)) BETWEEN '${fromDate}' AND '${toDate}'"

val df = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()
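
For comparison, the same push-down read can be written in PySpark. This is a minimal sketch reusing the database and table names from the Scala snippet above; the date range is an assumed example:

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glueContext = GlueContext(SparkContext.getOrCreate())

# The predicate references only partition columns (year/month/day), so Glue
# prunes partitions before loading any data.
partitionPredicate = "to_date(concat(year, '-', month, '-', day)) BETWEEN '2018-01-01' AND '2018-01-31'"

df = glueContext.create_dynamic_frame.from_catalog(
    database="githubarchive_month",
    table_name="data",
    push_down_predicate=partitionPredicate)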

Unfortunately, this doesn't work for JDBC data sources yet.
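
A common workaround is to bypass the catalog and read through Spark's plain JDBC data source, which ships the filter to the database inside a subquery, so only matching rows leave Oracle. A minimal sketch, assuming an existing SparkSession named spark; the URL, credentials, and table name below are placeholders:

# Oracle evaluates the WHERE clause server-side; only the few matching
# rows are transferred over the connection.
query = "(SELECT * FROM mytable WHERE X_DATETIME_INSERT > TIMESTAMP '2018-05-07 04:00:00') t"

df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCL") \
    .option("dbtable", query) \
    .option("user", "myuser") \
    .option("password", "mypassword") \
    .load()

print(df.count())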

Comments

  • user2752159 almost 2 years

    I have a table in my AWS Glue Data Catalog called 'mytable'. This table points to an on-premises Oracle database through the connection 'mydb'.

    I'd like to filter the resulting DynamicFrame to only rows where the X_DATETIME_INSERT column (which is a timestamp) is greater than a certain time (in this case, '2018-05-07 04:00:00'). Afterwards, I'm trying to count the rows to ensure that the count is low (the table is about 40,000 rows, but only a few rows should meet the filter criteria).

    Here is my current code:

    import boto3
    from datetime import datetime
    import logging
    import os
    import pg8000
    import pytz
    import sys
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from base64 import b64decode
    from pyspark.context import SparkContext
    from pyspark.sql.functions import lit
    ## @params: [TempDir, JOB_NAME]
    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mydb", table_name = "mytable", transformation_ctx = "datasource0")
    
    # Try Glue native filtering    
    filtered_df = Filter.apply(frame = datasource0, f = lambda x: x["X_DATETIME_INSERT"] > '2018-05-07 04:00:00')
    filtered_df.count()
    

    This code runs for 20 minutes and times out. I've tried other variations:

    df = datasource0.toDF()
    df.where(df.X_DATETIME_INSERT > '2018-05-07 04:00:00').collect()
    

    And

    df.filter(df["X_DATETIME_INSERT"].gt(lit("'2018-05-07 04:00:00'")))
    

    Both of which have failed. What am I doing wrong? I'm experienced in Python but new to Glue and PySpark.

  • Infinite about 3 years
    Could you please post supporting documentation for the same?