How to improve performance for slow Spark jobs using DataFrame and JDBC connection?


Solution 1

All of the aggregation operations are performed after the whole dataset is retrieved into memory as a DataFrame, so doing the count in Spark will never be as efficient as doing it directly in Teradata. Sometimes it's worth pushing some computation into the database by creating views and then mapping those views using the JDBC API.

Every time you use the JDBC driver to access a large table you should specify a partitioning strategy; otherwise you will create a DataFrame/RDD with a single partition and overload the single JDBC connection.

Instead you want to try the following API (available since Spark 1.4.0):

sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  columnName = "<INTEGRAL_COLUMN_TO_PARTITION>", // column used to split the table into partitions
  lowerBound = minValue,  // min/max of that column; used only to compute the partition stride,
  upperBound = maxValue,  // they do not filter any rows out
  numPartitions = 20,
  connectionProperties = new java.util.Properties()
)

There is also an option to push down some filtering.
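For example, here is a minimal sketch of that pushdown (the val name df and the comparison are my own additions, assuming the partitioned read above was assigned to df): a simple comparison on a DataFrame column is compiled into the WHERE clause of the query Spark sends over JDBC, so the database drops the rows before they are shipped to Spark.

// Assuming the partitioned read above was assigned to a DataFrame:
// val df = sqlctx.read.jdbc(url = "<URL>", table = "<TABLE>", ...)

// This comparison is pushed down as an extra WHERE condition in the JDBC query,
// so Teradata filters the rows instead of sending all of them to Spark.
val filtered = df.filter(df("<INTEGRAL_COLUMN_TO_PARTITION>") > minValue)
filtered.count()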

If you don't have a uniformly distributed integral column, you can create custom partitions by specifying custom predicates (WHERE clauses). For example, suppose you have a timestamp column and want to partition by date ranges:

val predicates =
  Array(
    "2015-06-20" -> "2015-06-30",
    "2015-07-01" -> "2015-07-10",
    "2015-07-11" -> "2015-07-20",
    "2015-07-21" -> "2015-07-31"
  ).map {
    case (start, end) =>
      s"cast(DAT_TME as date) >= date '$start' AND cast(DAT_TME as date) <= date '$end'"
  }

predicates.foreach(println)

// Below is the result of how the predicates were formed:
// cast(DAT_TME as date) >= date '2015-06-20' AND cast(DAT_TME as date) <= date '2015-06-30'
// cast(DAT_TME as date) >= date '2015-07-01' AND cast(DAT_TME as date) <= date '2015-07-10'
// cast(DAT_TME as date) >= date '2015-07-11' AND cast(DAT_TME as date) <= date '2015-07-20'
// cast(DAT_TME as date) >= date '2015-07-21' AND cast(DAT_TME as date) <= date '2015-07-31'


sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

It will generate a DataFrame where each partition contains the records of the subquery associated with its predicate.
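As a quick sanity check (a sketch; the val name df is mine and assumes you assigned the result of the call above to it), you can confirm that Spark created one partition per predicate:

// val df = sqlctx.read.jdbc(url = "<URL>", table = "<TABLE>", predicates = predicates,
//                           connectionProperties = new java.util.Properties())
println(df.rdd.partitions.size)  // 4 here: one JDBC subquery, hence one partition, per predicate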

Check the source code at DataFrameReader.scala

Solution 2

Does the unserialized table fit into 40 GB? If it starts swapping to disk, performance will decrease dramatically.

Anyway, when you use standard JDBC with ANSI SQL syntax you leverage the DB engine, so if Teradata (I don't know Teradata) holds statistics about your table, a classic "select count(*) from table" will be very fast. Spark, instead, loads your 100 million rows into memory with something like "select * from table" and then performs the count over the RDD rows. It's a pretty different workload.
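If the count is really all you need, one way to keep the work on the database side (a sketch that is not part of the original answer; the alias t and column name cnt are placeholders I introduced) is to pass an aggregating subquery as the table argument, so only a single row is transferred over JDBC:

// Teradata computes the aggregate; Spark only receives one row with one column.
val counted = sqlctx.read.jdbc(
  "<URL>",
  "(SELECT COUNT(*) AS cnt FROM <TABLE>) t",
  new java.util.Properties()
)
counted.show()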


Comments

  • Dev Patel
    Dev Patel over 3 years

    I am trying to access a mid-size Teradata table (~100 million rows) via JDBC in standalone mode on a single node (local[*]).

    I am using Spark 1.4.1, set up on a very powerful machine (2 CPUs, 24 cores, 126 GB RAM).

    I have tried several memory setups and tuning options to make it work faster, but none of them made a huge impact.

    I am sure there is something I am missing; below is my final try, which took about 11 minutes to get this simple count, versus only 40 seconds using a JDBC connection through R.

    bin/pyspark --driver-memory 40g --executor-memory 40g
    
    df = sqlContext.read.jdbc("jdbc:teradata://......)
    df.count()
    

    When I tried with a BIG table (5B records), no results were returned upon completion of the query.

    • zero323
      zero323 over 8 years
      How do you count using R?
    • Dev Patel
      Dev Patel over 8 years
      @zero323 - simply using the RJDBC and teradataR packages after setting up the connection with the Teradata JARs, and then tdQuery("SELECT COUNT(*) FROM your_table")
    • zero323
      zero323 over 8 years
      As far as I know the Spark JDBC data source can push down predicates, but the actual execution is done in Spark. It means you have to transfer your data to the Spark cluster, so it is not the same as executing a SQL query over JDBC (the R case). The first thing you should do is cache your data after loading. It won't improve performance for the first query though.
    • Dev Patel
      Dev Patel over 8 years
      @zero323 - thanks, I realized that after doing some more research on this. I do have a quick question though - what would be the fastest way to read data in Apache Spark? Is it through the Parquet file structure?
    • zero323
      zero323 over 8 years
      It is probably a good choice, but the first thing you can try before going that way is to use the Teradata Hadoop connector. It looks like it supports multiple export options including Hive tables. With a single machine, network and disk IO can still be a limiting factor though.
    • samthebest
      samthebest over 8 years
      Suggest accepting Gianmario's answer.
  • Dev Patel
    Dev Patel over 8 years
    I think it would, and I also tried increasing memory to 100 GB, but didn't see any improvement. I am not trying to load 100 million rows into memory, but running some aggregate operation such as count() on the DataFrame or count(*) on a temp table, and Spark takes too long. I also tried registering the DF as a temp table and did a simple count, but it takes about the same time. ra1.registerTempTable("ra_dt"); total = sqlContext.sql("select count(*) from ra_dt")
  • axlpado - Agile Lab
    axlpado - Agile Lab over 8 years
    Yes, but I think that Spark is not pushing the count operation down to the DB engine, so it will load all rows into memory and then perform the count on the DF.
  • axlpado - Agile Lab
    axlpado - Agile Lab over 8 years
    How many columns do you have in that table? With 100 million rows it is pretty easy to reach 100 GB of unserialized objects. Could you post your table schema?
  • Dev Patel
    Dev Patel over 8 years
    I think you're right; I was reading a few other posts online and found that Spark is trying to load the data before applying the count operation. In that case, what would be the ideal way to read this type of data faster in Spark? In other words, what is the fastest way to read data in Apache Spark? Here is my table schema: root |-- field1: decimal(18,0) (nullable = true) |-- field2: string (nullable = true) |-- field3: date (nullable = true) |-- field4: date (nullable = true) |-- field5: integer (nullable = true) |-- field6: string (nullable = true)
  • axlpado - Agile Lab
    axlpado - Agile Lab over 8 years
    Spark is a distributed processing engine, so the best way to load data in Spark is from a distributed file system or DBMS. In your case, working on a single instance, I think you can only improve performance by specifying partitionColumn, lowerBound, upperBound, and numPartitions to improve reading parallelism. If you need to perform other queries after the count you can cache the DF before counting it, so the first count will take its time but subsequent queries will run in memory and be faster (see the short sketch after this thread).
  • Dev Patel
    Dev Patel over 8 years
    Makes sense! Thanks for the answer!
  • Boggio
    Boggio over 8 years
    How many executors are you running, and how many --executor-cores?
  • y2k-shubham
    y2k-shubham about 6 years
    @zero323, @Gianmario Spacagna if I actually need to read the entire MySQL table (and not just get the count), then how can I improve the sluggish performance of Spark SQL? I'm already parallelizing the read operation using the spark.read.jdbc(..numPartitions..) method.
  • y2k-shubham
    y2k-shubham about 6 years
    My MySQL (InnoDB) table has ~186M records weighing around 149 GB (as per stats shown by phpMyAdmin) and I'm using numPartitions = 32. [Spark 2.2.0] I'm on EMR 5.12.0 with 1 master, 1 task and 1 core node (all r3.xlarge, 8 vCore, 30.5 GiB memory, 80 GB SSD storage). I've found that reading the MySQL table into a DataFrame fails if I DON'T limit the records to ~1.5-2M. It gives a long stack trace that has javax.servlet.ServletException: java.util.NoSuchElementException: None.get & java.sql.SQLException: Incorrect key file for table..
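To make the "cache the DF before counting it" advice from this thread concrete, here is a minimal sketch (it reuses the placeholder URL/table and the predicates array from Solution 1, and field2 from the schema posted above; adjust to your environment):

val df = sqlctx.read.jdbc(
  url = "<URL>",
  table = "<TABLE>",
  predicates = predicates,
  connectionProperties = new java.util.Properties()
)

df.cache()                            // keep the fetched rows in memory after the first action
df.count()                            // the first count still pays the full JDBC transfer cost
df.groupBy("field2").count().show()   // subsequent queries run against the cached data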