How to join on multiple columns in Pyspark?

166,889

Solution 1

Use the & (AND) and | (OR) operators, and wrap each comparison in parentheses. In Python, == has lower precedence than the bitwise & and |, so an unparenthesized condition is parsed differently than intended:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x3"))

df = df1.join(df2, (df1.x1 == df2.x1) & (df1.x2 == df2.x2))
df.show()

## +---+---+---+---+---+---+
## | x1| x2| x3| x1| x2| x3|
## +---+---+---+---+---+---+
## |  2|  b|3.0|  2|  b|0.0|
## +---+---+---+---+---+---+
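To see why the parentheses matter, Python's own ast module can show how each version of the condition is parsed. Nothing runs against Spark, so df1 and df2 do not need to exist. This sketch assumes Python 3.9+ (for ast.unparse).

```python
import ast

# Parse the join condition without parentheses, exactly as Python would.
unparenthesized = ast.parse("df1.x1 == df2.x1 & df1.x2 == df2.x2", mode="eval").body
# Parse the version with each comparison wrapped in parentheses.
parenthesized = ast.parse("(df1.x1 == df2.x1) & (df1.x2 == df2.x2)", mode="eval").body

# Without parentheses, & binds first, so the whole thing is one chained
# comparison: df1.x1 == (df2.x1 & df1.x2) == df2.x2
print(type(unparenthesized).__name__)               # Compare
print(ast.unparse(unparenthesized.comparators[0]))  # df2.x1 & df1.x2

# With parentheses, the top-level operator is &, joining two comparisons.
print(type(parenthesized).__name__)     # BinOp
print(type(parenthesized.op).__name__)  # BitAnd
```

Without parentheses, the condition becomes the chained comparison df1.x1 == (df2.x1 & df1.x2) == df2.x2, which is not the join condition you meant.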

Solution 2

An alternative approach is to pass a list of column names as the join key:

df1 = sqlContext.createDataFrame(
    [(1, "a", 2.0), (2, "b", 3.0), (3, "c", 3.0)],
    ("x1", "x2", "x3"))

df2 = sqlContext.createDataFrame(
    [(1, "f", -1.0), (2, "b", 0.0)], ("x1", "x2", "x4"))

df = df1.join(df2, ['x1','x2'])
df.show()

which outputs:

+---+---+---+---+
| x1| x2| x3| x4|
+---+---+---+---+
|  2|  b|3.0|0.0|
+---+---+---+---+

The main advantage is that the join columns appear only once in the output, which reduces the risk of errors such as org.apache.spark.sql.AnalysisException: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L.


If the join columns have different names in the two tables (say, in the example above, df2 has the columns y1, y2 and y4), rename them first so that the names match:

df = df1.join(df2.withColumnRenamed('y1','x1').withColumnRenamed('y2','x2'), ['x1','x2'])

Solution 3

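Alternatively, pass a list of join conditions as the on argument; PySpark combines them with AND, so each comparison does not need its own parentheses:

test = numeric.join(Ref,
    on=[
        numeric.ID == Ref.ID,
        numeric.TYPE == Ref.TYPE,
        numeric.STATUS == Ref.STATUS,
    ], how='inner')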
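A minimal sketch of how such a list collapses into one condition: the class Cond below is a hypothetical stand-in for a PySpark Column that only records SQL-like text, and functools.reduce with operator.and_ folds the list left to right with &, which is roughly how join treats a list of Column conditions.

```python
import functools
import operator


class Cond:
    """Hypothetical stand-in for a PySpark Column; it only records SQL-like text."""

    def __init__(self, sql: str) -> None:
        self.sql = sql

    def __and__(self, other: "Cond") -> "Cond":
        # Like Column.__and__, build a new condition instead of returning a bool.
        return Cond(f"({self.sql} AND {other.sql})")


conditions = [
    Cond("numeric.ID = Ref.ID"),
    Cond("numeric.TYPE = Ref.TYPE"),
    Cond("numeric.STATUS = Ref.STATUS"),
]

# Fold the list left to right with &, producing a single AND condition.
combined = functools.reduce(operator.and_, conditions)
print(combined.sql)
# ((numeric.ID = Ref.ID AND numeric.TYPE = Ref.TYPE) AND numeric.STATUS = Ref.STATUS)
```

The list form is just a convenience: the result is equivalent to writing the three comparisons joined by & with explicit parentheses.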
Author: user3803714

Updated on May 12, 2021

Comments

  • user3803714
    almost 3 years

    I am using Spark 1.3 and would like to join on multiple columns using the Python interface (Spark SQL).

    The following works:

    I first register them as temp tables.

    numeric.registerTempTable("numeric")
    Ref.registerTempTable("Ref")
    
    test  = numeric.join(Ref, numeric.ID == Ref.ID, joinType='inner')
    

    I would now like to join them based on multiple columns.

    I get SyntaxError: invalid syntax with this:

    test  = numeric.join(Ref,
       numeric.ID == Ref.ID AND numeric.TYPE == Ref.TYPE AND
       numeric.STATUS == Ref.STATUS ,  joinType='inner')
    
  • Chogg
    about 6 years
    When you say 'be careful about operator precedence', what do you mean? Do you mean I should put parentheses in the right place to AND the correct tables together?
  • Devarshi Mandal
    almost 5 years
    What if I do an outer join and would like to keep only a single occurrence of the key?
  • seth127
    over 4 years
    This is probably my least favorite pyspark error: Reference 'x1' is ambiguous, could be: x1#50L, x1#57L. I don't understand why it lets you do something like df = df1.join(df2, df1.x1 == df2.x1) and then errors as soon as you try to do almost anything with the resulting df. That's just a minor rant, but is there any reason why you'd ever want the resulting df with duplicated names?
  • Brendan
    over 3 years
    @Chogg, what he means is that if you're not careful with parentheses, the phrase df1.x1 == df2.x1 & df1.x2 == df2.x2, (parentheses removed) would be evaluated by the Python interpreter as df1.x1 == (df2.x1 & df1.x2) == df2.x2, which would potentially throw a confusing and non-descriptive error.
  • Dominik
    almost 3 years
    Welcome to StackOverflow. Can you maybe explain your code a bit more? Why is it structured like this? How does it work? etc.
  • Thavas Antonio
    almost 3 years
    Answers are great, but as a best practice, please provide an explanation. Posting only code leads the OP and future readers to copy and paste your answer without understanding the logic behind it. Please add some explanation. Thank you!