concat_ws removes null string from output in spark data frame
Solution 1
concat_ws does remove null columns in the concatenation process. If you want to keep a placeholder for every null in the concatenated result, one approach would be to create a Map of type-dependent colName -> nullValue for na.fill() to transform the dataframe before the concatenation, as shown below:
import org.apache.spark.sql.functions._
import spark.implicits._  // already in scope inside spark-shell

val df = Seq(
  (new Integer(1), "a"),
  (new Integer(2), null),
  (null, "c")
).toDF("col1", "col2")

df.withColumn("concat", concat_ws("|", df.columns.map(col): _*)).show
// +----+----+------+
// |col1|col2|concat|
// +----+----+------+
// | 1| a| 1|a|
// | 2|null| 2|
// |null| c| c|
// +----+----+------+
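What concat_ws does per row can be mirrored in plain Scala without Spark (a sketch; concatWsLike is a hypothetical helper name, not a Spark API): drop the nulls, then join the survivors with the separator.

```scala
// Plain-Scala sketch of concat_ws's per-row behaviour:
// null entries are skipped entirely, so no separator slot remains for them.
def concatWsLike(sep: String, values: Seq[String]): String =
  values.filter(_ != null).mkString(sep)

// Mirrors the three rows of the example dataframe above.
println(concatWsLike("|", Seq("1", "a")))  // 1|a
println(concatWsLike("|", Seq("2", null))) // 2
println(concatWsLike("|", Seq(null, "c"))) // c
```

This is exactly why the null slot disappears from the output rather than showing up as an empty field.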
val naMap = df.dtypes.map( t => t._2 match {
case "StringType" => (t._1, "(n/a)")
case "IntegerType" => (t._1, 0)
case "LongType" => (t._1, 0L)
// cases for other types ...
} ).toMap
// naMap: scala.collection.immutable.Map[String,Any] =
// Map(col1 -> 0, col2 -> (n/a))
df.na.fill(naMap).
withColumn("concat", concat_ws("|", df.columns.map(col): _*)).
show
// +----+-----+-------+
// |col1| col2| concat|
// +----+-----+-------+
// | 1| a| 1|a|
// | 2|(n/a)|2|(n/a)|
// | 0| c| 0|c|
// +----+-----+-------+
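The fill-then-concatenate step can likewise be mirrored in plain Scala (a sketch; fillThenConcat is a hypothetical helper, and the placeholder values match the naMap above): each value carries its Spark dtype name, nulls get the type-appropriate placeholder, and only then is everything joined.

```scala
// Plain-Scala sketch of na.fill(naMap) followed by concat_ws:
// replace nulls with a type-dependent placeholder first,
// so every row keeps a slot for every column.
def fillThenConcat(sep: String, typed: Seq[(String, Any)]): String =
  typed.map {
    case ("StringType", null)  => "(n/a)" // naMap value for String columns
    case ("IntegerType", null) => "0"     // naMap value for Int columns
    case (_, v)                => v.toString
  }.mkString(sep)

// Mirrors rows 2 and 3 of the filled output above.
println(fillThenConcat("|", Seq(("IntegerType", 2), ("StringType", null))))  // 2|(n/a)
println(fillThenConcat("|", Seq(("IntegerType", null), ("StringType", "c")))) // 0|c
```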
Solution 2
Since concat_ws ignores the columns that contain null, you will have to handle them.
One solution is to create a Map of type-dependent colName -> nullValue for na.fill() as proposed here; however, you will have to specify all the cases.
Another approach, since you want to obtain a String, is to use the format_string function:
// Proof of concept in Scala (I don't have the compiler to test it).
df.withColumn(
  "concat",
  format_string(
    (for (c <- df.columns) yield "%s").mkString("|"),
    df.columns.map(col): _*
  )
)
/*
Same solution tested in PySpark.
format_string(
'|'.join(['%s' for c in df.columns]),
*df.columns
)
*/
This way you avoid the Map definition, and an empty string will be placed for any null value in the dataframe columns.
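One caveat worth verifying on your Spark version: format_string follows printf-style formatting, and on the plain JVM a %s conversion renders a null reference as the literal string "null", not as an empty string (the comment thread below reports exactly this behaviour on Spark 2.4.5). A minimal JVM-only check, without Spark:

```scala
// Plain java.util.Formatter behaviour of %s with a null argument:
// the null is printed as the four characters "null", not dropped.
val formatted = String.format("%s|%s|%s", "a", null, "c")
println(formatted) // a|null|c
```

If your Spark build behaves this way too, you may want to wrap each column in coalesce(col(c), lit("")) before formatting instead of relying on %s.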
Solution 3
You can also use a udf like:
import org.apache.spark.sql.expressions.UserDefinedFunction

val concatUDF: UserDefinedFunction = udf((columns: Seq[String]) =>
  columns.map(c => if (c == null) "" else c).reduceLeft((a, b) => s"$a:$b"))

df.withColumn("concatenated", concatUDF(array(df.columns.map(col): _*)))
where array is org.apache.spark.sql.functions.array. This won't replace the original columns and will return empty strings for null values, or whatever you want them replaced with (the if (c == null) "" branch).
Also, you can extend the UDF to support multiple types.
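The null-handling inside that UDF can be exercised as a plain Scala function, without the udf wrapper (a sketch; concatBody is a hypothetical name for the same lambda):

```scala
// The same lambda as in concatUDF, applied to an ordinary Seq:
// nulls become "", then the values are folded together with ":".
val concatBody: Seq[String] => String = columns =>
  columns.map(c => if (c == null) "" else c).reduceLeft((a, b) => s"$a:$b")

println(concatBody(Seq("a", null, "c"))) // a::c
println(concatBody(Seq("x", "y")))       // x:y
```

Note the empty slot between the two separators where the null was, which is what preserves the column positions.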
Admin
Updated on June 24, 2022
Comments
-
Admin almost 2 years
This is the output of my data frame:

finaldf.show(false)
+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|DataPartition     |TimeStamp                |Source_organizationId|Source_sourceId|FilingDateTime           |SourceTypeCode|DocumentId|Dcn       |DocFormat|StatementDate            |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
+------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
|SelfSourcedPrivate|2017-11-02T10:23:59+00:00|4298009288           |80             |2017-09-28T23:00:00+00:00|10K           |null      |171105584 |ASFILED  |2017-07-31T00:00:00+00:00|false                    |false                  |2017-07-31T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |364            |2017-08-08T17:00:00+00:00|10Q           |null      |null      |null     |2017-07-30T00:00:00+00:00|false                    |false                  |2017-07-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011836     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
|SelfSourcedPublic |2017-11-21T12:17:49+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
When I do concat_ws, null is getting removed from the row.

val finaldf = diff.foldLeft(tempReorder) { (temp2df, colName) =>
  temp2df.withColumn(colName, lit("null"))
}
//finaldf.show(false)
val headerColumn = data.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val finaldfWithDelimiter = finaldf
  .select(concat_ws("|^|", finaldf.schema.fieldNames.map(col): _*).as("concatenated"))
  .withColumnRenamed("concatenated", header)
finaldfWithDelimiter.show(false)
And I get the below output:

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|DataPartition|^|TimeStamp|^|Source_organizationId|^|Source_sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!||
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|SelfSourcedPrivate|^|2017-11-02T10:23:59+00:00|^|4298009288|^|80|^|2017-09-28T23:00:00+00:00|^|10K|^|171105584|^|ASFILED|^|2017-07-31T00:00:00+00:00|^|false|^|false|^|2017-07-31T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011835|^|1000716240|^|I|!| |
|SelfSourcedPublic|^|2017-11-21T12:09:23+00:00|^|4295904170|^|364|^|2017-08-08T17:00:00+00:00|^|10Q|^|2017-07-30T00:00:00+00:00|^|false|^|false|^|2017-07-30T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011836|^|1000716240|^|I|!| |
|SelfSourcedPublic|^|2017-11-21T12:09:23+00:00|^|4295904170|^|365|^|2017-10-10T17:00:00+00:00|^|10K|^|2017-09-30T00:00:00+00:00|^|false|^|false|^|2017-09-30T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011835|^|1000716240|^|I|!|
In the output, DocumentId, which was null, has been removed. I am not able to figure out what I am missing.
-
Admin about 6 years
So do I have to write naMap for all of my columns? How can I incorporate it in my solution? Basically two steps are there to avoid null: first is withColumn and then fill? Can you please edit my code? And will null as a string also be removed?
-
Leo C about 6 years
You only need to set up, in naMap, the types of columns which might contain null, to specify what null should be replaced with accordingly. For instance, the naMap in my example covers columns of type String, Int, or Long which might have null. In your case, simply apply na.fill(naMap) before select, like: val finaldfWithDelimiter = finaldf.na.fill(naMap).select(...)...
-
Admin about 6 years
@LeoC I am sorry, but it is still not working for me. Do I have to create naMap?
-
Leo C about 6 years
Yes, you create naMap and apply na.fill(naMap) to finaldf before performing the concatenation. Note that what I suggested is just a way of null-filling without supplying specific columns. Method na.fill() has a few variants, hence you can pick whichever best suits your use case. For example, you can pick one that lets you supply a specific list of columns of the same type.
-
ruloweb over 3 years
The advantage of this approach is that the original columns are not modified; if you look at @Leo C's solution, the values for col1 and col2 are being modified, and for some use cases that could be an issue.
-
ruloweb over 3 yearsThis solution modifies the original columns, take this in mind, it could be an issue for some use cases.
-
ruloweb over 3 yearsThe disadvantage is that null values will be replaced with the 'null' string.
-
Leo C over 3 years
@ruloweb, yes, it's meant to be a generalized solution for handling null across columns of a dataframe, hence my sample dataset showing columns additional to the one being concat-ed.
-
Matteo Guarnerio over 3 years@ruloweb no, it doesn’t place “null” but only “”. Therefore you would obtain something like: “col1||col3” if only col2 is null.
-
ruloweb over 3 years
Well, it actually does it in my test, at least in Spark 2.4.5:
val df = List(("a", null, "c"), ("a", "b", "c")).toDF("val1", "val2", "val3")
df.withColumn("concat",
  format_string((for (c <- df.columns) yield "%s").mkString("|"), df.columns.map(col): _*)
).show