Spark 2.4: Parquet column cannot be converted in file, Column: [Impressions], Expected: bigint, Found: BINARY


When you create a Hive table over Parquet files and then read it with Spark, Spark takes the schema from the Parquet files themselves, not from the Hive table definition.
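
For illustration, here is a minimal sketch of how you could see the mismatch yourself; the table name and partition path are copied from the question and may need adjusting:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.enableHiveSupport().getOrCreate()

    # Schema as declared in the Hive metastore: Impressions shows up as bigint.
    spark.sql("DESCRIBE adwords_ad").show(truncate=False)

    # Schema as written in the Parquet footers of one partition (path copied from
    # the error message): if those files store Impressions as a string, it shows
    # up here as string, i.e. BINARY at the Parquet level.
    path = "s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/"
    spark.read.parquet(path).printSchema()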

So it makes sense that in your Parquet files' schema Impressions is a BINARY (string) column, and it doesn't matter that the Hive table declares it as Long, because Spark takes the schema from the Parquet file.
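
If the goal is just to get the query running, two workarounds are often suggested; the sketch below reuses the partition path from the error message plus a hypothetical staging location, and assumes the misbehaving files store Impressions as a string:

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    # Workaround 1: have Spark read the table through the Hive SerDe instead of
    # its native Parquet reader, so the Hive-declared types are applied while
    # reading (at the cost of slower scans).
    spark = (SparkSession.builder
             .enableHiveSupport()
             .config("spark.sql.hive.convertMetastoreParquet", "false")
             .getOrCreate())

    # Workaround 2: rewrite the offending partition with the column cast to
    # bigint. Write to a staging location first (overwriting a path that is also
    # being read from is not allowed), then swap the files in.
    bad_path = "s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/"
    staging_path = "s3a://bucket/prod/reports/adwords_ad_fix/customer_id=1111111/date=2019-11-21/"

    fixed = (spark.read.parquet(bad_path)
             .withColumn("Impressions", F.col("Impressions").cast("bigint")))
    fixed.write.mode("overwrite").parquet(staging_path)

Either way, the SUM(BIGINT(a.Impressions)) cast in the view does not help, because the failure happens earlier, while the Parquet scan itself tries to decode the column as bigint; that is also why the error only surfaces when CACHE TABLE (or a write) forces the scan to actually run.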

Author: Jay Cee, updated on June 28, 2022

Comments

  • Jay Cee, almost 2 years

    I'm facing a weird issue that I cannot understand.

    I have source data with a column "Impressions" that is sometimes a bigint / sometimes a string (when I manually explore the data).

    The Hive schema registered for this column is Long.

    Thus, when loading the data:

    spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW adwords_ads_agg_Yxz AS
    
    SELECT
        a.customer_id
        , a.Campaign
        , ...
        , SUM(BIGINT(a.Impressions)) as Impressions
        , SUM(BIGINT(a.Cost))/1000000 as Cost
    FROM adwords_ad a
    LEFT JOIN ds_ad_mapping m ON BIGINT(a.Ad_ID) = BIGINT(m.adEngineId) AND a.customer_id = m.reportAccountId
    WHERE a.customer_id in (...)
    AND a.day >= DATE('2019-02-01')
    GROUP BY
        a.customer_id
        , ...
    """)
    

    I'm making sure that everything gets converted to BIGINT. The error happens later, at this step:

    spark.sql("CACHE TABLE adwords_ads_agg_Yxz")
    

    After seeing this error, I ran the same code in a notebook to debug further, first making sure that the column really gets converted to BIGINT / long:

    from pyspark.sql.types import LongType
    from pyspark.sql import functions as f

    df = df.withColumn("Impressions", f.col("Impressions").cast(LongType()))
    df.createOrReplaceTempView('adwords_ads_agg_Yxz')
    
    

    and then printing the schema of this freshly converted df:

    root
     |-- customer_id: long (nullable = true)
     |-- Campaign: string (nullable = true)
     |-- MatchType: string (nullable = true)
     |-- League: string (nullable = false)
     |-- Ad_Group: string (nullable = true)
     |-- Impressions: long (nullable = true) <- Here!
     |-- Cost: double (nullable = true)
    

    and then doing the caching, but the error remains:

    Spark Job Progress An error occurred while calling o84.sql. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 9 in stage 47.0 failed 4 times, most recent failure: Lost task 9.3 in stage 47.0 (TID 2256, ip-172-31-00-00.eu-west-1.compute.internal, executor 10): org.apache.spark.sql.execution.QueryExecutionException: Parquet column cannot be converted in file s3a://bucket/prod/reports/adwords_ad/customer_id=1111111/date=2019-11-21/theparquetfile.snappy.parquet. Column: [Impressions], Expected: bigint, Found: BINARY

    Has anyone encountered this problem and/or has an idea of what is causing it?

    If I remove the caching, the error happens instead when trying to write the data to Parquet. I also don't know why it mentions the adwords_ad table at this point, when I'm trying to refresh / write a temporary table.