Spark DataFrame: distinguish columns with duplicated names
Solution 1
I would recommend that you change the column names for your join:
df1.select(col("a") as "df1_a", col("f") as "df1_f")
  .join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a") === col("df2_a"))
The resulting DataFrame will have the schema (df1_a, df1_f, df2_a, df2_f).
Solution 2
Let's start with some data:
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row
df1 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=125231, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])
df2 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])
There are a few ways you can approach this problem. First of all, you can unambiguously reference child-table columns through the parent DataFrames:
df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
You can also use table aliases:
from pyspark.sql.functions import col
df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")
df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
Finally you can programmatically rename columns:
df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))
df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)
## +--------------------+
## | f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
Solution 3
There is a simpler way than writing aliases for all of the columns you are joining on: pass the join column names as a list:
df1.join(df2,['a'])
This works if the key that you are joining on is the same in both tables.
See https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
Solution 4
You can use the def drop(col: Column)
method to drop the duplicated column. For example:
DataFrame:df1
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
DataFrame:df2
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
When I join df1 with df2, the resulting DataFrame looks like this:
val newDf = df1.join(df2,df1("a")===df2("a"))
DataFrame:newDf
+-------+-----+-------+-----+
| a | f | a | f |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+
Now, we can use the def drop(col: Column)
method to drop the duplicated column 'a' or 'f', as follows:
val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
Solution 5
This is how we can join two DataFrames on columns with the same names in PySpark.
df = df1.join(df2, ['col1','col2','col3'])
If you do printSchema()
after this, you can see that the duplicate join columns have been removed.
Comments
-
resec about 2 years
As far as I know, in a Spark DataFrame multiple columns can have the same name, as shown in the DataFrame snapshot below:
[ Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})), Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042})) ]
The result above was created by joining a dataframe with itself; you can see there are 4 columns, with two a and two f. The problem is that when I try to do further calculations with the a column, I can't find a way to select it. I have tried df[0] and df.select('a'), and both returned the error message: AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L. Is there any way in the Spark API to distinguish the columns with duplicated names again? Or maybe some way to let me change the column names?
-
resec over 8 years Thanks for your edit showing so many ways of getting the correct column in these ambiguous cases; I do think your examples should go into the Spark programming guide. I've learned a lot!
-
Sameh Sharaf over 6 yearsYou may need to fix your answer since the quotes aren't adjusted properly between column names.
-
Glennie Helles Sindholt over 6 years @SamehSharaf I assume that you are the one down-voting my answer? But the answer is in fact 100% correct - I'm simply using the Scala ' shorthand for column selection, so there is in fact no problem with quotes. -
Jorge Leitao about 6 years @GlennieHellesSindholt, fair point. It is confusing because the answer is tagged as python and pyspark. -
Matt over 5 yearsthis is the actual answer as of Spark 2+
-
mauriciojost about 5 yearsAnd for Scala: df1.join(df2, Seq("a"))
-
bogdan.rusu almost 5 yearspage was moved to: kb.databricks.com/data/…
-
Vzzarr over 4 years Small correction: df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns)) instead of df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns)). For the rest, good stuff -
Bikash Gyawali about 4 yearsWhat if each dataframe contains 100+ columns and we just need to rename one column name that is the same? Surely, can't manually type in all those column names in the select clause
-
Glennie Helles Sindholt about 4 years In that case you could go with df1.withColumnRenamed("a", "df1_a") -
Pablo Adames about 4 years I agree that this should be part of the Spark programming guide. Pure gold. I was finally able to untangle the source of ambiguity by selecting columns by the old names before doing the join. With the solution of programmatically appending suffixes to the column names before the join, all the ambiguity went away.
-
Topde almost 4 years @GlennieHellesSindholt would you be able to write a PySpark equivalent of this answer, please?
-
Glennie Helles Sindholt almost 4 years@Dee Just have a look at the answer below from zero323.
-
prafi almost 4 yearsWould this approach work if you are doing an outer join and the two columns have some dissimilar values?
-
thebluephantom over 3 yearsYou may not want to drop if different relations with same schema.
-
Abhi over 3 years@GlennieHellesSindholt Wondering if schema change approach could solve my issue: stackoverflow.com/questions/63966039/…
-
Paul Fornia about 3 years Glad I kept scrolling, THIS is the much better answer. If columns have different names, there is no ambiguity issue. If columns have the same name, do this. There is little reason to ever need to deal with ambiguous column names with this method.
-
Sheldore about 3 years @resec: Did you understand why the renaming df1_a = df1.alias("df1_a") was needed, and why we can't use df1 and df2 directly? This answer did not explain why the renaming was needed to make select('df1_a.f') work -
timctran about 3 years @Sheldore It applies to the original problem, where there is one table df being joined with itself. Perhaps the solution would make more sense if it had written df.alias("df1_a") and df.alias("df2_a").