How to solve this error org.apache.spark.sql.catalyst.errors.package$TreeNodeException

Solution 1

I'm not sure about the real cause. The only thing that caught my eye is the following SQL expression: (SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T. What does the T mean here?

Regarding the overall design, I would recommend a completely different approach. In your case you have 2 processors that work on the same data collected from Oracle, and each processor fetches the data separately. I would recommend moving the reading of the Oracle data into a separate procedure that returns a data frame (you need to cache it); your processors then work on that data frame and persist the data into Cassandra.
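A minimal sketch of the read-once-and-cache idea, not the asker's actual code. `SharedReadSketch`, `loadSource`, `processor1` and `processor2` are hypothetical names. `spark.range` stands in for the Oracle read so the example runs locally, and a count stands in for the Cassandra write.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object SharedReadSketch {
  // Stand-in for the single Oracle read (assumption: real code would
  // build this frame with spark.read.format("jdbc") instead).
  def loadSource(spark: SparkSession): DataFrame =
    spark.range(0, 10).toDF("id")

  // Hypothetical processors: both receive the same, already loaded frame.
  def processor1(df: DataFrame): Long = df.filter("id % 2 = 0").count()
  def processor2(df: DataFrame): Long = df.filter("id >= 7").count()

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("shared-read-sketch")
      .getOrCreate()
    try {
      // Read once and cache; the first action materializes the cache,
      // later actions reuse it instead of going back to the source.
      val source = loadSource(spark).cache()
      println(processor1(source))
      println(processor2(source))
      source.unpersist()
    } finally {
      spark.stop()
    }
  }
}
```

On a data frame, `cache()` uses the MEMORY_AND_DISK storage level by default, so partitions that do not fit in RAM are spilled to local disk rather than dropped.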

Or, as was recommended before, you can separate the job into 2 pieces: one that pulls all the data from Oracle and stores the data frame on disk (not with persist, but with write), for example as a Parquet file, and then separate job(s) that take the data from disk and perform the necessary transformations.
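A rough sketch of the two-stage variant, under the same assumptions as before: `ParquetStagingSketch`, `stage` and `load` are hypothetical names, `spark.range` stands in for the Oracle extract, and a temporary directory stands in for whatever shared storage the real jobs would use. Both stages run in one program here only to keep the example self-contained.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ParquetStagingSketch {
  // Stage 1: write the extracted frame to disk as Parquet.
  def stage(df: DataFrame, path: String): Unit =
    df.write.mode(SaveMode.Overwrite).parquet(path)

  // Stage 2: a separate job would start here and read the staged files.
  def load(spark: SparkSession, path: String): DataFrame =
    spark.read.parquet(path)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("parquet-staging-sketch")
      .getOrCreate()
    try {
      // Hypothetical staging location; a real job would use a shared path.
      val path = Files.createTempDirectory("bm_vals_staging")
        .resolve("bm_vals.parquet").toString

      val extracted = spark.range(0, 10).toDF("id") // stand-in for the Oracle read
      stage(extracted, path)

      val staged = load(spark, path)
      println(staged.count())
    } finally {
      spark.stop()
    }
  }
}
```

The extra write costs some time up front, but downstream jobs no longer depend on Oracle being available and can be rerun without touching the database again.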

In both scenarios you

Solution 2

I was closing the sparkSession in a finally block in the first processor (the called class). I moved it out of the processor and placed it inside the calling class, which solved the issue.
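A small sketch of that arrangement, assuming a driver that runs several processors against one session. `SessionLifecycleSketch`, `Processor`, `FirstProcessor`, `SecondProcessor` and `runAll` are hypothetical names, not the classes from the question. The point is only where `spark.stop()` lives: in the caller, after every processor has finished.

```scala
import org.apache.spark.sql.SparkSession

object SessionLifecycleSketch {
  // Processors use the shared session but never stop it themselves.
  trait Processor {
    def process(spark: SparkSession): Long
  }

  object FirstProcessor extends Processor {
    def process(spark: SparkSession): Long = spark.range(0, 10).count()
  }

  object SecondProcessor extends Processor {
    def process(spark: SparkSession): Long =
      spark.range(0, 10).filter("id < 5").count()
  }

  def runAll(spark: SparkSession, processors: Seq[Processor]): Seq[Long] =
    processors.map(_.process(spark))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("session-lifecycle-sketch")
      .getOrCreate()
    try {
      println(runAll(spark, Seq(FirstProcessor, SecondProcessor)))
    } finally {
      // The caller owns the session, so it is stopped exactly once, here.
      spark.stop()
    }
  }
}
```

If a processor stopped the session in its own finally block, any processor that ran afterwards would be working against a stopped session, which would be consistent with a failure that only appears on the second read.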

Solution 3

I have faced this same issue, and I would attribute it to a skewed partition column: when you read from Oracle, the skew leads to a single partition in Spark. I would recommend that anyone facing this issue use a balanced partition column.
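For reference, a sketch of how a partitioned JDBC read is usually configured. The option names (`partitionColumn`, `lowerBound`, `upperBound`, `numPartitions`, `url`, `dbtable`) are Spark's JDBC data source options; `PartitionedJdbcReadSketch`, `partitionOptions`, `readPartitioned` and every value passed to them are hypothetical, and `readPartitioned` needs a reachable database and a JDBC driver on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedJdbcReadSketch {
  // Builds the four options that control how Spark splits a JDBC read.
  // The bounds only decide the partition stride; they do not filter rows.
  def partitionOptions(column: String, lower: Long, upper: Long,
                       numPartitions: Int): Map[String, String] = Map(
    "partitionColumn" -> column,
    "lowerBound"      -> lower.toString,
    "upperBound"      -> upper.toString,
    "numPartitions"   -> numPartitions.toString
  )

  // Not called in this sketch: it requires a real JDBC URL and table.
  def readPartitioned(spark: SparkSession, url: String, table: String,
                      options: Map[String, String]): DataFrame =
    spark.read
      .format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .options(options)
      .load()
}
```

A column whose values are spread evenly between the two bounds gives partitions of similar size. If most values fall into one stride, nearly all rows land in a single partition, however many partitions were requested.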

Author by

BdEngineer

Updated on August 27, 2020

Comments

  • BdEngineer
    BdEngineer over 3 years

    I have two processes. Each process does the following: 1) connects to the Oracle DB and reads a specific table, 2) forms a dataframe and processes it, 3) saves the df to Cassandra.

    If I run both processes in parallel, both try to read from Oracle, and I get the error below while the second process reads the data:

     ERROR ValsProcessor2: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
    Exchange SinglePartition
    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#290L])
       +- *(1) Scan JDBCRelation((SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T) [numPartitions=2] [] PushedFilters: [], ReadSchema: struct<>
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
    at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
    at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
    at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
    at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
    at com.snp.processors.BenchmarkModelValsProcessor2.process(BenchmarkModelValsProcessor2.scala:43)
    at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors$2.apply(Utils.scala:28)
    at com.snp.utils.Utils$$anonfun$getAllDefinedProcessors$2.apply(Utils.scala:28)
    at com.sp.MigrationDriver$$anonfun$main$2$$anonfun$apply$1.apply(MigrationDriver.scala:78)
    at com.sp.MigrationDriver$$anonfun$main$2$$anonfun$apply$1.apply(MigrationDriver.scala:78)
    at scala.Option.map(Option.scala:146)
    at com.sp.MigrationDriver$$anonfun$main$2.apply(MigrationDriver.scala:75)
    at com.sp.MigrationDriver$$anonfun$main$2.apply(MigrationDriver.scala:74)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
    at com.sp.MigrationDriver$.main(MigrationDriver.scala:74)
    at com.sp.MigrationDriver.main(MigrationDriver.scala)
    Caused by: java.lang.NullPointerException
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.needToCopyObjectsBeforeShuffle(ShuffleExchangeExec.scala:163)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:300)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:91)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
        at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
        ... 37 more
    

    What am I doing wrong here? How do I fix this?

  • BdEngineer
    BdEngineer over 5 years
    Sir, "(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T" is a simple sample Oracle test query. Without T it does not work, i.e. it might be a spark-sql formality. 2) Here I need to run different processors, each one reading different data from Oracle. (To simulate running many processors I started with two; in my case both read from the same query, but my objective is different queries.) 3) "(you need to cache it)": where should I cache it? Is RAM okay?
  • BdEngineer
    BdEngineer over 5 years
    4) "store the dataframe into disk (not persist, but using the write), for example, as Parquet file": how do I write it? Do you mean I need to store it on disk? Does that work, or does it add latency? Network issues?
  • Alex Ott
    Alex Ott over 5 years
    I really recommend reading the initial chapters of "Spark: The Definitive Guide"; they will provide enough background to write Spark code.
  • BdEngineer
    BdEngineer over 5 years
    Thank you sir, sure. Sir, one thing I don't understand is that the error comes when reading the data from Oracle, and only the second time. I'm not sure if some Spark settings are wrong in my case.
  • BdEngineer
    BdEngineer over 5 years
    Sir, is this piece of code for reading data from Oracle scalable in my shared code? If not, what is the best practice for a production-grade approach? i.e. "/* * Load the data from oracle for given schema and query. */ val ora_m_vals_df = DbUtils.readOracleData(oraOptionDfConfig, "OracleSchemaTest" , PARTITION_COLUMN, "(SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T" ); "
  • Yan Sklyarenko
    Yan Sklyarenko about 4 years
    It sounds more like a comment than an answer.