PySpark create new column with mapping from a dict
Solution 1
Inefficient solution with UDF (version independent):
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, StringType())
df = sc.parallelize([('DS', ), ('G', ), ('INVALID', )]).toDF(['key'])
mapping = {
'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S',
'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
df.withColumn("value", translate(mapping)("key"))
with the result:
+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
Much more efficient (Spark >= 2.0, Spark < 3.0) is to create a MapType literal:
from pyspark.sql.functions import col, create_map, lit
from itertools import chain
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])
df.withColumn("value", mapping_expr.getItem(col("key")))
with the same result:
+-------+-----+
|    key|value|
+-------+-----+
|     DS|    S|
|      G|   NS|
|INVALID| null|
+-------+-----+
but more efficient execution plan:
== Physical Plan ==
*Project [key#15, keys: [B,DNS,DS,F,E,H,C,G,A], values: [S,S,S,NS,NS,NS,S,NS,S][key#15] AS value#53]
+- Scan ExistingRDD[key#15]
compared to UDF version:
== Physical Plan ==
*Project [key#15, pythonUDF0#61 AS value#57]
+- BatchEvalPython [translate_(key#15)], [key#15, pythonUDF0#61]
   +- Scan ExistingRDD[key#15]
In Spark >= 3.0, getItem should be replaced with __getitem__ ([]), i.e.:
df.withColumn("value", mapping_expr[col("key")]).show()
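The `chain(*mapping.items())` call in the MapType approach can look opaque at first. As a minimal sketch in plain Python (no Spark needed, and using a shortened two-entry mapping chosen only for illustration), this is the flattening it performs: the dict's (key, value) pairs become one alternating sequence, which is the argument order `create_map` takes.

```python
from itertools import chain

# Shortened mapping, for illustration only.
mapping = {"DS": "S", "G": "NS"}

# mapping.items() yields (key, value) pairs; chain(*pairs) concatenates them
# into a single key1, value1, key2, value2, ... sequence.
flat = list(chain(*mapping.items()))
print(flat)  # ['DS', 'S', 'G', 'NS']

# chain.from_iterable gives the same result without unpacking the pairs.
flat_alt = list(chain.from_iterable(mapping.items()))
print(flat_alt == flat)  # True
```

Each element of that flat list is then wrapped in `lit(...)`, so the map is built from literal columns rather than from Python objects.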
Solution 2
Sounds like the simplest solution would be to use the replace function: http://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.replace
mapping = {
    'A': '1',
    'B': '2'
}
df2 = df.replace(to_replace=mapping, subset=['yourColName'])
Solution 3
If you want to create a map col from a nested dictionary you can use this:
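from pyspark.sql import functions as F

def create_map(d):
    # Leaf values become literals; dicts become (possibly nested) map columns.
    if not isinstance(d, dict):
        return F.lit(d)
    level_map = []
    for k in d:
        level_map.append(F.lit(k))
        level_map.append(create_map(d[k]))
    return F.create_map(level_map)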
d = {'a': 1, 'b': {'c': 2, 'd': 'blah'}}
print(create_map(d)) # <- Column<b'map(a, 1, b, map(c, 2, d, blah))'>
ad_s
Updated on March 18, 2022
Comments
-
ad_s about 2 years
Using Spark 1.6, I have a Spark DataFrame column (named, let's say, col1) with values A, B, C, DS, DNS, E, F, G and H, and I want to create a new column (say col2) with the values from the dict below. How do I map this? (So, for instance, 'A' needs to be mapped to 'S', etc.)
dict = {'A': 'S', 'B': 'S', 'C': 'S', 'DS': 'S', 'DNS': 'S', 'E': 'NS', 'F': 'NS', 'G': 'NS', 'H': 'NS'}
-
Shaido almost 5 years
The problem here is that this will not create a new column, it will replace the values in the original one.
-
SummerEla over 4 years
Couldn't you just copy the old values into a new column first, then use this function?
-
Andrew Hummel about 4 years
zero323 - Having trouble understanding this method. withColumn takes a colName string as first arg, but you're passing a map value instead of key?
df.withColumn("value", mapping_expr.getItem(col("key")))
Also, the result from create_map looks like this, and calling getItem() like above is not working for me:
Column<b'map(key_a, val_a, key_b, val_b)'>
Any thoughts?
-
Kieran Cooney almost 4 years
Replace also requires the new values to be the same type as the original column.
-
Markus over 3 years
I like the shortness of this solution. If you want an extra column, just copy the column using .withColumn("newColumn", "column_to_copy") or so - the example just provides the minimum code you need to know to do this yourself :) Sometimes I think comments on SO are just used to be pedantic..
-
LePuppy over 3 years
Thank you so much for this solution! It works like a charm. Also, if you wish to get the same behaviour with a list instead of the dict:
from pyspark.sql.functions import array
For example:
values = [1, 2, 3, 4, 5]
indexing_expr = array(*[lit(x) for x in values]) # The * is important
df.withColumn("value", indexing_expr[col("index")])
-
safex over 2 years
If someone could benchmark this solution against the accepted one, that would be very helpful. I implemented the replace, but it does slow down my code considerably, which puzzles me.