How do you perform basic joins of two RDD tables in Spark using Python?


It can be done either with PairRDDFunctions or with Spark DataFrames. Since DataFrame operations benefit from the Catalyst Optimizer, the second option is worth considering.

Assuming your data looks as follows:

rdd1 = sc.parallelize([("foo", 1), ("bar", 2), ("baz", 3)])
rdd2 = sc.parallelize([("foo", 4), ("bar", 5), ("bar", 6)])

With PairRDDs:

Inner join:

rdd1.join(rdd2)
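## collect() with the sample data gives (ordering may vary):
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]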

Left outer join:

rdd1.leftOuterJoin(rdd2)
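## Unmatched keys from rdd1 are kept with None:
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6)), ('baz', (3, None))]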

Cartesian product (doesn't require RDD[(T, U)]):

rdd1.cartesian(rdd2)
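## Every element of rdd1 is paired with every element of rdd2 (3 * 3 = 9 tuples):
## [(('foo', 1), ('foo', 4)), (('foo', 1), ('bar', 5)), ...]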

Broadcast join (doesn't require RDD[(T, U)]):
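A broadcast join can be emulated with a map-side lookup: collect the smaller RDD on the driver, broadcast it, and look keys up inside a flatMap. A minimal sketch, assuming rdd2 is small enough to collect (the helper names small and small_bcast are illustrative, not part of the Spark API):

from collections import defaultdict

# Build a local multimap from the (assumed) smaller RDD and broadcast it
small = defaultdict(list)
for k, v in rdd2.collect():
    small[k].append(v)
small_bcast = sc.broadcast(dict(small))

# Map-side join: rdd1 is never shuffled
rdd1.flatMap(
    lambda kv: [(kv[0], (kv[1], w)) for w in small_bcast.value.get(kv[0], [])]
).collect()
## [('foo', (1, 4)), ('bar', (2, 5)), ('bar', (2, 6))]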

Finally, there is cogroup, which has no direct SQL equivalent but can be useful in some situations:

cogrouped = rdd1.cogroup(rdd2)

cogrouped.mapValues(lambda x: (list(x[0]), list(x[1]))).collect()
## [('foo', ([1], [4])), ('bar', ([2], [5, 6])), ('baz', ([3], []))]

With Spark DataFrames

You can use either the DataFrame DSL or execute raw SQL with spark.sql.

df1 = spark.createDataFrame(rdd1, ('k', 'v1'))
df2 = spark.createDataFrame(rdd2, ('k', 'v2'))

# Register temporary views to be able to use `spark.sql`
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')

Inner join:

# inner is the default, so `how` can be omitted
df1.join(df2, df1.k == df2.k, how='inner')
spark.sql('SELECT * FROM df1 JOIN df2 ON df1.k = df2.k')
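Note that this keeps both k columns in the result; joining on the column name instead lets Spark emit a single k column:

df1.join(df2, 'k')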

Left outer join:

df1.join(df2, df1.k == df2.k, how='left_outer')
spark.sql('SELECT * FROM df1 LEFT OUTER JOIN df2 ON df1.k = df2.k')

Cross join (in Spark 2.0 an explicit cross join or a configuration change is required; set spark.sql.crossJoin.enabled=true in Spark 2.x to allow implicit cross joins):

df1.crossJoin(df2)
spark.sql('SELECT * FROM df1 CROSS JOIN df2')

df1.join(df2)
spark.sql('SELECT * FROM df1 JOIN df2')

Since 1.6 (1.5 in Scala), each of these can be combined with the broadcast function:

from pyspark.sql.functions import broadcast

df1.join(broadcast(df2), df1.k == df2.k)

to perform a broadcast join. See also Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark.
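In newer versions (2.2 and later, as far as I know) a broadcast hint can also be written directly in SQL; treat the exact syntax as something to verify against your Spark version:

spark.sql('SELECT /*+ BROADCAST(df2) */ * FROM df1 JOIN df2 ON df1.k = df2.k')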


Comments

  • invoketheshell
    invoketheshell almost 2 years

    How would you perform basic joins in Spark using Python? In R you could use merge() to do this. What is the syntax using Python on Spark for:

    1. Inner Join
    2. Left Outer Join
    3. Cross Join

    With two tables (RDDs) that share a common key, with a single value column in each:

    RDD(1):(key,U)
    RDD(2):(key,V)
    

    I think an inner join is something like this:

    rdd1.join(rdd2).map(case (key, u, v) => (key, ls ++ rs));
    

    Is that right? I have searched the internet and can't find a good example of joins. Thanks in advance.

  • soMuchToLearnAndShare
    soMuchToLearnAndShare almost 8 years
    Just a note: cartesian is in fact available on plain RDDs (not just PairRDDs).
  • paradox
    paradox about 7 years
    df1.join(df2, df1.k == df2.k, joinType='left_outer'): how do you pass multiple conditions into the join parameter? df1.k == df2.k | df1.k2 == df2.k2?
  • zero323
    zero323 about 7 years
    @paradox (df1.k == df2.k) | (df1.k2 == df2.k2), but it would often make more sense to turn it into a union or melt and convert to an equi-join (see the sketch after these comments).
  • MTT
    MTT almost 7 years
    @zero323: Excellent answer. The only change I recommend is that from version 2.0 they changed 'joinType' to 'how'.
  • Nikhil Baby
    Nikhil Baby over 6 years
    Is it possible to add a function to the join condition? Say I have a function that checks the similarity of two strings and returns a percentage of similarity. For example: df1.join(df2, stringFunction(df1.k, df2.k) > 80, how='left_outer')
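Following up on the last two comments, here is a minimal sketch of a composite join condition and of a function-based (non-equi) join condition. The similarity UDF is a hypothetical stand-in (stringFunction from the comment is not a Spark API), and conditions like this generally cannot use a hash join, so expect a broadcast nested loop or cartesian-style plan:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Composite condition: parenthesize each predicate before combining with | or &
df1.join(df2, (df1.k == df2.k) | (df1.v1 == df2.v2), how='left_outer')

# Hypothetical similarity metric wrapped as a UDF and used as the join condition
def similarity(a, b):
    return 100 if a == b else 0  # stand-in for a real string-similarity measure

similarity_udf = udf(similarity, IntegerType())
df1.join(df2, similarity_udf(df1.k, df2.k) > 80, how='left_outer')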