PySpark converting a column of type 'map' to multiple columns in a dataframe
Solution 1
Since the keys of a MapType column are not part of the DataFrame schema, you'll have to collect them first, for example like this:
from pyspark.sql.functions import explode
keys = (df
.select(explode("Parameters"))
.select("key")
.distinct()
.rdd.flatMap(lambda x: x)
.collect())
Once you have the keys, all that is left is a simple select:
from pyspark.sql.functions import col
exprs = [col("Parameters").getItem(k).alias(k) for k in keys]
df.select(*exprs)
Solution 2
Performant solution
One of the question constraints is to dynamically determine the column names, which is fine, but be warned that this can be really slow. Here's how you can avoid typing and write code that'll execute quickly.
import pyspark.sql.functions as F

cols = list(map(
    lambda f: F.col("Parameters").getItem(f).alias(str(f)),
    ["foo", "bar", "baz"]))
df.select(cols).show()
+---+---+---+
|foo|bar|baz|
+---+---+---+
| 1| 2|aaa|
+---+---+---+
Notice that this runs a single select operation. Don't run withColumn multiple times, because that's slower.
The fast solution is only possible if you know all the map keys. You'll need to revert to the slower solution if you don't know all the unique values for the map keys.
Slower solution
The accepted answer is good. My solution is a bit more performant because it doesn't call .rdd or flatMap().
import pyspark.sql.functions as F
d = [{'Parameters': {'foo': '1', 'bar': '2', 'baz': 'aaa'}}]
df = spark.createDataFrame(d)
keys_df = df.select(F.explode(F.map_keys(F.col("Parameters")))).distinct()
keys = list(map(lambda row: row[0], keys_df.collect()))
key_cols = list(map(lambda f: F.col("Parameters").getItem(f).alias(str(f)), keys))
df.select(key_cols).show()
+---+---+---+
|bar|foo|baz|
+---+---+---+
| 2| 1|aaa|
+---+---+---+
Collecting results to the driver node can be a performance bottleneck. It's a good idea to run list(map(lambda row: row[0], keys_df.collect())) as a separate command to make sure it isn't running too slowly.
Kamil Sindi
Updated on March 05, 2021

Comments

Kamil Sindi, about 3 years ago:
Input

I have a column Parameters of type map of the form:

>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> d = [{'Parameters': {'foo': '1', 'bar': '2', 'baz': 'aaa'}}]
>>> df = sqlContext.createDataFrame(d)
>>> df.collect()
[Row(Parameters={'foo': '1', 'bar': '2', 'baz': 'aaa'})]
Output

I want to reshape it in pyspark so that all the keys (foo, bar, etc.) are columns, namely:

[Row(foo='1', bar='2', baz='aaa')]
Using withColumn works:

(df
 .withColumn('foo', df.Parameters['foo'])
 .withColumn('bar', df.Parameters['bar'])
 .withColumn('baz', df.Parameters['baz'])
 .drop('Parameters')
).collect()
But I need a solution that doesn't explicitly mention the column names, as I have dozens of them.
Schema
>>> df.printSchema()
root
 |-- Parameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)