concat_ws removes null string from output in spark data frame

Solution 1

concat_ws skips null values during concatenation. To keep a placeholder for every null in the concatenated result, one approach is to build a type-dependent Map of colName -> nullValue and pass it to na.fill() to transform the dataframe before concatenating, as shown below:

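import org.apache.spark.sql.functions.{col, concat_ws}
import spark.implicits._  // needed for toDF; already in scope in spark-shell

val df = Seq(
  (Integer.valueOf(1), "a"),
  (Integer.valueOf(2), null),
  (null, "c")
).toDF("col1", "col2")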

df.withColumn("concat", concat_ws("|", df.columns.map(col): _*)).
  show
// +----+----+------+
// |col1|col2|concat|
// +----+----+------+
// |   1|   a|   1|a|
// |   2|null|     2|
// |null|   c|     c|
// +----+----+------+

// Columns whose type is not listed below are left unfilled.
val naMap: Map[String, Any] = df.dtypes.collect {
  case (name, "StringType") => name -> "(n/a)"
  case (name, "IntegerType") => name -> 0
  case (name, "LongType") => name -> 0L
  // cases for other types ...
}.toMap
// naMap: scala.collection.immutable.Map[String,Any] = 
//   Map(col1 -> 0, col2 -> (n/a))

df.na.fill(naMap).
  withColumn("concat", concat_ws("|", df.columns.map(col): _*)).
  show
// +----+-----+-------+
// |col1| col2| concat|
// +----+-----+-------+
// |   1|    a|    1|a|
// |   2|(n/a)|2|(n/a)|
// |   0|    c|    0|c|
// +----+-----+-------+
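
If changing col1 and col2 themselves is undesirable, a variant is to apply the placeholder only inside the concatenation expression. The sketch below swaps na.fill() for coalesce plus a cast to string; it is not part of the original answer, and the names placeholder, parts and result, as well as the local SparkSession setup, are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, concat_ws, lit}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("concat-ws-nulls").getOrCreate()
import spark.implicits._

val df = Seq[(Option[Int], Option[String])](
  (Some(1), Some("a")),
  (Some(2), None),
  (None, Some("c"))
).toDF("col1", "col2")

// Hypothetical placeholder text; choose whatever suits the output format.
val placeholder = "(n/a)"

// Cast every column to string and substitute the placeholder for null.
// This happens only inside the expression, so col1 and col2 keep their nulls.
val parts = df.columns.map(c => coalesce(col(c).cast("string"), lit(placeholder)))

val result = df.withColumn("concat", concat_ws("|", parts: _*))
result.show()
// concat column: 1|a, 2|(n/a), (n/a)|c
```

Because every column is cast to string first, no per-type Map is needed, at the cost of using the same placeholder for all types.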

Solution 2

Since concat_ws ignores columns that contain null, you have to handle them yourself.

One solution is to create a type-dependent Map of colName -> nullValue for na.fill(), as proposed in Solution 1; however, you have to specify a case for every column type.

Another approach, since you want to obtain a String, is to use the format_string function:

// Proof of concept in Scala (I don't have the compiler to test it).
df
.withColumn(
  "concat",
  format_string(
    (for (c <- df.columns) yield "%s").mkString("|"),
    df.columns.map(col): _*
  ),
)

/*
  Same solution tested in PySpark.

  format_string(
    '|'.join(['%s' for c in df.columns]),
    *df.columns
  )
*/

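This avoids the Map definition. The intent is for every null value in the dataframe columns to become an empty string in the result, although a commenter below reports that on Spark 2.4.5 nulls come out as the string "null" instead, so check the behavior on your version.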
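
To make the result independent of how format_string renders a null argument, each column can be coalesced to an empty string first. This is a minimal sketch rather than the answer's original code: the sample data is borrowed from the comment thread, and pattern, args, result and the local SparkSession are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{coalesce, col, format_string, lit}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("format-string-nulls").getOrCreate()
import spark.implicits._

val df = Seq[(Option[String], Option[String], Option[String])](
  (Some("a"), None, Some("c")),
  (Some("a"), Some("b"), Some("c"))
).toDF("val1", "val2", "val3")

// One "%s" per column joined by the separator, here "%s|%s|%s".
val pattern = df.columns.map(_ => "%s").mkString("|")

// Replace null with "" explicitly before formatting.
val args = df.columns.map(c => coalesce(col(c).cast("string"), lit("")))

val result = df.withColumn("concat", format_string(pattern, args: _*))
result.show()
// concat column: a||c, a|b|c
```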

Solution 3

You can also use a UDF like:

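import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{array, col, udf}

// Replace each null with "" and join the values with ":".
val concatUDF: UserDefinedFunction = udf((columns: Seq[String]) =>
  columns.map(c => if (c == null) "" else c).mkString(":"))

df.withColumn("concatenated", concatUDF(array(df.columns.map(col): _*)))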

where array is org.apache.spark.sql.functions.array. This leaves the original columns untouched and substitutes an empty string for each null value, or whatever replacement you put in the if (c == null) "" branch.

Also, you can extend the UDF to support multiple types.
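
One possible way to handle several column types, sketched here as an assumption rather than the answer's own extension, is to cast every column to string before building the array, so the UDF still receives a Seq[String]. The names asStrings and result and the local SparkSession are illustrative.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, udf}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("udf-concat-nulls").getOrCreate()
import spark.implicits._

val df = Seq[(Option[Int], Option[String])](
  (Some(1), Some("a")),
  (Some(2), None),
  (None, Some("c"))
).toDF("col1", "col2")

// Null-safe join: Option(v) turns a null element into None, which becomes "".
val concatUDF = udf((values: Seq[String]) =>
  values.map(v => Option(v).getOrElse("")).mkString(":"))

// Cast to string first so every array element has the same type.
val asStrings = df.columns.map(c => col(c).cast("string"))

val result = df.withColumn("concatenated", concatUDF(array(asStrings: _*)))
result.show()
// concatenated column: 1:a, 2:, :c
```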

Author by

Admin

Updated on June 24, 2022

Comments

  • Admin
    Admin almost 2 years

    This is the Output of my data frame

    finaldf.show(false)
    
    +------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
    |DataPartition     |TimeStamp                |Source_organizationId|Source_sourceId|FilingDateTime           |SourceTypeCode|DocumentId|Dcn       |DocFormat|StatementDate            |IsFilingDateTimeEstimated|ContainsPreliminaryData|CapitalChangeAdjustmentDate|CumulativeAdjustmentFactor|ContainsRestatement|FilingDateTimeUTCOffset|ThirdPartySourceCode|ThirdPartySourcePriority|SourceTypeId|ThirdPartySourceCodeId|FFAction|!||
    +------------------+-------------------------+---------------------+---------------+-------------------------+--------------+----------+----------+---------+-------------------------+-------------------------+-----------------------+---------------------------+--------------------------+-------------------+-----------------------+--------------------+------------------------+------------+----------------------+-----------+
    |SelfSourcedPrivate|2017-11-02T10:23:59+00:00|4298009288           |80             |2017-09-28T23:00:00+00:00|10K           |null      |171105584 |ASFILED  |2017-07-31T00:00:00+00:00|false                    |false                  |2017-07-31T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
    |SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |364            |2017-08-08T17:00:00+00:00|10Q           |null      |null      |null     |2017-07-30T00:00:00+00:00|false                    |false                  |2017-07-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011836     |1000716240            |I|!|       |
    |SelfSourcedPublic |2017-11-21T12:09:23+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
    |SelfSourcedPublic |2017-11-21T12:17:49+00:00|4295904170           |365            |2017-10-10T17:00:00+00:00|10K           |null      |null      |null     |2017-09-30T00:00:00+00:00|false                    |false                  |2017-09-30T00:00:00+00:00  |1.0                       |false              |-300                   |SS                  |1                       |3011835     |1000716240            |I|!|       |
    

    When I apply concat_ws, the null values are removed from the row.

    val finaldf = diff.foldLeft(tempReorder){(temp2df, colName) => temp2df.withColumn(colName, lit("null"))}
    //finaldf.show(false)
    
    val headerColumn = data.columns.toSeq
    val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
    
    val finaldfWithDelimiter=finaldf.select(concat_ws("|^|",finaldf.schema.fieldNames.map(col): _*).as("concatenated")).withColumnRenamed("concatenated", header)
    finaldfWithDelimiter.show(false)
    

    And I get the output below:

    +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |DataPartition|^|TimeStamp|^|Source_organizationId|^|Source_sourceId|^|FilingDateTime|^|SourceTypeCode|^|DocumentId|^|Dcn|^|DocFormat|^|StatementDate|^|IsFilingDateTimeEstimated|^|ContainsPreliminaryData|^|CapitalChangeAdjustmentDate|^|CumulativeAdjustmentFactor|^|ContainsRestatement|^|FilingDateTimeUTCOffset|^|ThirdPartySourceCode|^|ThirdPartySourcePriority|^|SourceTypeId|^|ThirdPartySourceCodeId|^|FFAction|!||
    +-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |SelfSourcedPrivate|^|2017-11-02T10:23:59+00:00|^|4298009288|^|80|^|2017-09-28T23:00:00+00:00|^|10K|^|171105584|^|ASFILED|^|2017-07-31T00:00:00+00:00|^|false|^|false|^|2017-07-31T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011835|^|1000716240|^|I|!|                                                                                                                                                                 |
    |SelfSourcedPublic|^|2017-11-21T12:09:23+00:00|^|4295904170|^|364|^|2017-08-08T17:00:00+00:00|^|10Q|^|2017-07-30T00:00:00+00:00|^|false|^|false|^|2017-07-30T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011836|^|1000716240|^|I|!|                                                                                                                                                                                       |
    |SelfSourcedPublic|^|2017-11-21T12:09:23+00:00|^|4295904170|^|365|^|2017-10-10T17:00:00+00:00|^|10K|^|2017-09-30T00:00:00+00:00|^|false|^|false|^|2017-09-30T00:00:00+00:00|^|1.0|^|false|^|-300|^|SS|^|1|^|3011835|^|1000716240|^|I|!|   
    

    In the output, DocumentId, which was null, has been dropped.

    I can't figure out what I am missing.

  • Admin
    Admin about 6 years
    So do I have to write naMap for all of my columns? How can I incorporate it into my solution? Basically, are there two steps to avoid null: first withColumn and then fill? Can you please edit my code? And will null as a string also be removed?
  • Leo C
    Leo C about 6 years
    You only need to set up the types of columns which might consist of null in naMap to specify what null should be replaced with accordingly. For instance, the naMap in my example covers columns of type String, Int, or Long which might have null. In your case, simply apply na.fill(naMap) before select, like, val finaldfWithDelimiter = finaldf.na.fill(naMap).select(...)...
  • Admin
    Admin about 6 years
    @LeoC I am sorry, but it is still not working for me. Do I have to create naMap?
  • Leo C
    Leo C about 6 years
    Yes, you create naMap and apply na.fill(naMap) to finaldf before performing concatenation. Note that what I suggested is just a way of null-filling without supplying specific columns. Method na.fill() has a few variants, hence you can pick whichever best suits your use case. For example you can pick one that lets you supply a specific list of columns of the same type.
  • ruloweb
    ruloweb over 3 years
    The advantage of this approach is that the original columns are not modified. If you look at @Leo C's solution, the values of col1 and col2 are being modified, and for some use cases that could be an issue.
  • ruloweb
    ruloweb over 3 years
    This solution modifies the original columns. Keep this in mind, as it could be an issue for some use cases.
  • ruloweb
    ruloweb over 3 years
    The disadvantage is that null values will be replaced with the 'null' string.
  • Leo C
    Leo C over 3 years
    @ruloweb, yes it's meant to be a generalized solution for handling null across columns of a dataframe, hence my sample dataset showing columns additional to the one being concat-ed.
  • Matteo Guarnerio
    Matteo Guarnerio over 3 years
    @ruloweb no, it doesn’t place “null” but only “”. Therefore you would obtain something like: “col1||col3” if only col2 is null.
  • ruloweb
    ruloweb over 3 years
    Well, it actually does it in my test, at least in spark 2.4.5: val df = List(("a", null, "c"), ("a", "b", "c")).toDF("val1", "val2", "val3") df.withColumn("concat", format_string((for (c <- df.columns) yield "%s").mkString("|"), df.columns.map(col): _*)) .show