Create Spark Dataframe from SQL Query


Solution 1

I found this here: Bulk data migration through Spark SQL

The dbtable parameter can be any query wrapped in parentheses with an alias. So in my case, I need to do this:

val query = """
  (select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100) foo
"""

val df = sqlContext.read.format("jdbc").
  option("url", "jdbc:mysql://localhost:3306/local_content").
  option("driver", "com.mysql.jdbc.Driver").
  option("useUnicode", "true").
  option("continueBatchOnError","true").
  option("useSSL", "false").
  option("user", "root").
  option("password", "").
  option("dbtable",query).
  load()

As expected, loading each table as its own Dataframe and joining them in Spark was very inefficient.
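
On Spark 2.x and later the same pattern works through SparkSession instead of SQLContext. A minimal sketch, assuming a spark-shell session named spark and the same query string defined above:

// Sketch only: assumes Spark 2.x+, a SparkSession named `spark`,
// and the same parenthesized `query` with an alias defined above
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/local_content")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("user", "root")
  .option("password", "")
  .option("dbtable", query)  // any subquery in parentheses with an alias
  .load()

df.printSchema()  // should show DialogLineID, Sequence, WordRootID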

Solution 2

If you already have your tables registered in your SQLContext, you can simply use the sql method.

val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
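
If the tables are not registered yet, one way to do that is to register each JDBC DataFrame as a temporary table first. A sketch, where fooDF, barDF, and zotDF are hypothetical DataFrames loaded as in Solution 1 (on Spark 2.x, createOrReplaceTempView replaces registerTempTable):

// Hypothetical DataFrames loaded via the JDBC reader as in Solution 1
fooDF.registerTempTable("FOO")
barDF.registerTempTable("BAR")
zotDF.registerTempTable("ZOT")

val resultDF = sqlContext.sql(
  "SELECT a.X, b.Y, c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")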

Solution 3

To save the output of a query to a new DataFrame, simply set the result equal to a variable:

val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")

Now newDataFrame is a DataFrame with all of the usual DataFrame functionality available to it.
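
For example, a quick sketch of a few of those operations, assuming the X, Y, Z columns from the query above (the filter condition and output path are just placeholders):

import org.apache.spark.sql.functions.col

newDataFrame.printSchema()
newDataFrame.filter(col("X") > 0).show(10)                    // placeholder condition
newDataFrame.write.mode("overwrite").parquet("/tmp/result")   // placeholder output path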

Solution 4

TL;DR: just create a view in your database.

Detail: I have a table t_city in my postgres database, on which I create a view:

create view v_city_3500 as
    select asciiname, country, population, elevation
    from t_city
    where elevation>3500
    and population>100000;

select * from v_city_3500;

 asciiname | country | population | elevation
-----------+---------+------------+-----------
 Potosi    | BO      |     141251 |      3967
 Oruro     | BO      |     208684 |      3936
 La Paz    | BO      |     812799 |      3782
 Lhasa     | CN      |     118721 |      3651
 Puno      | PE      |     116552 |      3825
 Juliaca   | PE      |     245675 |      3834

In the spark-shell:

val sx = new org.apache.spark.sql.SQLContext(sc)

val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"

val city_df = sx.read.jdbc(url, "t_city", props)
val city_3500_df = sx.read.jdbc(url, "v_city_3500", props)

Result:

city_df.count()
Long = 145725

city_3500_df.count()
Long = 6
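
If you would rather keep the credentials out of the URL, the same view can be read with the option-based JDBC API. A sketch, assuming the same database as above:

// Sketch: the same view read through the option-based JDBC reader,
// with credentials passed as options instead of in the URL
val city_3500_df2 = sx.read.format("jdbc")
  .option("url", "jdbc:postgresql://buya/dmn")
  .option("driver", "org.postgresql.Driver")
  .option("user", "dmn")
  .option("password", "dmn")
  .option("dbtable", "v_city_3500")
  .load()

city_3500_df2.show()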

Comments

  • opus111, almost 2 years ago:

    I'm sure this is a simple SQLContext question, but I can't find any answer in the Spark docs or on Stack Overflow.

    I want to create a Spark DataFrame from a SQL query on MySQL.

    For example, I have a complicated MySQL query like

    SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...
    

    and I want a DataFrame with columns X, Y, and Z.

    I figured out how to load entire tables into Spark, and I could load them all, and then do the joining and selection there. However, that is very inefficient. I just want to load the table generated by my SQL query.

    Here is my current approximation of the code, which doesn't work. The MySQL connector has a "dbtable" option that can be used to load a whole table. I am hoping there is some way to specify a query instead:

      val df = sqlContext.format("jdbc").
        option("url", "jdbc:mysql://localhost:3306/local_content").
        option("driver", "com.mysql.jdbc.Driver").
        option("useUnicode", "true").
        option("continueBatchOnError","true").
        option("useSSL", "false").
        option("user", "root").
        option("password", "").
        sql(
    """
    select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
    join DialogLine as dl on dl.DialogID=d.DialogID
    join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
    join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
    join WordRoot as wr on wr.WordRootID=wi.WordRootID
    where d.InSite=1 and dl.Active=1
    limit 100
    """
        ).load()