Create Spark Dataframe from SQL Query
Solution 1
I found this answer here: Bulk data migration through Spark SQL
The dbtable option can be any query wrapped in parentheses with an alias. So in my case, I need to do this:
val query = """
(select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100) foo
"""
val df = sqlContext.read.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
option("dbtable",query).
load()
For comparison, loading each table as its own DataFrame and joining them in Spark was, as expected, very inefficient.
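The same idea with the Spark 2.x+ SparkSession API, as a minimal sketch. It assumes the MySQL database, driver class and credentials from the snippet above, with MySQL Connector/J on the classpath; `JdbcQueryExample` and the `asDbtable` helper are hypothetical names introduced here. On Spark 2.4+ the JDBC source also accepts a `query` option that does the parenthesizing and aliasing for you (it cannot be combined with `dbtable`).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcQueryExample {
  // Hypothetical helper: wraps a SELECT as a derived table so it can be passed
  // as "dbtable"; the database then executes the whole query (joins, limit).
  def asDbtable(sql: String, alias: String): String =
    s"(${sql.trim.stripSuffix(";")}) AS $alias"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-query")
      .master("local[*]") // assumption: local mode, just for trying it out
      .getOrCreate()

    val sql =
      """select dl.DialogLineID, dlwim.Sequence, wi.WordRootID
        |from Dialog as d
        |join DialogLine as dl on dl.DialogID = d.DialogID
        |join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID = dl.DialogLineID
        |join WordInstance as wi on wi.WordInstanceID = dlwim.WordInstanceID
        |join WordRoot as wr on wr.WordRootID = wi.WordRootID
        |where d.InSite = 1 and dl.Active = 1
        |limit 100""".stripMargin

    val df: DataFrame = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/local_content")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "")
      .option("dbtable", asDbtable(sql, "foo")) // "(select ...) AS foo"
      .load()

    df.printSchema()
    spark.stop()
  }
}
```

Because MySQL evaluates the joins and the `limit`, only the resulting rows (at most 100) are transferred to Spark.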
Solution 2
If you already have your tables registered in your SQLContext, you can simply use the sql method:
val resultDF = sqlContext.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
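A minimal sketch of what "registered" means here, using the Spark 2.x+ API (`createOrReplaceTempView`; Spark 1.x used `registerTempTable`). The tables FOO and BAR, their `id` join column and the `RegisteredViewsExample`/`joinXY` names are hypothetical stand-ins built from in-memory data; in practice the DataFrames could come from `spark.read.jdbc(...)`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object RegisteredViewsExample {
  // Registers two small in-memory DataFrames under table names, then queries them with SQL.
  def joinXY(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1, "x1"), (2, "x2")).toDF("id", "X").createOrReplaceTempView("FOO")
    Seq((1, "y1"), (3, "y3")).toDF("id", "Y").createOrReplaceTempView("BAR")
    spark.sql("SELECT a.X, b.Y FROM FOO AS a JOIN BAR AS b ON a.id = b.id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("registered-views").master("local[*]").getOrCreate()
    joinXY(spark).show() // one row: x1, y1
    spark.stop()
  }
}
```

Note that if FOO and BAR are JDBC-backed, this join generally runs in Spark, not in the database, so it does not avoid the cost described in Solution 1.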
Solution 3
To save the output of a query to a new DataFrame, simply assign the result to a variable:
val newDataFrame = spark.sql("SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...")
and now newDataFrame is a DataFrame with all DataFrame functionality available to it.
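A short sketch of that point: the value returned by `spark.sql` can be transformed further with the DataFrame API. The ZOT table, its columns and the `SqlResultExample`/`summarize` names are hypothetical, built in memory for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SqlResultExample {
  def summarize(spark: SparkSession): DataFrame = {
    import spark.implicits._
    // Hypothetical ZOT table with columns X and Z
    Seq(("a", 1), ("a", 2), ("b", 3)).toDF("X", "Z").createOrReplaceTempView("ZOT")

    // The result of spark.sql is an ordinary DataFrame...
    val newDataFrame = spark.sql("SELECT X, Z FROM ZOT WHERE Z > 1")

    // ...so DataFrame methods can be chained on it directly.
    newDataFrame
      .filter(col("X") === "a")
      .groupBy("X")
      .count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sql-result").master("local[*]").getOrCreate()
    summarize(spark).show()
    spark.stop()
  }
}
```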
Solution 4
TL;DR: just create a view in your database.
Detail: I have a table t_city in my postgres database, on which I create a view:
create view v_city_3500 as
select asciiname, country, population, elevation
from t_city
where elevation>3500
and population>100000;
select * from v_city_3500;
asciiname | country | population | elevation
-----------+---------+------------+-----------
Potosi | BO | 141251 | 3967
Oruro | BO | 208684 | 3936
La Paz | BO | 812799 | 3782
Lhasa | CN | 118721 | 3651
Puno | PE | 116552 | 3825
Juliaca | PE | 245675 | 3834
In the spark-shell:
// SQLContext is the Spark 1.x entry point; in Spark 2+ the shell also provides `spark`
val sx = new org.apache.spark.sql.SQLContext(sc)
val props = new java.util.Properties()
props.setProperty("driver", "org.postgresql.Driver")
val url = "jdbc:postgresql://buya/dmn?user=dmn&password=dmn"
// The view is read exactly like a table
val city_df = sx.read.jdbc(url, "t_city", props)
val city_3500_df = sx.read.jdbc(url, "v_city_3500", props)
Result:
city_df.count()
Long = 145725
city_3500_df.count()
Long = 6
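The same view read with the Spark 2.x+ SparkSession API, as a minimal sketch. It assumes the PostgreSQL JDBC driver on the classpath and the host, database and credentials from the shell session above; `CityViewExample`, `pgProps` and `readView` are hypothetical names. Here the credentials go into the connection properties instead of the URL.

```scala
import java.util.Properties
import org.apache.spark.sql.{DataFrame, SparkSession}

object CityViewExample {
  // Hypothetical helper: JDBC connection properties with the credentials kept out of the URL.
  def pgProps(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("driver", "org.postgresql.Driver")
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }

  // Postgres evaluates the view's WHERE clause, so only matching rows reach Spark.
  def readView(spark: SparkSession, url: String, view: String, props: Properties): DataFrame =
    spark.read.jdbc(url, view, props)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("city-view").master("local[*]").getOrCreate()
    val city3500 = readView(spark, "jdbc:postgresql://buya/dmn", "v_city_3500", pgProps("dmn", "dmn"))
    println(city3500.count())
    spark.stop()
  }
}
```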
opus111
Updated on July 09, 2022
I'm sure this is a simple SQLContext question, but I can't find any answer in the Spark docs or on Stack Overflow.
I want to create a Spark DataFrame from a SQL query on MySQL.
For example, I have a complicated MySQL query like:
SELECT a.X,b.Y,c.Z FROM FOO as a JOIN BAR as b ON ... JOIN ZOT as c ON ... WHERE ...
and I want a DataFrame with columns X, Y and Z.
I figured out how to load entire tables into Spark, and I could load them all and then do the joining and selection there. However, that is very inefficient. I just want to load the table generated by my SQL query.
Here is my current approximation of the code, which doesn't work. Spark's JDBC source has an option, "dbtable", that can be used to load a whole table. I am hoping there is some way to specify a query instead.
val df = sqlContext.format("jdbc").
option("url", "jdbc:mysql://localhost:3306/local_content").
option("driver", "com.mysql.jdbc.Driver").
option("useUnicode", "true").
option("continueBatchOnError","true").
option("useSSL", "false").
option("user", "root").
option("password", "").
sql(
"""
select dl.DialogLineID, dlwim.Sequence, wi.WordRootID from Dialog as d
join DialogLine as dl on dl.DialogID=d.DialogID
join DialogLineWordInstanceMatch as dlwim on dlwim.DialogLineID=dl.DialogLineID
join WordInstance as wi on wi.WordInstanceID=dlwim.WordInstanceID
join WordRoot as wr on wr.WordRootID=wi.WordRootID
where d.InSite=1 and dl.Active=1
limit 100
"""
).load()